rev_manager.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. use crate::RevisionCache;
  2. use flowy_collaboration::{
  3. entities::revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
  4. util::{pair_rev_id_from_revisions, RevIdCounter},
  5. };
  6. use flowy_error::{FlowyError, FlowyResult};
  7. use lib_infra::future::FutureResult;
  8. use std::{collections::VecDeque, sync::Arc};
  9. use tokio::sync::RwLock;
  10. pub trait RevisionCloudService: Send + Sync {
  11. fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError>;
  12. }
  13. pub trait RevisionObjectBuilder: Send + Sync {
  14. type Output;
  15. fn build_with_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
  16. }
  17. pub trait RevisionCompact: Send + Sync {
  18. fn compact_revisions(user_id: &str, object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Revision>;
  19. }
  20. pub struct RevisionManager {
  21. pub object_id: String,
  22. user_id: String,
  23. rev_id_counter: RevIdCounter,
  24. cache: Arc<RwLock<RevisionCacheCompact>>,
  25. #[cfg(feature = "flowy_unit_test")]
  26. revision_ack_notifier: tokio::sync::broadcast::Sender<i64>,
  27. }
  28. impl RevisionManager {
  29. pub fn new(user_id: &str, object_id: &str, revision_cache: Arc<RevisionCache>) -> Self {
  30. let rev_id_counter = RevIdCounter::new(0);
  31. let cache = Arc::new(RwLock::new(RevisionCacheCompact::new(
  32. object_id,
  33. user_id,
  34. revision_cache,
  35. )));
  36. #[cfg(feature = "flowy_unit_test")]
  37. let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1);
  38. Self {
  39. object_id: object_id.to_string(),
  40. user_id: user_id.to_owned(),
  41. rev_id_counter,
  42. cache,
  43. #[cfg(feature = "flowy_unit_test")]
  44. revision_ack_notifier,
  45. }
  46. }
  47. pub async fn load<B, C>(&mut self, cloud: Arc<dyn RevisionCloudService>) -> FlowyResult<B::Output>
  48. where
  49. B: RevisionObjectBuilder,
  50. C: RevisionCompact,
  51. {
  52. let (revisions, rev_id) = RevisionLoader {
  53. object_id: self.object_id.clone(),
  54. user_id: self.user_id.clone(),
  55. cloud,
  56. cache: self.cache.clone(),
  57. }
  58. .load::<C>()
  59. .await?;
  60. self.rev_id_counter.set(rev_id);
  61. B::build_with_revisions(&self.object_id, revisions)
  62. }
  63. #[tracing::instrument(level = "debug", skip(self, revisions), err)]
  64. pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
  65. let rev_id = pair_rev_id_from_revisions(&revisions).1;
  66. let write_guard = self.cache.write().await;
  67. let _ = write_guard.reset(revisions.into_inner()).await?;
  68. self.rev_id_counter.set(rev_id);
  69. Ok(())
  70. }
  71. #[tracing::instrument(level = "debug", skip(self, revision), err)]
  72. pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  73. if revision.delta_data.is_empty() {
  74. return Err(FlowyError::internal().context("Delta data should be empty"));
  75. }
  76. let write_guard = self.cache.write().await;
  77. let _ = write_guard.add_ack_revision(revision).await?;
  78. self.rev_id_counter.set(revision.rev_id);
  79. Ok(())
  80. }
  81. #[tracing::instrument(level = "debug", skip(self, revision))]
  82. pub async fn add_local_revision<C>(&self, revision: &Revision) -> Result<(), FlowyError>
  83. where
  84. C: RevisionCompact,
  85. {
  86. if revision.delta_data.is_empty() {
  87. return Err(FlowyError::internal().context("Delta data should be empty"));
  88. }
  89. let mut write_guard = self.cache.write().await;
  90. let rev_id = write_guard.write_sync_revision::<C>(revision).await?;
  91. self.rev_id_counter.set(rev_id);
  92. Ok(())
  93. }
  94. #[tracing::instrument(level = "debug", skip(self), err)]
  95. pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
  96. if self.cache.write().await.ack_revision(rev_id).await.is_ok() {
  97. #[cfg(feature = "flowy_unit_test")]
  98. let _ = self.revision_ack_notifier.send(rev_id);
  99. }
  100. Ok(())
  101. }
  102. pub fn rev_id(&self) -> i64 {
  103. self.rev_id_counter.value()
  104. }
  105. pub fn next_rev_id_pair(&self) -> (i64, i64) {
  106. let cur = self.rev_id_counter.value();
  107. let next = self.rev_id_counter.next();
  108. (cur, next)
  109. }
  110. pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
  111. let revisions = self.cache.read().await.revisions_in_range(&range).await?;
  112. Ok(revisions)
  113. }
  114. pub async fn next_sync_revision(&self) -> FlowyResult<Option<Revision>> {
  115. Ok(self.cache.read().await.next_sync_revision().await?)
  116. }
  117. pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
  118. self.cache.read().await.get(rev_id).await.map(|record| record.revision)
  119. }
  120. }
  #[cfg(feature = "flowy_unit_test")]
  impl RevisionManager {
      /// Returns a shared handle to the underlying revision cache.
      pub async fn revision_cache(&self) -> Arc<RevisionCache> {
          let guard = self.cache.read().await;
          guard.inner.clone()
      }

      /// Subscribes to the ids of revisions that get acknowledged.
      pub fn revision_ack_receiver(&self) -> tokio::sync::broadcast::Receiver<i64> {
          let receiver = self.revision_ack_notifier.subscribe();
          receiver
      }
  }
  130. struct RevisionCacheCompact {
  131. object_id: String,
  132. user_id: String,
  133. inner: Arc<RevisionCache>,
  134. sync_seq: RevisionSyncSequence,
  135. }
  136. impl RevisionCacheCompact {
  137. fn new(object_id: &str, user_id: &str, inner: Arc<RevisionCache>) -> Self {
  138. let sync_seq = RevisionSyncSequence::new();
  139. let object_id = object_id.to_owned();
  140. let user_id = user_id.to_owned();
  141. Self {
  142. object_id,
  143. user_id,
  144. inner,
  145. sync_seq,
  146. }
  147. }
  148. // Call this method to write the revisions that fetch from server to disk.
  149. #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, object_id=%self.object_id), err)]
  150. async fn add_ack_revision(&self, revision: &Revision) -> FlowyResult<()> {
  151. tracing::Span::current().record("rev_id", &revision.rev_id);
  152. self.inner.add(revision.clone(), RevisionState::Ack, true).await
  153. }
  154. // Call this method to sync the revisions that already in local db.
  155. #[tracing::instrument(level = "trace", skip(self), fields(rev_id, object_id=%self.object_id), err)]
  156. async fn add_sync_revision(&mut self, revision: &Revision) -> FlowyResult<()> {
  157. tracing::Span::current().record("rev_id", &revision.rev_id);
  158. self.inner.add(revision.clone(), RevisionState::Sync, false).await?;
  159. self.sync_seq.add(revision.rev_id)?;
  160. Ok(())
  161. }
  162. // Call this method to save the new revisions generated by the user input.
  163. #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, compact_range, object_id=%self.object_id), err)]
  164. async fn write_sync_revision<C>(&mut self, revision: &Revision) -> FlowyResult<i64>
  165. where
  166. C: RevisionCompact,
  167. {
  168. match self.sync_seq.compact() {
  169. None => {
  170. tracing::Span::current().record("rev_id", &revision.rev_id);
  171. self.inner.add(revision.clone(), RevisionState::Sync, true).await?;
  172. self.sync_seq.add(revision.rev_id)?;
  173. Ok(revision.rev_id)
  174. }
  175. Some((range, mut compact_seq)) => {
  176. tracing::Span::current().record("compact_range", &format!("{}", range).as_str());
  177. let mut revisions = self.inner.revisions_in_range(&range).await?;
  178. if range.to_rev_ids().len() != revisions.len() {
  179. debug_assert_eq!(range.to_rev_ids().len(), revisions.len());
  180. }
  181. // append the new revision
  182. revisions.push(revision.clone());
  183. // compact multiple revisions into one
  184. let compact_revision = C::compact_revisions(&self.user_id, &self.object_id, revisions)?;
  185. let rev_id = compact_revision.rev_id;
  186. tracing::Span::current().record("rev_id", &rev_id);
  187. // insert new revision
  188. compact_seq.push_back(rev_id);
  189. // replace the revisions in range with compact revision
  190. self.inner.compact(&range, compact_revision).await?;
  191. debug_assert_eq!(self.sync_seq.len(), compact_seq.len());
  192. self.sync_seq.reset(compact_seq);
  193. Ok(rev_id)
  194. }
  195. }
  196. }
  197. async fn ack_revision(&mut self, rev_id: i64) -> FlowyResult<()> {
  198. if self.sync_seq.ack(&rev_id).is_ok() {
  199. self.inner.ack(rev_id).await;
  200. }
  201. Ok(())
  202. }
  203. async fn next_sync_revision(&self) -> FlowyResult<Option<Revision>> {
  204. if cfg!(feature = "flowy_unit_test") {
  205. match self.sync_seq.next_rev_id() {
  206. None => Ok(None),
  207. Some(rev_id) => Ok(self.inner.get(rev_id).await.map(|record| record.revision)),
  208. }
  209. } else {
  210. Ok(None)
  211. }
  212. }
  213. async fn reset(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
  214. self.inner.reset_with_revisions(&self.object_id, revisions).await?;
  215. Ok(())
  216. }
  217. }
  218. impl std::ops::Deref for RevisionCacheCompact {
  219. type Target = Arc<RevisionCache>;
  220. fn deref(&self) -> &Self::Target {
  221. &self.inner
  222. }
  223. }
  224. #[derive(Default)]
  225. struct RevisionSyncSequence(VecDeque<i64>);
  226. impl RevisionSyncSequence {
  227. fn new() -> Self {
  228. RevisionSyncSequence::default()
  229. }
  230. fn add(&mut self, new_rev_id: i64) -> FlowyResult<()> {
  231. // The last revision's rev_id must be greater than the new one.
  232. if let Some(rev_id) = self.0.back() {
  233. if *rev_id >= new_rev_id {
  234. return Err(
  235. FlowyError::internal().context(format!("The new revision's id must be greater than {}", rev_id))
  236. );
  237. }
  238. }
  239. self.0.push_back(new_rev_id);
  240. Ok(())
  241. }
  242. fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> {
  243. let cur_rev_id = self.0.front().cloned();
  244. if let Some(pop_rev_id) = cur_rev_id {
  245. if &pop_rev_id != rev_id {
  246. let desc = format!(
  247. "The ack rev_id:{} is not equal to the current rev_id:{}",
  248. rev_id, pop_rev_id
  249. );
  250. return Err(FlowyError::internal().context(desc));
  251. }
  252. let _ = self.0.pop_front();
  253. }
  254. Ok(())
  255. }
  256. fn next_rev_id(&self) -> Option<i64> {
  257. self.0.front().cloned()
  258. }
  259. fn reset(&mut self, new_seq: VecDeque<i64>) {
  260. self.0 = new_seq;
  261. }
  262. fn len(&self) -> usize {
  263. self.0.len()
  264. }
  265. // Compact the rev_ids into one except the current synchronizing rev_id.
  266. fn compact(&self) -> Option<(RevisionRange, VecDeque<i64>)> {
  267. self.next_rev_id()?;
  268. let mut new_seq = self.0.clone();
  269. let mut drained = new_seq.drain(1..).collect::<VecDeque<_>>();
  270. let start = drained.pop_front()?;
  271. let end = drained.pop_back().unwrap_or(start);
  272. Some((RevisionRange { start, end }, new_seq))
  273. }
  274. }
  275. struct RevisionLoader {
  276. object_id: String,
  277. user_id: String,
  278. cloud: Arc<dyn RevisionCloudService>,
  279. cache: Arc<RwLock<RevisionCacheCompact>>,
  280. }
  281. impl RevisionLoader {
  282. async fn load<C>(&self) -> Result<(Vec<Revision>, i64), FlowyError>
  283. where
  284. C: RevisionCompact,
  285. {
  286. let records = self.cache.read().await.batch_get(&self.object_id)?;
  287. let revisions: Vec<Revision>;
  288. let mut rev_id = 0;
  289. if records.is_empty() {
  290. let remote_revisions = self.cloud.fetch_object(&self.user_id, &self.object_id).await?;
  291. for revision in &remote_revisions {
  292. rev_id = revision.rev_id;
  293. let _ = self.cache.read().await.add_ack_revision(revision).await?;
  294. }
  295. revisions = remote_revisions;
  296. } else {
  297. for record in &records {
  298. rev_id = record.revision.rev_id;
  299. if record.state == RevisionState::Sync {
  300. // Sync the records if their state is RevisionState::Sync.
  301. let _ = self.cache.write().await.add_sync_revision(&record.revision).await?;
  302. }
  303. }
  304. revisions = records.into_iter().map(|record| record.revision).collect::<_>();
  305. }
  306. if let Some(revision) = revisions.last() {
  307. debug_assert_eq!(rev_id, revision.rev_id);
  308. }
  309. Ok((revisions, rev_id))
  310. }
  311. }