manager.rs 9.9 KB


  1. use crate::{
  2. core::{revision::DocumentRevisionCache, RevisionRecord},
  3. errors::FlowyError,
  4. };
  5. use bytes::Bytes;
  6. use dashmap::DashMap;
  7. use flowy_collaboration::{
  8. entities::{
  9. doc::DocumentInfo,
  10. revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
  11. },
  12. util::{make_delta_from_revisions, md5, pair_rev_id_from_revisions, RevIdCounter},
  13. };
  14. use flowy_error::FlowyResult;
  15. use futures_util::{future, stream, stream::StreamExt};
  16. use lib_infra::future::FutureResult;
  17. use lib_ot::{core::Operation, errors::OTError, rich_text::RichTextDelta};
  18. use std::{collections::VecDeque, sync::Arc};
  19. use tokio::sync::RwLock;
  20. pub trait RevisionServer: Send + Sync {
  21. fn fetch_document(&self, doc_id: &str) -> FutureResult<DocumentInfo, FlowyError>;
  22. }
  23. pub struct DocumentRevisionManager {
  24. pub(crate) doc_id: String,
  25. user_id: String,
  26. rev_id_counter: RevIdCounter,
  27. cache: Arc<DocumentRevisionCache>,
  28. sync_seq: Arc<RevisionSyncSequence>,
  29. }
  30. impl DocumentRevisionManager {
  31. pub fn new(user_id: &str, doc_id: &str, cache: Arc<DocumentRevisionCache>) -> Self {
  32. let rev_id_counter = RevIdCounter::new(0);
  33. let sync_seq = Arc::new(RevisionSyncSequence::new());
  34. Self {
  35. doc_id: doc_id.to_string(),
  36. user_id: user_id.to_owned(),
  37. rev_id_counter,
  38. cache,
  39. sync_seq,
  40. }
  41. }
  42. pub async fn load_document(&mut self, server: Arc<dyn RevisionServer>) -> FlowyResult<RichTextDelta> {
  43. let revisions = RevisionLoader {
  44. doc_id: self.doc_id.clone(),
  45. user_id: self.user_id.clone(),
  46. server,
  47. cache: self.cache.clone(),
  48. }
  49. .load()
  50. .await?;
  51. let doc = mk_doc_from_revisions(&self.doc_id, revisions)?;
  52. self.rev_id_counter.set(doc.rev_id);
  53. Ok(doc.delta()?)
  54. }
  55. #[tracing::instrument(level = "debug", skip(self, revisions), err)]
  56. pub async fn reset_document(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
  57. let rev_id = pair_rev_id_from_revisions(&revisions).1;
  58. let _ = self
  59. .cache
  60. .reset_with_revisions(&self.doc_id, revisions.into_inner())
  61. .await?;
  62. self.rev_id_counter.set(rev_id);
  63. Ok(())
  64. }
  65. #[tracing::instrument(level = "debug", skip(self, revision), err)]
  66. pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  67. if revision.delta_data.is_empty() {
  68. return Err(FlowyError::internal().context("Delta data should be empty"));
  69. }
  70. let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?;
  71. self.rev_id_counter.set(revision.rev_id);
  72. Ok(())
  73. }
  74. #[tracing::instrument(level = "debug", skip(self, revision))]
  75. pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  76. if revision.delta_data.is_empty() {
  77. return Err(FlowyError::internal().context("Delta data should be empty"));
  78. }
  79. let record = self.cache.add(revision.clone(), RevisionState::Local, true).await?;
  80. self.sync_seq.add_revision(record).await?;
  81. Ok(())
  82. }
  83. #[tracing::instrument(level = "debug", skip(self), err)]
  84. pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
  85. if self.sync_seq.ack(&rev_id).await.is_ok() {
  86. self.cache.ack(rev_id).await;
  87. }
  88. Ok(())
  89. }
  90. pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
  91. pub fn set_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
  92. pub fn next_rev_id_pair(&self) -> (i64, i64) {
  93. let cur = self.rev_id_counter.value();
  94. let next = self.rev_id_counter.next();
  95. (cur, next)
  96. }
  97. pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
  98. debug_assert!(range.doc_id == self.doc_id);
  99. let revisions = self.cache.revisions_in_range(range.clone()).await?;
  100. Ok(revisions)
  101. }
  102. pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
  103. let sync_seq = self.sync_seq.clone();
  104. let cache = self.cache.clone();
  105. FutureResult::new(async move {
  106. match sync_seq.next_sync_revision().await {
  107. None => match sync_seq.next_sync_rev_id().await {
  108. None => Ok(None),
  109. Some(rev_id) => Ok(cache.get(rev_id).await.map(|record| record.revision)),
  110. },
  111. Some((_, record)) => Ok(Some(record.revision)),
  112. }
  113. })
  114. }
  115. pub async fn latest_revision(&self) -> Revision { self.cache.latest_revision().await }
  116. pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
  117. self.cache.get(rev_id).await.map(|record| record.revision)
  118. }
  119. }
  120. struct RevisionSyncSequence {
  121. revs_map: Arc<DashMap<i64, RevisionRecord>>,
  122. local_revs: Arc<RwLock<VecDeque<i64>>>,
  123. }
  124. impl std::default::Default for RevisionSyncSequence {
  125. fn default() -> Self {
  126. let local_revs = Arc::new(RwLock::new(VecDeque::new()));
  127. RevisionSyncSequence {
  128. revs_map: Arc::new(DashMap::new()),
  129. local_revs,
  130. }
  131. }
  132. }
  133. impl RevisionSyncSequence {
  134. fn new() -> Self { RevisionSyncSequence::default() }
  135. async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
  136. // The last revision's rev_id must be greater than the new one.
  137. if let Some(rev_id) = self.local_revs.read().await.back() {
  138. if *rev_id >= record.revision.rev_id {
  139. return Err(OTError::revision_id_conflict()
  140. .context(format!("The new revision's id must be greater than {}", rev_id)));
  141. }
  142. }
  143. self.local_revs.write().await.push_back(record.revision.rev_id);
  144. self.revs_map.insert(record.revision.rev_id, record);
  145. Ok(())
  146. }
  147. async fn ack(&self, rev_id: &i64) -> FlowyResult<()> {
  148. if let Some(pop_rev_id) = self.next_sync_rev_id().await {
  149. if &pop_rev_id != rev_id {
  150. let desc = format!(
  151. "The ack rev_id:{} is not equal to the current rev_id:{}",
  152. rev_id, pop_rev_id
  153. );
  154. return Err(FlowyError::internal().context(desc));
  155. }
  156. tracing::trace!("{} revision finish synchronizing", pop_rev_id);
  157. self.revs_map.remove(&pop_rev_id);
  158. let _ = self.local_revs.write().await.pop_front();
  159. }
  160. Ok(())
  161. }
  162. async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> {
  163. match self.local_revs.read().await.front() {
  164. None => None,
  165. Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
  166. }
  167. }
  168. async fn next_sync_rev_id(&self) -> Option<i64> { self.local_revs.read().await.front().copied() }
  169. }
  170. struct RevisionLoader {
  171. doc_id: String,
  172. user_id: String,
  173. server: Arc<dyn RevisionServer>,
  174. cache: Arc<DocumentRevisionCache>,
  175. }
  176. impl RevisionLoader {
  177. async fn load(&self) -> Result<Vec<Revision>, FlowyError> {
  178. let records = self.cache.batch_get(&self.doc_id)?;
  179. let revisions: Vec<Revision>;
  180. if records.is_empty() {
  181. let doc = self.server.fetch_document(&self.doc_id).await?;
  182. let delta_data = Bytes::from(doc.text.clone());
  183. let doc_md5 = md5(&delta_data);
  184. let revision = Revision::new(
  185. &doc.doc_id,
  186. doc.base_rev_id,
  187. doc.rev_id,
  188. delta_data,
  189. &self.user_id,
  190. doc_md5,
  191. );
  192. let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?;
  193. revisions = vec![revision];
  194. } else {
  195. // Sync the records if their state is RevisionState::Local.
  196. stream::iter(records.clone())
  197. .filter(|record| future::ready(record.state == RevisionState::Local))
  198. .for_each(|record| async move {
  199. match self.cache.add(record.revision, record.state, false).await {
  200. Ok(_) => {},
  201. Err(e) => tracing::error!("{}", e),
  202. }
  203. })
  204. .await;
  205. revisions = records.into_iter().map(|record| record.revision).collect::<_>();
  206. }
  207. Ok(revisions)
  208. }
  209. }
  210. fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<DocumentInfo> {
  211. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  212. let mut delta = make_delta_from_revisions(revisions)?;
  213. correct_delta(&mut delta);
  214. Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
  215. doc_id: doc_id.to_owned(),
  216. text: delta.to_json(),
  217. rev_id,
  218. base_rev_id,
  219. })
  220. }
  221. // quill-editor requires the delta should end with '\n' and only contains the
  222. // insert operation. The function, correct_delta maybe be removed in the future.
  223. fn correct_delta(delta: &mut RichTextDelta) {
  224. if let Some(op) = delta.ops.last() {
  225. let op_data = op.get_data();
  226. if !op_data.ends_with('\n') {
  227. log::warn!("The document must end with newline. Correcting it by inserting newline op");
  228. delta.ops.push(Operation::Insert("\n".into()));
  229. }
  230. }
  231. if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
  232. log::warn!("The document can only contains insert operations, but found {:?}", op);
  233. delta.ops.retain(|op| op.is_insert());
  234. }
  235. }
  236. #[cfg(feature = "flowy_unit_test")]
  237. impl RevisionSyncSequence {
  238. #[allow(dead_code)]
  239. pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { self.revs_map.clone() }
  240. #[allow(dead_code)]
  241. pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { self.local_revs.clone() }
  242. }
  243. #[cfg(feature = "flowy_unit_test")]
  244. impl DocumentRevisionManager {
  245. pub fn revision_cache(&self) -> Arc<DocumentRevisionCache> { self.cache.clone() }
  246. }