rev_persistence.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. use crate::cache::memory::RevisionMemoryCacheDelegate;
  2. use crate::memory::RevisionMemoryCache;
  3. use crate::RevisionMergeable;
  4. use flowy_error::{internal_error, FlowyError, FlowyResult};
  5. use flowy_revision_persistence::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord};
  6. use revision_model::{Revision, RevisionRange};
  7. use std::collections::{HashMap, VecDeque};
  8. use std::{borrow::Cow, sync::Arc};
  9. use tokio::sync::RwLock;
  10. use tokio::task::spawn_blocking;
/// Interval, in milliseconds, between flushes of in-memory revision records to
/// the disk cache.
pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
#[derive(Clone)]
pub struct RevisionPersistenceConfiguration {
    /// If the number of revisions that didn't sync to the server is greater than
    /// `max_merge_len`, then these revisions will be merged into one revision.
    max_merge_len: usize,
    /// Indicates that the revisions that didn't sync to the server can be merged
    /// into one when `merge_lagging_revisions` gets called.
    merge_lagging: bool,
}
  21. impl RevisionPersistenceConfiguration {
  22. pub fn new(merge_max_length: usize, merge_lagging: bool) -> Self {
  23. debug_assert!(merge_max_length > 1);
  24. if merge_max_length > 1 {
  25. Self {
  26. max_merge_len: merge_max_length,
  27. merge_lagging,
  28. }
  29. } else {
  30. Self {
  31. max_merge_len: 100,
  32. merge_lagging,
  33. }
  34. }
  35. }
  36. }
/// Defaults to a merge window of 100 revisions with lagging-merge disabled.
impl std::default::Default for RevisionPersistenceConfiguration {
    fn default() -> Self {
        Self {
            max_merge_len: 100,
            merge_lagging: false,
        }
    }
}
/// Represents the persistence of revisions, backed by a memory and a disk cache.
/// The generic parameter, `Connection`, represents the disk backend's connection;
/// if the backend is SQLite, then `Connection` is the SQLite connection type.
pub struct RevisionPersistence<Connection> {
    /// Identifier of the object whose revisions are persisted.
    object_id: String,
    /// Durable store for revision records.
    disk_cache: Arc<dyn RevisionDiskCache<Connection, Error = FlowyError>>,
    /// Cache sitting in front of `disk_cache`.
    memory_cache: Arc<RevisionMemoryCache>,
    /// Ordered queue of rev_ids still waiting to be synced to the server.
    sync_seq: RwLock<DeferSyncSequence>,
    configuration: RevisionPersistenceConfiguration,
}
impl<Connection> RevisionPersistence<Connection>
where
    Connection: 'static,
{
    /// Builds a persistence layer from a concrete disk-cache implementation.
    pub fn new<C>(
        object_id: &str,
        disk_cache: C,
        configuration: RevisionPersistenceConfiguration,
    ) -> RevisionPersistence<Connection>
    where
        C: 'static + RevisionDiskCache<Connection, Error = FlowyError>,
    {
        let disk_cache =
            Arc::new(disk_cache) as Arc<dyn RevisionDiskCache<Connection, Error = FlowyError>>;
        Self::from_disk_cache(object_id, disk_cache, configuration)
    }

    /// Builds a persistence layer from an already type-erased disk cache.
    pub fn from_disk_cache(
        object_id: &str,
        disk_cache: Arc<dyn RevisionDiskCache<Connection, Error = FlowyError>>,
        configuration: RevisionPersistenceConfiguration,
    ) -> RevisionPersistence<Connection> {
        let object_id = object_id.to_owned();
        let sync_seq = RwLock::new(DeferSyncSequence::new());
        // The memory cache writes back through the `RevisionMemoryCacheDelegate`
        // implemented for `Arc<dyn RevisionDiskCache<..>>` below, hence the extra
        // `Arc::new` around the cloned `Arc`.
        let memory_cache = Arc::new(RevisionMemoryCache::new(
            &object_id,
            Arc::new(disk_cache.clone()),
        ));
        Self {
            object_id,
            disk_cache,
            memory_cache,
            sync_seq,
            configuration,
        }
    }

    /// Save the revision that comes from remote to disk, marked as already acked.
    #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, object_id=%self.object_id), err)]
    pub(crate) async fn add_ack_revision(&self, revision: &Revision) -> FlowyResult<()> {
        tracing::Span::current().record("rev_id", revision.rev_id);
        self.add(revision.clone(), RevisionState::Ack, true).await
    }

    /// Merges all pending mergeable local revisions into one and replaces them on
    /// disk. No-op unless the configuration has `merge_lagging` enabled.
    #[tracing::instrument(level = "trace", skip_all, err)]
    pub async fn merge_lagging_revisions<'a>(
        &'a self,
        rev_compress: &Arc<dyn RevisionMergeable + 'a>,
    ) -> FlowyResult<()> {
        if !self.configuration.merge_lagging {
            return Ok(());
        }
        let mut sync_seq = self.sync_seq.write().await;
        let compact_seq = sync_seq.pop_merge_rev_ids();
        if !compact_seq.is_empty() {
            let range = RevisionRange {
                start: *compact_seq.front().unwrap(),
                end: *compact_seq.back().unwrap(),
            };
            let revisions = self.revisions_in_range(&range).await?;
            let rev_ids = range.to_rev_ids();
            debug_assert_eq!(range.len() as usize, revisions.len());
            // compact multiple revisions into one
            let merged_revision = rev_compress.merge_revisions(&self.object_id, revisions)?;
            // NOTE(review): this span was built with `skip_all` and declares no
            // `rev_id` field, so this `record` call may be a no-op — confirm.
            tracing::Span::current().record("rev_id", merged_revision.rev_id);
            let record = SyncRecord {
                revision: merged_revision,
                state: RevisionState::Sync,
                write_to_disk: true,
            };
            // Atomically drop the merged-away revisions and insert the new one.
            self
                .disk_cache
                .delete_and_insert_records(&self.object_id, Some(rev_ids), vec![record])?;
        }
        Ok(())
    }

    /// Re-enqueues each record whose state is `RevisionState::Sync` so that it
    /// will be synced to the remote again. Records in other states are ignored.
    pub(crate) async fn sync_revision_records(&self, records: &[SyncRecord]) -> FlowyResult<()> {
        let mut sync_seq = self.sync_seq.write().await;
        for record in records {
            if record.state == RevisionState::Sync {
                // `write_to_disk = false`: the record is already persisted.
                self
                    .add(record.revision.clone(), RevisionState::Sync, false)
                    .await?;
                sync_seq.recv(record.revision.rev_id)?; // Sync the records if their state is RevisionState::Sync.
            }
        }
        Ok(())
    }

    /// Save the revision to disk and append it to the end of the sync sequence.
    /// The returned rev_id differs from the passed-in revision's rev_id when
    /// multiple revisions are merged into one.
    #[tracing::instrument(level = "trace", skip_all, fields(rev_id, compact_range, object_id=%self.object_id), err)]
    pub(crate) async fn add_local_revision<'a>(
        &'a self,
        new_revision: Revision,
        rev_compress: &Arc<dyn RevisionMergeable + 'a>,
    ) -> FlowyResult<i64> {
        let mut sync_seq = self.sync_seq.write().await;
        // Before the new_revision is pushed into the sync_seq, check whether the
        // current `merge_length` of the sync_seq has reached `max_merge_len - 1`.
        // If so, the pending revisions need to be merged with the new_revision
        // into one revision.
        let mut merge_rev_ids = VecDeque::default();
        if sync_seq.merge_length >= self.configuration.max_merge_len - 1 {
            merge_rev_ids.extend(sync_seq.pop_merge_rev_ids());
        }
        if !merge_rev_ids.is_empty() {
            let range = RevisionRange {
                start: *merge_rev_ids.front().unwrap(),
                end: *merge_rev_ids.back().unwrap(),
            };
            tracing::Span::current().record("compact_range", format!("{}", range).as_str());
            let mut revisions = self.revisions_in_range(&range).await?;
            debug_assert_eq!(range.len() as usize, revisions.len());
            // append the new revision
            revisions.push(new_revision);
            // compact multiple revisions into one
            let merged_revision = rev_compress.merge_revisions(&self.object_id, revisions)?;
            let new_rev_id = merged_revision.rev_id;
            tracing::Span::current().record("rev_id", merged_revision.rev_id);
            sync_seq.recv(new_rev_id)?;
            // replace the revisions in range with the compacted revision
            self.compact(&range, merged_revision).await?;
            Ok(new_rev_id)
        } else {
            let rev_id = new_revision.rev_id;
            tracing::Span::current().record("rev_id", rev_id);
            self.add(new_revision, RevisionState::Sync, true).await?;
            sync_seq.merge_recv(rev_id)?;
            Ok(rev_id)
        }
    }

    /// Remove the revision with rev_id from the sync sequence and ack it in the
    /// memory cache. An ack rejection (rev_id not at the queue front) is ignored.
    pub(crate) async fn ack_revision(&self, rev_id: i64) -> FlowyResult<()> {
        if self.sync_seq.write().await.ack(&rev_id).is_ok() {
            self.memory_cache.ack(&rev_id).await;
        }
        Ok(())
    }

    /// Returns the next revision waiting to be synced, if any.
    pub(crate) async fn next_sync_revision(&self) -> FlowyResult<Option<Revision>> {
        match self.sync_seq.read().await.next_rev_id() {
            None => Ok(None),
            Some(rev_id) => Ok(self.get(rev_id).await.map(|record| record.revision)),
        }
    }

    /// Returns the rev_id of the next revision waiting to be synced, if any.
    pub(crate) async fn next_sync_rev_id(&self) -> Option<i64> {
        self.sync_seq.read().await.next_rev_id()
    }

    /// Number of records the memory cache currently tracks as `Sync`.
    pub(crate) fn number_of_sync_records(&self) -> usize {
        self.memory_cache.number_of_sync_records()
    }

    /// Number of revision records in the disk cache; a read failure is logged
    /// and reported as 0.
    pub(crate) fn number_of_records_in_disk(&self) -> usize {
        match self.disk_cache.read_revision_records(&self.object_id, None) {
            Ok(records) => records.len(),
            Err(e) => {
                tracing::error!("Read revision records failed: {:?}", e);
                0
            },
        }
    }

    /// The cache gets reset when it conflicts with the remote revisions: every
    /// stored record is replaced by `revisions` and the sync queue is cleared.
    #[tracing::instrument(level = "trace", skip(self, revisions), err)]
    pub(crate) async fn reset(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
        let records = revisions
            .into_iter()
            .map(|revision| SyncRecord {
                revision,
                state: RevisionState::Sync,
                write_to_disk: false,
            })
            .collect::<Vec<_>>();
        self
            .disk_cache
            .delete_and_insert_records(&self.object_id, None, records.clone())?;
        self.memory_cache.reset_with_revisions(records).await;
        self.sync_seq.write().await.clear();
        Ok(())
    }

    /// Inserts one revision record into the memory cache. Duplicate rev_ids are
    /// logged and skipped; `write_to_disk` flags the record for the next flush.
    async fn add(
        &self,
        revision: Revision,
        state: RevisionState,
        write_to_disk: bool,
    ) -> FlowyResult<()> {
        if self.memory_cache.contains(&revision.rev_id) {
            tracing::warn!(
                "Duplicate revision: {}:{}-{:?}",
                self.object_id,
                revision.rev_id,
                state
            );
            return Ok(());
        }
        let record = SyncRecord {
            revision,
            state,
            write_to_disk,
        };
        self.memory_cache.add(Cow::Owned(record)).await;
        Ok(())
    }

    /// Replaces every revision in `range` — in both caches — with `new_revision`.
    async fn compact(&self, range: &RevisionRange, new_revision: Revision) -> FlowyResult<()> {
        self.memory_cache.remove_with_range(range);
        let rev_ids = range.to_rev_ids();
        self
            .disk_cache
            .delete_revision_records(&self.object_id, Some(rev_ids))?;
        self.add(new_revision, RevisionState::Sync, true).await?;
        Ok(())
    }

    /// Looks up one record, first in the memory cache, then on disk.
    /// Disk errors are logged and reported as `None`.
    pub async fn get(&self, rev_id: i64) -> Option<SyncRecord> {
        match self.memory_cache.get(&rev_id).await {
            None => match self
                .disk_cache
                .read_revision_records(&self.object_id, Some(vec![rev_id]))
            {
                Ok(mut records) => {
                    let record = records.pop()?;
                    // The query asked for a single rev_id, so at most one record
                    // is expected back.
                    assert!(records.is_empty());
                    Some(record)
                },
                Err(e) => {
                    tracing::error!("{}", e);
                    None
                },
            },
            Some(revision) => Some(revision),
        }
    }

    /// Loads all records for `object_id` from disk, keeping only the first
    /// occurrence of each rev_id (disk read order preserved).
    pub fn load_all_records(&self, object_id: &str) -> FlowyResult<Vec<SyncRecord>> {
        let mut record_ids = HashMap::new();
        let mut records = vec![];
        for record in self.disk_cache.read_revision_records(object_id, None)? {
            let rev_id = record.revision.rev_id;
            if record_ids.get(&rev_id).is_none() {
                records.push(record);
            }
            record_ids.insert(rev_id, rev_id);
        }
        Ok(records)
    }

    /// Read the revisions with rev_id >= range.start && rev_id <= range.end.
    /// Falls back to the disk cache (on a blocking worker thread) when the
    /// memory cache does not hold the full range; a short read from disk is
    /// logged but not treated as an error.
    pub async fn revisions_in_range(&self, range: &RevisionRange) -> FlowyResult<Vec<Revision>> {
        let range = range.clone();
        let mut records = self.memory_cache.get_with_range(&range).await?;
        let range_len = range.len() as usize;
        if records.len() != range_len {
            let disk_cache = self.disk_cache.clone();
            let object_id = self.object_id.clone();
            records =
                spawn_blocking(move || disk_cache.read_revision_records_with_range(&object_id, &range))
                    .await
                    .map_err(internal_error)??;
            if records.len() != range_len {
                tracing::error!(
                    "Expect revision len {},but receive {}",
                    range_len,
                    records.len()
                );
            }
        }
        Ok(
            records
                .into_iter()
                .map(|record| record.revision)
                .collect::<Vec<Revision>>(),
        )
    }

    /// Deletes every revision in `range` from the disk cache only; the memory
    /// cache is left untouched.
    pub fn delete_revisions_from_range(&self, range: RevisionRange) -> FlowyResult<()> {
        self
            .disk_cache
            .delete_revision_records(&self.object_id, Some(range.to_rev_ids()))?;
        Ok(())
    }
}
  329. impl<C> RevisionMemoryCacheDelegate for Arc<dyn RevisionDiskCache<C, Error = FlowyError>> {
  330. fn send_sync(&self, mut records: Vec<SyncRecord>) -> FlowyResult<()> {
  331. records.retain(|record| record.write_to_disk);
  332. if !records.is_empty() {
  333. tracing::Span::current().record(
  334. "checkpoint_result",
  335. format!("{} records were saved", records.len()).as_str(),
  336. );
  337. self.create_revision_records(records)?;
  338. }
  339. Ok(())
  340. }
  341. fn receive_ack(&self, object_id: &str, rev_id: i64) {
  342. let changeset = RevisionChangeset {
  343. object_id: object_id.to_string(),
  344. rev_id,
  345. state: RevisionState::Ack,
  346. };
  347. match self.update_revision_record(vec![changeset]) {
  348. Ok(_) => {},
  349. Err(e) => tracing::error!("{}", e),
  350. }
  351. }
  352. }
/// FIFO of revision ids still waiting to be synced to the server, plus
/// bookkeeping for which suffix of the queue is eligible for merging.
#[derive(Default)]
struct DeferSyncSequence {
    /// Pending rev_ids, oldest first; the front is the one currently syncing.
    rev_ids: VecDeque<i64>,
    /// Index into `rev_ids` where the mergeable run begins, if any.
    merge_start: Option<usize>,
    /// Number of rev_ids currently counted as mergeable.
    merge_length: usize,
}
impl DeferSyncSequence {
    fn new() -> Self {
        DeferSyncSequence::default()
    }

    /// Pushes the new_rev_id to the end of the list and marks it as mergeable.
    ///
    /// NOTE(review): `merge_length` is incremented even when the inner `recv`
    /// silently drops an out-of-order id, which could overcount — confirm that
    /// callers only ever pass strictly increasing rev_ids here.
    fn merge_recv(&mut self, new_rev_id: i64) -> FlowyResult<()> {
        self.recv(new_rev_id)?;
        self.merge_length += 1;
        if self.merge_start.is_none() && !self.rev_ids.is_empty() {
            self.merge_start = Some(self.rev_ids.len() - 1);
        }
        Ok(())
    }

    /// Pushes the new_rev_id to the end of the list.
    fn recv(&mut self, new_rev_id: i64) -> FlowyResult<()> {
        // The new revision's rev_id must be greater than the last one in the
        // queue; an out-of-order id is logged and dropped without raising an
        // error.
        if let Some(rev_id) = self.rev_ids.back() {
            if *rev_id >= new_rev_id {
                tracing::error!("The new revision's id must be greater than {}", rev_id);
                return Ok(());
            }
        }
        self.rev_ids.push_back(new_rev_id);
        Ok(())
    }

    /// Removes the rev_id from the front of the list.
    ///
    /// Errors if `rev_id` is not the revision currently at the front (i.e. the
    /// one being synced). On success, the mergeable-run counter shrinks when the
    /// popped id falls at or after the run's first id.
    ///
    /// NOTE(review): `merge_start` is an index and is not shifted down after
    /// `pop_front`, so it may drift relative to the remaining elements — confirm
    /// against how `pop_merge_rev_ids` is used.
    fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> {
        let cur_rev_id = self.rev_ids.front().cloned();
        if let Some(pop_rev_id) = cur_rev_id {
            if &pop_rev_id != rev_id {
                let desc = format!(
                    "The ack rev_id:{} is not equal to the current rev_id:{}",
                    rev_id, pop_rev_id
                );
                return Err(FlowyError::internal().context(desc));
            }
            // Remember the first id of the mergeable run (if any) so we can tell
            // whether the popped id belongs to it.
            let mut compact_rev_id = None;
            if let Some(compact_index) = self.merge_start {
                compact_rev_id = self.rev_ids.get(compact_index).cloned();
            }
            let pop_rev_id = self.rev_ids.pop_front();
            if let (Some(compact_rev_id), Some(pop_rev_id)) = (compact_rev_id, pop_rev_id) {
                if compact_rev_id <= pop_rev_id && self.merge_length > 0 {
                    self.merge_length -= 1;
                }
            }
        }
        Ok(())
    }

    /// Returns the rev_id at the front of the queue (the next one to sync).
    fn next_rev_id(&self) -> Option<i64> {
        self.rev_ids.front().cloned()
    }

    /// Drops all queued ids and resets the merge bookkeeping.
    fn clear(&mut self) {
        self.merge_start = None;
        self.merge_length = 0;
        self.rev_ids.clear();
    }

    /// Removes and returns the mergeable suffix of the queue (everything from
    /// `merge_start` onward), leaving the earlier ids — including the one
    /// currently syncing — in place, and resets the merge bookkeeping.
    fn pop_merge_rev_ids(&mut self) -> VecDeque<i64> {
        let mut compact_seq = VecDeque::with_capacity(self.rev_ids.len());
        if let Some(start) = self.merge_start {
            if start < self.rev_ids.len() {
                let seq = self.rev_ids.split_off(start);
                compact_seq.extend(seq);
            }
        }
        self.merge_start = None;
        self.merge_length = 0;
        compact_seq
    }
}