manager.rs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. use crate::{errors::FlowyError, services::doc::revision::RevisionCache};
  2. use bytes::Bytes;
  3. use flowy_collaboration::{
  4. entities::{
  5. doc::DocumentInfo,
  6. revision::{RevState, RevType, Revision, RevisionRange},
  7. },
  8. util::{md5, RevIdCounter},
  9. };
  10. use flowy_error::FlowyResult;
  11. use lib_infra::future::FutureResult;
  12. use lib_ot::{
  13. core::{Operation, OperationTransformable},
  14. rich_text::RichTextDelta,
  15. };
  16. use std::sync::Arc;
  17. pub trait RevisionServer: Send + Sync {
  18. fn fetch_document(&self, doc_id: &str) -> FutureResult<DocumentInfo, FlowyError>;
  19. }
  20. pub struct RevisionManager {
  21. doc_id: String,
  22. user_id: String,
  23. rev_id_counter: RevIdCounter,
  24. cache: Arc<RevisionCache>,
  25. }
  26. impl RevisionManager {
  27. pub fn new(user_id: &str, doc_id: &str, cache: Arc<RevisionCache>) -> Self {
  28. let rev_id_counter = RevIdCounter::new(0);
  29. Self {
  30. doc_id: doc_id.to_string(),
  31. user_id: user_id.to_owned(),
  32. rev_id_counter,
  33. cache,
  34. }
  35. }
  36. pub async fn load_document(&mut self, server: Arc<dyn RevisionServer>) -> FlowyResult<RichTextDelta> {
  37. let revisions = RevisionLoader {
  38. doc_id: self.doc_id.clone(),
  39. user_id: self.user_id.clone(),
  40. server,
  41. cache: self.cache.clone(),
  42. }
  43. .load()
  44. .await?;
  45. let doc = mk_doc_from_revisions(&self.doc_id, revisions)?;
  46. self.update_rev_id_counter_value(doc.rev_id);
  47. Ok(doc.delta()?)
  48. }
  49. pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  50. let _ = self.cache.add_remote_revision(revision.clone()).await?;
  51. Ok(())
  52. }
  53. pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  54. let _ = self.cache.add_local_revision(revision.clone()).await?;
  55. Ok(())
  56. }
  57. pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
  58. self.cache.ack_revision(rev_id).await;
  59. Ok(())
  60. }
  61. pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
  62. pub fn next_rev_id(&self) -> (i64, i64) {
  63. let cur = self.rev_id_counter.value();
  64. let next = self.rev_id_counter.next();
  65. (cur, next)
  66. }
  67. pub fn update_rev_id_counter_value(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
  68. pub async fn mk_revisions(&self, range: RevisionRange) -> Result<Revision, FlowyError> {
  69. debug_assert!(range.doc_id == self.doc_id);
  70. let revisions = self.cache.revisions_in_range(range.clone()).await?;
  71. let mut new_delta = RichTextDelta::new();
  72. // TODO: generate delta from revision should be wrapped into function.
  73. for revision in revisions {
  74. match RichTextDelta::from_bytes(revision.delta_data) {
  75. Ok(delta) => {
  76. new_delta = new_delta.compose(&delta)?;
  77. },
  78. Err(e) => log::error!("{}", e),
  79. }
  80. }
  81. let delta_data = new_delta.to_bytes();
  82. let md5 = md5(&delta_data);
  83. let revision = Revision::new(
  84. &self.doc_id,
  85. range.start,
  86. range.end,
  87. delta_data,
  88. RevType::Remote,
  89. &self.user_id,
  90. md5,
  91. );
  92. Ok(revision)
  93. }
  94. pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> { self.cache.next_sync_revision() }
  95. pub async fn latest_revision(&self) -> Revision { self.cache.latest_revision().await }
  96. }
  97. #[cfg(feature = "flowy_unit_test")]
  98. impl RevisionManager {
  99. pub fn revision_cache(&self) -> Arc<RevisionCache> { self.cache.clone() }
  100. }
  101. struct RevisionLoader {
  102. doc_id: String,
  103. user_id: String,
  104. server: Arc<dyn RevisionServer>,
  105. cache: Arc<RevisionCache>,
  106. }
  107. impl RevisionLoader {
  108. async fn load(&self) -> Result<Vec<Revision>, FlowyError> {
  109. let records = self.cache.disk_cache.read_revisions(&self.doc_id)?;
  110. let revisions: Vec<Revision>;
  111. if records.is_empty() {
  112. let doc = self.server.fetch_document(&self.doc_id).await?;
  113. let delta_data = Bytes::from(doc.text.clone());
  114. let doc_md5 = md5(&delta_data);
  115. let revision = Revision::new(
  116. &doc.id,
  117. doc.base_rev_id,
  118. doc.rev_id,
  119. delta_data,
  120. RevType::Remote,
  121. &self.user_id,
  122. doc_md5,
  123. );
  124. let _ = self.cache.add_local_revision(revision.clone()).await?;
  125. revisions = vec![revision];
  126. } else {
  127. for record in &records {
  128. match record.state {
  129. RevState::StateLocal => match self.cache.add_local_revision(record.revision.clone()).await {
  130. Ok(_) => {},
  131. Err(e) => tracing::error!("{}", e),
  132. },
  133. RevState::Ack => {},
  134. }
  135. }
  136. revisions = records.into_iter().map(|record| record.revision).collect::<_>();
  137. }
  138. Ok(revisions)
  139. }
  140. }
  141. fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<DocumentInfo> {
  142. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  143. let mut delta = RichTextDelta::new();
  144. for (_, revision) in revisions.into_iter().enumerate() {
  145. match RichTextDelta::from_bytes(revision.delta_data) {
  146. Ok(local_delta) => {
  147. delta = delta.compose(&local_delta)?;
  148. },
  149. Err(e) => {
  150. tracing::error!("Deserialize delta from revision failed: {}", e);
  151. },
  152. }
  153. }
  154. correct_delta_if_need(&mut delta);
  155. Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
  156. id: doc_id.to_owned(),
  157. text: delta.to_json(),
  158. rev_id,
  159. base_rev_id,
  160. })
  161. }
  162. fn correct_delta_if_need(delta: &mut RichTextDelta) {
  163. if delta.ops.last().is_none() {
  164. return;
  165. }
  166. let data = delta.ops.last().as_ref().unwrap().get_data();
  167. if !data.ends_with('\n') {
  168. log::error!("❌The op must end with newline. Correcting it by inserting newline op");
  169. delta.ops.push(Operation::Insert("\n".into()));
  170. }
  171. }