manager.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. use crate::{errors::FlowyError, services::doc::revision::RevisionCache};
  2. use bytes::Bytes;
  3. use flowy_collaboration::{
  4. entities::doc::Doc,
  5. util::{md5, RevIdCounter},
  6. };
  7. use flowy_error::FlowyResult;
  8. use lib_infra::future::FutureResult;
  9. use lib_ot::{
  10. core::{Operation, OperationTransformable},
  11. revision::{RevState, RevType, Revision, RevisionRange},
  12. rich_text::RichTextDelta,
  13. };
  14. use std::sync::Arc;
  15. pub trait RevisionServer: Send + Sync {
  16. fn fetch_document(&self, doc_id: &str) -> FutureResult<Doc, FlowyError>;
  17. }
  18. pub struct RevisionManager {
  19. doc_id: String,
  20. user_id: String,
  21. rev_id_counter: RevIdCounter,
  22. cache: Arc<RevisionCache>,
  23. }
  24. impl RevisionManager {
  25. pub fn new(user_id: &str, doc_id: &str, cache: Arc<RevisionCache>) -> Self {
  26. let rev_id_counter = RevIdCounter::new(0);
  27. Self {
  28. doc_id: doc_id.to_string(),
  29. user_id: user_id.to_owned(),
  30. rev_id_counter,
  31. cache,
  32. }
  33. }
  34. pub async fn load_document(&mut self, server: Arc<dyn RevisionServer>) -> FlowyResult<RichTextDelta> {
  35. let revisions = RevisionLoader {
  36. doc_id: self.doc_id.clone(),
  37. user_id: self.user_id.clone(),
  38. server,
  39. cache: self.cache.clone(),
  40. }
  41. .load()
  42. .await?;
  43. let doc = mk_doc_from_revisions(&self.doc_id, revisions)?;
  44. self.update_rev_id_counter_value(doc.rev_id);
  45. Ok(doc.delta()?)
  46. }
  47. pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  48. let _ = self.cache.add_remote_revision(revision.clone()).await?;
  49. Ok(())
  50. }
  51. pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  52. let _ = self.cache.add_local_revision(revision.clone()).await?;
  53. Ok(())
  54. }
  55. pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
  56. self.cache.ack_revision(rev_id).await;
  57. Ok(())
  58. }
  59. pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
  60. pub fn next_rev_id(&self) -> (i64, i64) {
  61. let cur = self.rev_id_counter.value();
  62. let next = self.rev_id_counter.next();
  63. (cur, next)
  64. }
  65. pub fn update_rev_id_counter_value(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
  66. pub async fn mk_revisions(&self, range: RevisionRange) -> Result<Revision, FlowyError> {
  67. debug_assert!(range.doc_id == self.doc_id);
  68. let revisions = self.cache.revisions_in_range(range.clone()).await?;
  69. let mut new_delta = RichTextDelta::new();
  70. for revision in revisions {
  71. match RichTextDelta::from_bytes(revision.delta_data) {
  72. Ok(delta) => {
  73. new_delta = new_delta.compose(&delta)?;
  74. },
  75. Err(e) => log::error!("{}", e),
  76. }
  77. }
  78. let delta_data = new_delta.to_bytes();
  79. let md5 = md5(&delta_data);
  80. let revision = Revision::new(
  81. &self.doc_id,
  82. range.start,
  83. range.end,
  84. delta_data,
  85. RevType::Remote,
  86. &self.user_id,
  87. md5,
  88. );
  89. Ok(revision)
  90. }
  91. pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> { self.cache.next_sync_revision() }
  92. pub async fn latest_revision(&self) -> Revision { self.cache.latest_revision().await }
  93. }
  94. #[cfg(feature = "flowy_unit_test")]
  95. impl RevisionManager {
  96. pub fn revision_cache(&self) -> Arc<RevisionCache> { self.cache.clone() }
  97. }
  98. struct RevisionLoader {
  99. doc_id: String,
  100. user_id: String,
  101. server: Arc<dyn RevisionServer>,
  102. cache: Arc<RevisionCache>,
  103. }
  104. impl RevisionLoader {
  105. async fn load(&self) -> Result<Vec<Revision>, FlowyError> {
  106. let records = self.cache.disk_cache.read_revisions(&self.doc_id)?;
  107. let revisions: Vec<Revision>;
  108. if records.is_empty() {
  109. let doc = self.server.fetch_document(&self.doc_id).await?;
  110. let delta_data = Bytes::from(doc.data.clone());
  111. let doc_md5 = md5(&delta_data);
  112. let revision = Revision::new(
  113. &doc.id,
  114. doc.base_rev_id,
  115. doc.rev_id,
  116. delta_data,
  117. RevType::Remote,
  118. &self.user_id,
  119. doc_md5,
  120. );
  121. let _ = self.cache.add_local_revision(revision.clone()).await?;
  122. revisions = vec![revision];
  123. } else {
  124. for record in &records {
  125. match record.state {
  126. RevState::StateLocal => match self.cache.add_local_revision(record.revision.clone()).await {
  127. Ok(_) => {},
  128. Err(e) => tracing::error!("{}", e),
  129. },
  130. RevState::Ack => {},
  131. }
  132. }
  133. revisions = records.into_iter().map(|record| record.revision).collect::<_>();
  134. }
  135. Ok(revisions)
  136. }
  137. }
  138. fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<Doc> {
  139. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  140. let mut delta = RichTextDelta::new();
  141. for (_, revision) in revisions.into_iter().enumerate() {
  142. match RichTextDelta::from_bytes(revision.delta_data) {
  143. Ok(local_delta) => {
  144. delta = delta.compose(&local_delta)?;
  145. },
  146. Err(e) => {
  147. tracing::error!("Deserialize delta from revision failed: {}", e);
  148. },
  149. }
  150. }
  151. correct_delta_if_need(&mut delta);
  152. Result::<Doc, FlowyError>::Ok(Doc {
  153. id: doc_id.to_owned(),
  154. data: delta.to_json(),
  155. rev_id,
  156. base_rev_id,
  157. })
  158. }
  159. fn correct_delta_if_need(delta: &mut RichTextDelta) {
  160. if delta.ops.last().is_none() {
  161. return;
  162. }
  163. let data = delta.ops.last().as_ref().unwrap().get_data();
  164. if !data.ends_with('\n') {
  165. log::error!("❌The op must end with newline. Correcting it by inserting newline op");
  166. delta.ops.push(Operation::Insert("\n".into()));
  167. }
  168. }