manager.rs 9.8 KB


  1. use crate::{
  2. errors::FlowyError,
  3. services::doc::{revision::RevisionCache, RevisionRecord},
  4. };
  5. use bytes::Bytes;
  6. use dashmap::DashMap;
  7. use flowy_collaboration::{
  8. entities::{
  9. doc::DocumentInfo,
  10. revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
  11. },
  12. util::{md5, pair_rev_id_from_revisions, RevIdCounter},
  13. };
  14. use flowy_error::FlowyResult;
  15. use futures_util::{future, stream, stream::StreamExt};
  16. use lib_infra::future::FutureResult;
  17. use lib_ot::{
  18. core::{Operation, OperationTransformable},
  19. errors::OTError,
  20. rich_text::RichTextDelta,
  21. };
  22. use std::{collections::VecDeque, sync::Arc};
  23. use tokio::sync::RwLock;
  24. pub trait RevisionServer: Send + Sync {
  25. fn fetch_document(&self, doc_id: &str) -> FutureResult<DocumentInfo, FlowyError>;
  26. }
  27. pub struct RevisionManager {
  28. pub(crate) doc_id: String,
  29. user_id: String,
  30. rev_id_counter: RevIdCounter,
  31. cache: Arc<RevisionCache>,
  32. sync_seq: Arc<RevisionSyncSequence>,
  33. }
  34. impl RevisionManager {
  35. pub fn new(user_id: &str, doc_id: &str, cache: Arc<RevisionCache>) -> Self {
  36. let rev_id_counter = RevIdCounter::new(0);
  37. let sync_seq = Arc::new(RevisionSyncSequence::new());
  38. Self {
  39. doc_id: doc_id.to_string(),
  40. user_id: user_id.to_owned(),
  41. rev_id_counter,
  42. cache,
  43. sync_seq,
  44. }
  45. }
  46. pub async fn load_document(&mut self, server: Arc<dyn RevisionServer>) -> FlowyResult<RichTextDelta> {
  47. let revisions = RevisionLoader {
  48. doc_id: self.doc_id.clone(),
  49. user_id: self.user_id.clone(),
  50. server,
  51. cache: self.cache.clone(),
  52. }
  53. .load()
  54. .await?;
  55. let doc = mk_doc_from_revisions(&self.doc_id, revisions)?;
  56. self.rev_id_counter.set(doc.rev_id);
  57. Ok(doc.delta()?)
  58. }
  59. #[tracing::instrument(level = "debug", skip(self, revisions), err)]
  60. pub async fn reset_document(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
  61. let rev_id = pair_rev_id_from_revisions(&revisions).1;
  62. let _ = self.cache.reset_document(&self.doc_id, revisions.into_inner()).await?;
  63. self.rev_id_counter.set(rev_id);
  64. Ok(())
  65. }
  66. #[tracing::instrument(level = "debug", skip(self, revision), err)]
  67. pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  68. if revision.delta_data.is_empty() {
  69. return Err(FlowyError::internal().context("Delta data should be empty"));
  70. }
  71. self.rev_id_counter.set(revision.rev_id);
  72. let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?;
  73. Ok(())
  74. }
  75. #[tracing::instrument(level = "debug", skip(self, revision))]
  76. pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  77. if revision.delta_data.is_empty() {
  78. return Err(FlowyError::internal().context("Delta data should be empty"));
  79. }
  80. let record = self.cache.add(revision.clone(), RevisionState::Local, true).await?;
  81. self.sync_seq.add_revision(record).await?;
  82. Ok(())
  83. }
  84. #[tracing::instrument(level = "debug", skip(self), err)]
  85. pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
  86. if self.sync_seq.ack(&rev_id).await.is_ok() {
  87. self.cache.ack(rev_id).await;
  88. }
  89. Ok(())
  90. }
  91. pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
  92. pub fn set_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
  93. pub fn next_rev_id_pair(&self) -> (i64, i64) {
  94. let cur = self.rev_id_counter.value();
  95. let next = self.rev_id_counter.next();
  96. (cur, next)
  97. }
  98. pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
  99. debug_assert!(range.doc_id == self.doc_id);
  100. let revisions = self.cache.revisions_in_range(range.clone()).await?;
  101. Ok(revisions)
  102. }
  103. pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
  104. let sync_seq = self.sync_seq.clone();
  105. let cache = self.cache.clone();
  106. FutureResult::new(async move {
  107. match sync_seq.next_sync_revision().await {
  108. None => match sync_seq.next_sync_rev_id().await {
  109. None => Ok(None),
  110. Some(rev_id) => Ok(cache.get(rev_id).await.map(|record| record.revision)),
  111. },
  112. Some((_, record)) => Ok(Some(record.revision)),
  113. }
  114. })
  115. }
  116. pub async fn latest_revision(&self) -> Revision { self.cache.latest_revision().await }
  117. pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
  118. self.cache.get(rev_id).await.map(|record| record.revision)
  119. }
  120. }
  121. struct RevisionSyncSequence {
  122. revs_map: Arc<DashMap<i64, RevisionRecord>>,
  123. local_revs: Arc<RwLock<VecDeque<i64>>>,
  124. }
  125. impl std::default::Default for RevisionSyncSequence {
  126. fn default() -> Self {
  127. let local_revs = Arc::new(RwLock::new(VecDeque::new()));
  128. RevisionSyncSequence {
  129. revs_map: Arc::new(DashMap::new()),
  130. local_revs,
  131. }
  132. }
  133. }
  134. impl RevisionSyncSequence {
  135. fn new() -> Self { RevisionSyncSequence::default() }
  136. async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
  137. // The last revision's rev_id must be greater than the new one.
  138. if let Some(rev_id) = self.local_revs.read().await.back() {
  139. if *rev_id >= record.revision.rev_id {
  140. return Err(OTError::revision_id_conflict()
  141. .context(format!("The new revision's id must be greater than {}", rev_id)));
  142. }
  143. }
  144. self.local_revs.write().await.push_back(record.revision.rev_id);
  145. self.revs_map.insert(record.revision.rev_id, record);
  146. Ok(())
  147. }
  148. async fn ack(&self, rev_id: &i64) -> FlowyResult<()> {
  149. if let Some(pop_rev_id) = self.next_sync_rev_id().await {
  150. if &pop_rev_id != rev_id {
  151. let desc = format!(
  152. "The ack rev_id:{} is not equal to the current rev_id:{}",
  153. rev_id, pop_rev_id
  154. );
  155. // tracing::error!("{}", desc);
  156. return Err(FlowyError::internal().context(desc));
  157. }
  158. tracing::debug!("pop revision {}", pop_rev_id);
  159. self.revs_map.remove(&pop_rev_id);
  160. let _ = self.local_revs.write().await.pop_front();
  161. }
  162. Ok(())
  163. }
  164. async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> {
  165. match self.local_revs.read().await.front() {
  166. None => None,
  167. Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
  168. }
  169. }
  170. async fn next_sync_rev_id(&self) -> Option<i64> { self.local_revs.read().await.front().copied() }
  171. }
  172. struct RevisionLoader {
  173. doc_id: String,
  174. user_id: String,
  175. server: Arc<dyn RevisionServer>,
  176. cache: Arc<RevisionCache>,
  177. }
  178. impl RevisionLoader {
  179. async fn load(&self) -> Result<Vec<Revision>, FlowyError> {
  180. let records = self.cache.batch_get(&self.doc_id)?;
  181. let revisions: Vec<Revision>;
  182. if records.is_empty() {
  183. let doc = self.server.fetch_document(&self.doc_id).await?;
  184. let delta_data = Bytes::from(doc.text.clone());
  185. let doc_md5 = md5(&delta_data);
  186. let revision = Revision::new(
  187. &doc.doc_id,
  188. doc.base_rev_id,
  189. doc.rev_id,
  190. delta_data,
  191. &self.user_id,
  192. doc_md5,
  193. );
  194. let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?;
  195. revisions = vec![revision];
  196. } else {
  197. // Sync the records if their state is RevisionState::Local.
  198. stream::iter(records.clone())
  199. .filter(|record| future::ready(record.state == RevisionState::Local))
  200. .for_each(|record| async move {
  201. match self.cache.add(record.revision, record.state, false).await {
  202. Ok(_) => {},
  203. Err(e) => tracing::error!("{}", e),
  204. }
  205. })
  206. .await;
  207. revisions = records.into_iter().map(|record| record.revision).collect::<_>();
  208. }
  209. Ok(revisions)
  210. }
  211. }
  212. fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<DocumentInfo> {
  213. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  214. let mut delta = RichTextDelta::new();
  215. for (_, revision) in revisions.into_iter().enumerate() {
  216. match RichTextDelta::from_bytes(revision.delta_data) {
  217. Ok(local_delta) => {
  218. delta = delta.compose(&local_delta)?;
  219. },
  220. Err(e) => {
  221. tracing::error!("Deserialize delta from revision failed: {}", e);
  222. },
  223. }
  224. }
  225. correct_delta_if_need(&mut delta);
  226. Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
  227. doc_id: doc_id.to_owned(),
  228. text: delta.to_json(),
  229. rev_id,
  230. base_rev_id,
  231. })
  232. }
  233. fn correct_delta_if_need(delta: &mut RichTextDelta) {
  234. if delta.ops.last().is_none() {
  235. return;
  236. }
  237. let data = delta.ops.last().as_ref().unwrap().get_data();
  238. if !data.ends_with('\n') {
  239. log::error!("❌The op must end with newline. Correcting it by inserting newline op");
  240. delta.ops.push(Operation::Insert("\n".into()));
  241. }
  242. }
  /// Accessors compiled only with the `flowy_unit_test` feature; they expose the
  /// internal containers so they can be inspected from outside.
  #[cfg(feature = "flowy_unit_test")]
  impl RevisionSyncSequence {
      /// Returns a shared handle to the map of pending records.
      #[allow(dead_code)]
      pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { Arc::clone(&self.revs_map) }

      /// Returns a shared handle to the queue of pending revision ids.
      #[allow(dead_code)]
      pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { Arc::clone(&self.local_revs) }
  }
  /// Accessor compiled only with the `flowy_unit_test` feature.
  #[cfg(feature = "flowy_unit_test")]
  impl RevisionManager {
      /// Returns a shared handle to the manager's revision cache.
      pub fn revision_cache(&self) -> Arc<RevisionCache> { Arc::clone(&self.cache) }
  }