cache.rs 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. use crate::{
  2. errors::{internal_error, DocError, DocResult},
  3. services::doc::revision::RevisionServer,
  4. sql_tables::RevTableSql,
  5. };
  6. use flowy_database::ConnectionPool;
  7. use flowy_document_infra::entities::doc::Doc;
  8. use lib_infra::future::ResultFuture;
  9. use lib_ot::{
  10. core::{Operation, OperationTransformable},
  11. revision::{
  12. RevId,
  13. RevState,
  14. RevType,
  15. Revision,
  16. RevisionDiskCache,
  17. RevisionMemoryCache,
  18. RevisionRange,
  19. RevisionRecord,
  20. },
  21. rich_text::RichTextDelta,
  22. };
  23. use std::{sync::Arc, time::Duration};
  24. use tokio::{
  25. sync::{mpsc, RwLock},
  26. task::{spawn_blocking, JoinHandle},
  27. };
  28. pub trait RevisionIterator: Send + Sync {
  29. fn next(&self) -> ResultFuture<Option<RevisionRecord>, DocError>;
  30. }
  31. type DocRevisionDeskCache = dyn RevisionDiskCache<Error = DocError>;
  32. pub struct RevisionCache {
  33. doc_id: String,
  34. dish_cache: Arc<DocRevisionDeskCache>,
  35. memory_cache: Arc<RevisionMemoryCache>,
  36. defer_save: RwLock<Option<JoinHandle<()>>>,
  37. server: Arc<dyn RevisionServer>,
  38. }
  39. impl RevisionCache {
  40. pub fn new(doc_id: &str, pool: Arc<ConnectionPool>, server: Arc<dyn RevisionServer>) -> RevisionCache {
  41. let doc_id = doc_id.to_owned();
  42. let dish_cache = Arc::new(Persistence::new(pool));
  43. let memory_cache = Arc::new(RevisionMemoryCache::new());
  44. Self {
  45. doc_id,
  46. dish_cache,
  47. memory_cache,
  48. defer_save: RwLock::new(None),
  49. server,
  50. }
  51. }
  52. #[tracing::instrument(level = "debug", skip(self, revision))]
  53. pub async fn add_revision(&self, revision: Revision) -> DocResult<()> {
  54. if self.memory_cache.contains(&revision.rev_id) {
  55. return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
  56. }
  57. self.memory_cache.add_revision(revision.clone()).await?;
  58. self.save_revisions().await;
  59. Ok(())
  60. }
  61. #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id.as_ref()))]
  62. pub async fn ack_revision(&self, rev_id: RevId) {
  63. let rev_id = rev_id.value;
  64. self.memory_cache.mut_revision(&rev_id, |mut rev| rev.value_mut().ack());
  65. self.save_revisions().await;
  66. }
  67. async fn save_revisions(&self) {
  68. if let Some(handler) = self.defer_save.write().await.take() {
  69. handler.abort();
  70. }
  71. if self.memory_cache.is_empty() {
  72. return;
  73. }
  74. let memory_cache = self.memory_cache.clone();
  75. let disk_cache = self.dish_cache.clone();
  76. *self.defer_save.write().await = Some(tokio::spawn(async move {
  77. tokio::time::sleep(Duration::from_millis(300)).await;
  78. let (ids, records) = memory_cache.revisions();
  79. match disk_cache.create_revisions(records) {
  80. Ok(_) => {
  81. memory_cache.remove_revisions(ids);
  82. },
  83. Err(e) => log::error!("Save revision failed: {:?}", e),
  84. }
  85. }));
  86. }
  87. pub async fn revisions_in_range(&self, range: RevisionRange) -> DocResult<Vec<Revision>> {
  88. let revs = self.memory_cache.revisions_in_range(&range).await?;
  89. if revs.len() == range.len() as usize {
  90. Ok(revs)
  91. } else {
  92. let doc_id = self.doc_id.clone();
  93. let disk_cache = self.dish_cache.clone();
  94. spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range))
  95. .await
  96. .map_err(internal_error)?
  97. }
  98. }
  99. pub async fn load_document(&self) -> DocResult<Doc> {
  100. // Loading the document from disk and it will be sync with server.
  101. let result = load_from_disk(&self.doc_id, self.memory_cache.clone(), self.dish_cache.clone()).await;
  102. if result.is_ok() {
  103. return result;
  104. }
  105. // The document doesn't exist in local. Try load from server
  106. let doc = self.server.fetch_document(&self.doc_id).await?;
  107. let delta_data = doc.data.as_bytes();
  108. let revision = Revision::new(
  109. doc.base_rev_id,
  110. doc.rev_id,
  111. delta_data.to_owned(),
  112. &doc.id,
  113. RevType::Remote,
  114. );
  115. let record = RevisionRecord {
  116. revision,
  117. state: RevState::Acked,
  118. };
  119. let _ = self.dish_cache.create_revisions(vec![record])?;
  120. Ok(doc)
  121. }
  122. }
  123. impl RevisionIterator for RevisionCache {
  124. fn next(&self) -> ResultFuture<Option<RevisionRecord>, DocError> {
  125. let memory_cache = self.memory_cache.clone();
  126. let disk_cache = self.dish_cache.clone();
  127. let doc_id = self.doc_id.clone();
  128. ResultFuture::new(async move {
  129. match memory_cache.front_revision().await {
  130. None => {
  131. //
  132. match memory_cache.front_rev_id().await {
  133. None => Ok(None),
  134. Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? {
  135. None => Ok(None),
  136. Some(revision) => Ok(Some(RevisionRecord::new(revision))),
  137. },
  138. }
  139. },
  140. Some((_, record)) => Ok(Some(record)),
  141. }
  142. })
  143. }
  144. }
  145. async fn load_from_disk(
  146. doc_id: &str,
  147. memory_cache: Arc<RevisionMemoryCache>,
  148. disk_cache: Arc<DocRevisionDeskCache>,
  149. ) -> DocResult<Doc> {
  150. let doc_id = doc_id.to_owned();
  151. let (tx, mut rx) = mpsc::channel(2);
  152. let doc = spawn_blocking(move || {
  153. let revisions = disk_cache.read_revisions(&doc_id)?;
  154. if revisions.is_empty() {
  155. return Err(DocError::doc_not_found().context("Local doesn't have this document"));
  156. }
  157. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  158. let mut delta = RichTextDelta::new();
  159. for (_, revision) in revisions.into_iter().enumerate() {
  160. // Opti: revision's clone may cause memory issues
  161. match RichTextDelta::from_bytes(revision.clone().delta_data) {
  162. Ok(local_delta) => {
  163. delta = delta.compose(&local_delta)?;
  164. match tx.blocking_send(revision) {
  165. Ok(_) => {},
  166. Err(e) => log::error!("Load document from disk error: {}", e),
  167. }
  168. },
  169. Err(e) => {
  170. log::error!("Deserialize delta from revision failed: {}", e);
  171. },
  172. }
  173. }
  174. correct_delta_if_need(&mut delta);
  175. Result::<Doc, DocError>::Ok(Doc {
  176. id: doc_id,
  177. data: delta.to_json(),
  178. rev_id,
  179. base_rev_id,
  180. })
  181. })
  182. .await
  183. .map_err(internal_error)?;
  184. while let Some(revision) = rx.recv().await {
  185. match memory_cache.add_revision(revision).await {
  186. Ok(_) => {},
  187. Err(e) => log::error!("{:?}", e),
  188. }
  189. }
  190. doc
  191. }
  192. fn correct_delta_if_need(delta: &mut RichTextDelta) {
  193. if delta.ops.last().is_none() {
  194. return;
  195. }
  196. let data = delta.ops.last().as_ref().unwrap().get_data();
  197. if !data.ends_with('\n') {
  198. log::error!("The op must end with newline. Correcting it by inserting newline op");
  199. delta.ops.push(Operation::Insert("\n".into()));
  200. }
  201. }
  202. pub(crate) struct Persistence {
  203. pub(crate) pool: Arc<ConnectionPool>,
  204. }
  205. impl RevisionDiskCache for Persistence {
  206. type Error = DocError;
  207. fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error> {
  208. let conn = &*self.pool.get().map_err(internal_error)?;
  209. conn.immediate_transaction::<_, DocError, _>(|| {
  210. let _ = RevTableSql::create_rev_table(revisions, conn)?;
  211. Ok(())
  212. })
  213. }
  214. fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<Revision>, Self::Error> {
  215. let conn = &*self.pool.get().map_err(internal_error).unwrap();
  216. let revisions = RevTableSql::read_rev_tables_with_range(doc_id, range.clone(), conn)?;
  217. Ok(revisions)
  218. }
  219. fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<Revision>, Self::Error> {
  220. let conn = self.pool.get().map_err(internal_error)?;
  221. let some = RevTableSql::read_rev_table(doc_id, &rev_id, &*conn)?;
  222. Ok(some)
  223. }
  224. fn read_revisions(&self, doc_id: &str) -> Result<Vec<Revision>, Self::Error> {
  225. let conn = self.pool.get().map_err(internal_error)?;
  226. let some = RevTableSql::read_rev_tables(doc_id, &*conn)?;
  227. Ok(some)
  228. }
  229. }
  230. impl Persistence {
  231. pub(crate) fn new(pool: Arc<ConnectionPool>) -> Self { Self { pool } }
  232. }
  233. #[cfg(feature = "flowy_unit_test")]
  234. impl RevisionCache {
  235. pub fn dish_cache(&self) -> Arc<DocRevisionDeskCache> { self.dish_cache.clone() }
  236. pub fn memory_cache(&self) -> Arc<RevisionMemoryCache> { self.memory_cache.clone() }
  237. }