manager.rs 10.0 KB


  1. use crate::{
  2. core::{revision::DocumentRevisionCache, RevisionRecord},
  3. errors::FlowyError,
  4. };
  5. use bytes::Bytes;
  6. use dashmap::DashMap;
  7. use flowy_collaboration::{
  8. entities::{
  9. doc::DocumentInfo,
  10. revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
  11. },
  12. util::{make_delta_from_revisions, md5, pair_rev_id_from_revisions, RevIdCounter},
  13. };
  14. use flowy_error::FlowyResult;
  15. use futures_util::{future, stream, stream::StreamExt};
  16. use lib_infra::future::FutureResult;
  17. use lib_ot::{core::Operation, errors::OTError, rich_text::RichTextDelta};
  18. use std::{collections::VecDeque, sync::Arc};
  19. use tokio::sync::RwLock;
  20. pub trait RevisionServer: Send + Sync {
  21. fn fetch_document(&self, doc_id: &str) -> FutureResult<DocumentInfo, FlowyError>;
  22. }
  23. pub struct DocumentRevisionManager {
  24. pub(crate) doc_id: String,
  25. user_id: String,
  26. rev_id_counter: RevIdCounter,
  27. cache: Arc<DocumentRevisionCache>,
  28. sync_seq: Arc<RevisionSyncSequence>,
  29. }
  30. impl DocumentRevisionManager {
  31. pub fn new(user_id: &str, doc_id: &str, cache: Arc<DocumentRevisionCache>) -> Self {
  32. let rev_id_counter = RevIdCounter::new(0);
  33. let sync_seq = Arc::new(RevisionSyncSequence::new());
  34. Self {
  35. doc_id: doc_id.to_string(),
  36. user_id: user_id.to_owned(),
  37. rev_id_counter,
  38. cache,
  39. sync_seq,
  40. }
  41. }
  42. pub async fn load_document(&mut self, server: Arc<dyn RevisionServer>) -> FlowyResult<RichTextDelta> {
  43. let revisions = RevisionLoader {
  44. doc_id: self.doc_id.clone(),
  45. user_id: self.user_id.clone(),
  46. server,
  47. cache: self.cache.clone(),
  48. }
  49. .load()
  50. .await?;
  51. let doc = mk_doc_from_revisions(&self.doc_id, revisions)?;
  52. self.rev_id_counter.set(doc.rev_id);
  53. Ok(doc.delta()?)
  54. }
  55. #[tracing::instrument(level = "debug", skip(self, revisions), err)]
  56. pub async fn reset_document(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
  57. let rev_id = pair_rev_id_from_revisions(&revisions).1;
  58. let _ = self
  59. .cache
  60. .reset_with_revisions(&self.doc_id, revisions.into_inner())
  61. .await?;
  62. self.rev_id_counter.set(rev_id);
  63. Ok(())
  64. }
  65. #[tracing::instrument(level = "debug", skip(self, revision), err)]
  66. pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  67. if revision.delta_data.is_empty() {
  68. return Err(FlowyError::internal().context("Delta data should be empty"));
  69. }
  70. let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?;
  71. self.rev_id_counter.set(revision.rev_id);
  72. Ok(())
  73. }
  74. #[tracing::instrument(level = "debug", skip(self, revision))]
  75. pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  76. if revision.delta_data.is_empty() {
  77. return Err(FlowyError::internal().context("Delta data should be empty"));
  78. }
  79. let record = self.cache.add(revision.clone(), RevisionState::Local, true).await?;
  80. self.sync_seq.add_revision(record).await?;
  81. Ok(())
  82. }
  83. #[tracing::instrument(level = "debug", skip(self), err)]
  84. pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
  85. if self.sync_seq.ack(&rev_id).await.is_ok() {
  86. self.cache.ack(rev_id).await;
  87. }
  88. Ok(())
  89. }
  90. pub fn rev_id(&self) -> i64 {
  91. self.rev_id_counter.value()
  92. }
  93. pub fn set_rev_id(&self, rev_id: i64) {
  94. self.rev_id_counter.set(rev_id);
  95. }
  96. pub fn next_rev_id_pair(&self) -> (i64, i64) {
  97. let cur = self.rev_id_counter.value();
  98. let next = self.rev_id_counter.next();
  99. (cur, next)
  100. }
  101. pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
  102. debug_assert!(range.doc_id == self.doc_id);
  103. let revisions = self.cache.revisions_in_range(range.clone()).await?;
  104. Ok(revisions)
  105. }
  106. pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
  107. let sync_seq = self.sync_seq.clone();
  108. let cache = self.cache.clone();
  109. FutureResult::new(async move {
  110. match sync_seq.next_sync_revision().await {
  111. None => match sync_seq.next_sync_rev_id().await {
  112. None => Ok(None),
  113. Some(rev_id) => Ok(cache.get(rev_id).await.map(|record| record.revision)),
  114. },
  115. Some((_, record)) => Ok(Some(record.revision)),
  116. }
  117. })
  118. }
  119. pub async fn latest_revision(&self) -> Revision {
  120. self.cache.latest_revision().await
  121. }
  122. pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
  123. self.cache.get(rev_id).await.map(|record| record.revision)
  124. }
  125. }
  126. struct RevisionSyncSequence {
  127. revs_map: Arc<DashMap<i64, RevisionRecord>>,
  128. local_revs: Arc<RwLock<VecDeque<i64>>>,
  129. }
  130. impl std::default::Default for RevisionSyncSequence {
  131. fn default() -> Self {
  132. let local_revs = Arc::new(RwLock::new(VecDeque::new()));
  133. RevisionSyncSequence {
  134. revs_map: Arc::new(DashMap::new()),
  135. local_revs,
  136. }
  137. }
  138. }
  139. impl RevisionSyncSequence {
  140. fn new() -> Self {
  141. RevisionSyncSequence::default()
  142. }
  143. async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
  144. // The last revision's rev_id must be greater than the new one.
  145. if let Some(rev_id) = self.local_revs.read().await.back() {
  146. if *rev_id >= record.revision.rev_id {
  147. return Err(OTError::revision_id_conflict()
  148. .context(format!("The new revision's id must be greater than {}", rev_id)));
  149. }
  150. }
  151. self.local_revs.write().await.push_back(record.revision.rev_id);
  152. self.revs_map.insert(record.revision.rev_id, record);
  153. Ok(())
  154. }
  155. async fn ack(&self, rev_id: &i64) -> FlowyResult<()> {
  156. if let Some(pop_rev_id) = self.next_sync_rev_id().await {
  157. if &pop_rev_id != rev_id {
  158. let desc = format!(
  159. "The ack rev_id:{} is not equal to the current rev_id:{}",
  160. rev_id, pop_rev_id
  161. );
  162. return Err(FlowyError::internal().context(desc));
  163. }
  164. tracing::trace!("{} revision finish synchronizing", pop_rev_id);
  165. self.revs_map.remove(&pop_rev_id);
  166. let _ = self.local_revs.write().await.pop_front();
  167. }
  168. Ok(())
  169. }
  170. async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> {
  171. match self.local_revs.read().await.front() {
  172. None => None,
  173. Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
  174. }
  175. }
  176. async fn next_sync_rev_id(&self) -> Option<i64> {
  177. self.local_revs.read().await.front().copied()
  178. }
  179. }
  180. struct RevisionLoader {
  181. doc_id: String,
  182. user_id: String,
  183. server: Arc<dyn RevisionServer>,
  184. cache: Arc<DocumentRevisionCache>,
  185. }
  186. impl RevisionLoader {
  187. async fn load(&self) -> Result<Vec<Revision>, FlowyError> {
  188. let records = self.cache.batch_get(&self.doc_id)?;
  189. let revisions: Vec<Revision>;
  190. if records.is_empty() {
  191. let doc = self.server.fetch_document(&self.doc_id).await?;
  192. let delta_data = Bytes::from(doc.text.clone());
  193. let doc_md5 = md5(&delta_data);
  194. let revision = Revision::new(
  195. &doc.doc_id,
  196. doc.base_rev_id,
  197. doc.rev_id,
  198. delta_data,
  199. &self.user_id,
  200. doc_md5,
  201. );
  202. let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?;
  203. revisions = vec![revision];
  204. } else {
  205. // Sync the records if their state is RevisionState::Local.
  206. stream::iter(records.clone())
  207. .filter(|record| future::ready(record.state == RevisionState::Local))
  208. .for_each(|record| async move {
  209. match self.cache.add(record.revision, record.state, false).await {
  210. Ok(_) => {}
  211. Err(e) => tracing::error!("{}", e),
  212. }
  213. })
  214. .await;
  215. revisions = records.into_iter().map(|record| record.revision).collect::<_>();
  216. }
  217. Ok(revisions)
  218. }
  219. }
  220. fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<DocumentInfo> {
  221. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  222. let mut delta = make_delta_from_revisions(revisions)?;
  223. correct_delta(&mut delta);
  224. Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
  225. doc_id: doc_id.to_owned(),
  226. text: delta.to_json(),
  227. rev_id,
  228. base_rev_id,
  229. })
  230. }
  231. // quill-editor requires the delta should end with '\n' and only contains the
  232. // insert operation. The function, correct_delta maybe be removed in the future.
  233. fn correct_delta(delta: &mut RichTextDelta) {
  234. if let Some(op) = delta.ops.last() {
  235. let op_data = op.get_data();
  236. if !op_data.ends_with('\n') {
  237. log::warn!("The document must end with newline. Correcting it by inserting newline op");
  238. delta.ops.push(Operation::Insert("\n".into()));
  239. }
  240. }
  241. if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
  242. log::warn!("The document can only contains insert operations, but found {:?}", op);
  243. delta.ops.retain(|op| op.is_insert());
  244. }
  245. }
  /// Accessors exposed only to unit tests (feature `flowy_unit_test`).
  #[cfg(feature = "flowy_unit_test")]
  impl RevisionSyncSequence {
      /// Returns a shared handle to the map of pending revision records.
      #[allow(dead_code)]
      pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> {
          Arc::clone(&self.revs_map)
      }

      /// Returns a shared handle to the queue of pending revision ids.
      #[allow(dead_code)]
      pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> {
          Arc::clone(&self.local_revs)
      }
  }
  /// Accessor exposed only to unit tests (feature `flowy_unit_test`).
  #[cfg(feature = "flowy_unit_test")]
  impl DocumentRevisionManager {
      /// Returns a shared handle to the revision cache used by this manager.
      pub fn revision_cache(&self) -> Arc<DocumentRevisionCache> {
          Arc::clone(&self.cache)
      }
  }