cache.rs 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. use crate::{
  2. errors::FlowyError,
  3. services::doc::revision::cache::{
  4. disk::{Persistence, RevisionDiskCache},
  5. memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate},
  6. },
  7. sql_tables::{RevTableState, RevisionChangeset},
  8. };
  9. use dashmap::DashMap;
  10. use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState};
  11. use flowy_database::ConnectionPool;
  12. use flowy_error::{internal_error, FlowyResult};
  13. use lib_infra::future::FutureResult;
  14. use lib_ot::errors::OTError;
  15. use std::{
  16. collections::VecDeque,
  17. sync::{
  18. atomic::{AtomicI64, Ordering::SeqCst},
  19. Arc,
  20. },
  21. };
  22. use tokio::{sync::RwLock, task::spawn_blocking};
  23. pub struct RevisionCache {
  24. doc_id: String,
  25. disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>,
  26. memory_cache: Arc<RevisionMemoryCache>,
  27. sync_seq: Arc<RevisionSyncSeq>,
  28. latest_rev_id: AtomicI64,
  29. }
  30. impl RevisionCache {
  31. pub fn new(user_id: &str, doc_id: &str, pool: Arc<ConnectionPool>) -> RevisionCache {
  32. let disk_cache = Arc::new(Persistence::new(user_id, pool));
  33. let memory_cache = Arc::new(RevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone())));
  34. let sync_seq = Arc::new(RevisionSyncSeq::new());
  35. let doc_id = doc_id.to_owned();
  36. Self {
  37. doc_id,
  38. disk_cache,
  39. memory_cache,
  40. sync_seq,
  41. latest_rev_id: AtomicI64::new(0),
  42. }
  43. }
  44. pub fn read_revisions(&self, doc_id: &str) -> FlowyResult<Vec<RevisionRecord>> {
  45. self.disk_cache.read_revisions(doc_id, None)
  46. }
  47. #[tracing::instrument(level = "debug", skip(self, doc_id, revisions))]
  48. pub fn reset_document(&self, doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<()> {
  49. let disk_cache = self.disk_cache.clone();
  50. let conn = disk_cache.db_pool().get().map_err(internal_error)?;
  51. let records = revisions
  52. .into_iter()
  53. .map(|revision| RevisionRecord {
  54. revision,
  55. state: RevisionState::StateLocal,
  56. })
  57. .collect::<Vec<_>>();
  58. conn.immediate_transaction::<_, FlowyError, _>(|| {
  59. let _ = disk_cache.delete_revisions(doc_id, None, &*conn)?;
  60. let _ = disk_cache.write_revisions(records, &*conn)?;
  61. Ok(())
  62. })
  63. }
  64. #[tracing::instrument(level = "debug", skip(self, revision))]
  65. pub async fn add_local_revision(&self, revision: Revision) -> FlowyResult<()> {
  66. if self.memory_cache.contains(&revision.rev_id) {
  67. return Err(FlowyError::internal().context(format!("Duplicate local revision id: {}", revision.rev_id)));
  68. }
  69. let rev_id = revision.rev_id;
  70. let record = RevisionRecord {
  71. revision,
  72. state: RevisionState::StateLocal,
  73. };
  74. let _ = self.memory_cache.add_revision(&record).await;
  75. self.sync_seq.add_revision(record).await?;
  76. let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
  77. Ok(())
  78. }
  79. #[tracing::instrument(level = "debug", skip(self, revision))]
  80. pub async fn add_remote_revision(&self, revision: Revision) -> FlowyResult<()> {
  81. if self.memory_cache.contains(&revision.rev_id) {
  82. return Err(FlowyError::internal().context(format!("Duplicate remote revision id: {}", revision.rev_id)));
  83. }
  84. let rev_id = revision.rev_id;
  85. let record = RevisionRecord {
  86. revision,
  87. state: RevisionState::Ack,
  88. };
  89. self.memory_cache.add_revision(&record).await;
  90. let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
  91. Ok(())
  92. }
  93. #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id))]
  94. pub async fn ack_revision(&self, rev_id: i64) {
  95. if self.sync_seq.ack_revision(&rev_id).await.is_ok() {
  96. self.memory_cache.ack_revision(&rev_id).await;
  97. }
  98. }
  99. pub async fn latest_revision(&self) -> Revision {
  100. let rev_id = self.latest_rev_id.load(SeqCst);
  101. self.get_revision(rev_id).await.unwrap().revision
  102. }
  103. pub async fn get_revision(&self, rev_id: i64) -> Option<RevisionRecord> {
  104. match self.memory_cache.get_revision(&rev_id).await {
  105. None => match self.disk_cache.read_revisions(&self.doc_id, Some(vec![rev_id])) {
  106. Ok(mut records) => {
  107. if records.is_empty() {
  108. tracing::warn!("Can't find revision in {} with rev_id: {}", &self.doc_id, rev_id);
  109. }
  110. assert_eq!(records.len(), 1);
  111. records.pop()
  112. },
  113. Err(e) => {
  114. tracing::error!("{}", e);
  115. None
  116. },
  117. },
  118. Some(revision) => Some(revision),
  119. }
  120. }
  121. pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult<Vec<Revision>> {
  122. let mut records = self.memory_cache.get_revisions_in_range(&range).await?;
  123. let range_len = range.len() as usize;
  124. if records.len() != range_len {
  125. let disk_cache = self.disk_cache.clone();
  126. let doc_id = self.doc_id.clone();
  127. records = spawn_blocking(move || disk_cache.read_revisions_with_range(&doc_id, &range))
  128. .await
  129. .map_err(internal_error)??;
  130. if records.len() != range_len {
  131. log::error!("Revisions len is not equal to range required");
  132. }
  133. }
  134. Ok(records
  135. .into_iter()
  136. .map(|record| record.revision)
  137. .collect::<Vec<Revision>>())
  138. }
  139. pub(crate) fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
  140. let sync_seq = self.sync_seq.clone();
  141. let disk_cache = self.disk_cache.clone();
  142. let doc_id = self.doc_id.clone();
  143. FutureResult::new(async move {
  144. match sync_seq.next_sync_revision().await {
  145. None => match sync_seq.next_sync_rev_id().await {
  146. None => Ok(None),
  147. Some(rev_id) => {
  148. let records = disk_cache.read_revisions(&doc_id, Some(vec![rev_id]))?;
  149. let mut revisions = records
  150. .into_iter()
  151. .map(|record| record.revision)
  152. .collect::<Vec<Revision>>();
  153. Ok(revisions.pop())
  154. },
  155. },
  156. Some((_, record)) => Ok(Some(record.revision)),
  157. }
  158. })
  159. }
  160. }
  161. impl RevisionMemoryCacheDelegate for Arc<Persistence> {
  162. fn receive_checkpoint(&self, records: Vec<RevisionRecord>) -> FlowyResult<()> {
  163. let conn = &*self.pool.get().map_err(internal_error)?;
  164. self.write_revisions(records, &conn)
  165. }
  166. fn receive_ack(&self, doc_id: &str, rev_id: i64) {
  167. let changeset = RevisionChangeset {
  168. doc_id: doc_id.to_string(),
  169. rev_id: rev_id.into(),
  170. state: RevTableState::Acked,
  171. };
  172. match self.update_revisions(vec![changeset]) {
  173. Ok(_) => {},
  174. Err(e) => tracing::error!("{}", e),
  175. }
  176. }
  177. }
  178. #[derive(Clone)]
  179. pub struct RevisionRecord {
  180. pub revision: Revision,
  181. pub state: RevisionState,
  182. }
  183. impl RevisionRecord {
  184. pub fn ack(&mut self) { self.state = RevisionState::Ack; }
  185. }
  186. struct RevisionSyncSeq {
  187. revs_map: Arc<DashMap<i64, RevisionRecord>>,
  188. local_revs: Arc<RwLock<VecDeque<i64>>>,
  189. }
  190. impl std::default::Default for RevisionSyncSeq {
  191. fn default() -> Self {
  192. let local_revs = Arc::new(RwLock::new(VecDeque::new()));
  193. RevisionSyncSeq {
  194. revs_map: Arc::new(DashMap::new()),
  195. local_revs,
  196. }
  197. }
  198. }
  199. impl RevisionSyncSeq {
  200. fn new() -> Self { RevisionSyncSeq::default() }
  201. async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
  202. // The last revision's rev_id must be greater than the new one.
  203. if let Some(rev_id) = self.local_revs.read().await.back() {
  204. if *rev_id >= record.revision.rev_id {
  205. return Err(OTError::revision_id_conflict()
  206. .context(format!("The new revision's id must be greater than {}", rev_id)));
  207. }
  208. }
  209. self.local_revs.write().await.push_back(record.revision.rev_id);
  210. self.revs_map.insert(record.revision.rev_id, record);
  211. Ok(())
  212. }
  213. async fn ack_revision(&self, rev_id: &i64) -> FlowyResult<()> {
  214. if let Some(pop_rev_id) = self.next_sync_rev_id().await {
  215. if &pop_rev_id != rev_id {
  216. let desc = format!(
  217. "The ack rev_id:{} is not equal to the current rev_id:{}",
  218. rev_id, pop_rev_id
  219. );
  220. // tracing::error!("{}", desc);
  221. return Err(FlowyError::internal().context(desc));
  222. }
  223. tracing::debug!("pop revision {}", pop_rev_id);
  224. self.revs_map.remove(&pop_rev_id);
  225. let _ = self.local_revs.write().await.pop_front();
  226. }
  227. Ok(())
  228. }
  229. async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> {
  230. match self.local_revs.read().await.front() {
  231. None => None,
  232. Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
  233. }
  234. }
  235. async fn next_sync_rev_id(&self) -> Option<i64> { self.local_revs.read().await.front().copied() }
  236. }
  237. #[cfg(feature = "flowy_unit_test")]
  238. impl RevisionSyncSeq {
  239. #[allow(dead_code)]
  240. pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { self.revs_map.clone() }
  241. #[allow(dead_code)]
  242. pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { self.local_revs.clone() }
  243. }