cache.rs

use crate::{
    errors::{internal_error, DocError, DocResult},
    services::doc::revision::RevisionServer,
    sql_tables::RevTableSql,
};
use flowy_database::ConnectionPool;
use flowy_document_infra::entities::doc::Doc;
use lib_infra::future::ResultFuture;
use lib_ot::{
    core::{Operation, OperationTransformable},
    revision::{
        RevId,
        RevState,
        RevType,
        Revision,
        RevisionDiskCache,
        RevisionMemoryCache,
        RevisionRange,
        RevisionRecord,
    },
    rich_text::RichTextDelta,
};
use std::{sync::Arc, time::Duration};
use tokio::{
    sync::RwLock,
    task::{spawn_blocking, JoinHandle},
};
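
/// A source of revision records consumed one at a time. `RevisionCache`
/// implements it by draining the in-memory cache first and falling back to disk.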
pub trait RevisionIterator: Send + Sync {
    fn next(&self) -> ResultFuture<Option<RevisionRecord>, DocError>;
}

type DocRevisionDeskCache = dyn RevisionDiskCache<Error = DocError>;
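
/// Two-level cache for document revisions: new revisions are buffered in the
/// `RevisionMemoryCache` and flushed to the `RevisionDiskCache` shortly after,
/// while `server` is used to fetch the document when nothing is stored locally.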
pub struct RevisionCache {
    doc_id: String,
    dish_cache: Arc<DocRevisionDeskCache>,
    memory_cache: Arc<RevisionMemoryCache>,
    defer_save: RwLock<Option<JoinHandle<()>>>,
    server: Arc<dyn RevisionServer>,
}

impl RevisionCache {
    pub fn new(doc_id: &str, pool: Arc<ConnectionPool>, server: Arc<dyn RevisionServer>) -> RevisionCache {
        let doc_id = doc_id.to_owned();
        let dish_cache = Arc::new(Persistence::new(pool));
        let memory_cache = Arc::new(RevisionMemoryCache::new());
        Self {
            doc_id,
            dish_cache,
            memory_cache,
            defer_save: RwLock::new(None),
            server,
        }
    }
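
    /// Adds a new revision to the in-memory cache and schedules a deferred save
    /// to disk. Returns an error if a revision with the same id is already cached.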
    #[tracing::instrument(level = "debug", skip(self, revision))]
    pub async fn add_revision(&self, revision: Revision) -> DocResult<()> {
        if self.memory_cache.contains(&revision.rev_id) {
            return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
        }
        self.memory_cache.add_revision(revision.clone()).await?;
        self.save_revisions().await;
        Ok(())
    }
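
    /// Marks the cached revision with the given id as acknowledged and schedules
    /// a deferred save to disk.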
    #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id.as_ref()))]
    pub async fn ack_revision(&self, rev_id: RevId) {
        let rev_id = rev_id.value;
        self.memory_cache.mut_revision(&rev_id, |mut rev| rev.value_mut().ack());
        self.save_revisions().await;
    }
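
    /// Debounces disk writes: any pending save task is aborted and a new one is
    /// spawned that waits 300ms, writes the buffered revisions to disk, and then
    /// removes them from the in-memory cache.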
    async fn save_revisions(&self) {
        if let Some(handler) = self.defer_save.write().await.take() {
            handler.abort();
        }

        if self.memory_cache.is_empty() {
            return;
        }

        let memory_cache = self.memory_cache.clone();
        let disk_cache = self.dish_cache.clone();
        *self.defer_save.write().await = Some(tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(300)).await;
            let (ids, records) = memory_cache.revisions();
            match disk_cache.create_revisions(records) {
                Ok(_) => {
                    memory_cache.remove_revisions(ids);
                },
                Err(e) => log::error!("Save revision failed: {:?}", e),
            }
        }));
    }
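
    /// Returns the revisions covering `range`, served from memory when the whole
    /// range is cached and read from disk otherwise.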
    pub async fn revisions_in_range(&self, range: RevisionRange) -> DocResult<Vec<Revision>> {
        let revs = self.memory_cache.revisions_in_range(&range).await?;
        if revs.len() == range.len() as usize {
            Ok(revs)
        } else {
            let doc_id = self.doc_id.clone();
            let disk_cache = self.dish_cache.clone();
            spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range))
                .await
                .map_err(internal_error)?
        }
    }
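
    /// Loads the document from the local disk cache, falling back to the revision
    /// server. A document fetched from the server is stored locally as a single
    /// acknowledged revision.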
    pub async fn fetch_document(&self) -> DocResult<Doc> {
        let result = fetch_from_local(&self.doc_id, self.dish_cache.clone()).await;
        if result.is_ok() {
            return result;
        }

        let doc = self.server.fetch_document_from_remote(&self.doc_id).await?;
        let delta_data = doc.data.as_bytes();
        let revision = Revision::new(
            doc.base_rev_id,
            doc.rev_id,
            delta_data.to_owned(),
            &doc.id,
            RevType::Remote,
        );
        let record = RevisionRecord {
            revision,
            state: RevState::Acked,
        };
        let _ = self.dish_cache.create_revisions(vec![record])?;
        Ok(doc)
    }
}

impl RevisionIterator for RevisionCache {
    fn next(&self) -> ResultFuture<Option<RevisionRecord>, DocError> {
        let memory_cache = self.memory_cache.clone();
        let disk_cache = self.dish_cache.clone();
        let doc_id = self.doc_id.clone();
        ResultFuture::new(async move {
            match memory_cache.front_revision().await {
                // Nothing buffered in memory: fall back to the front revision id and read it from disk.
                None => match memory_cache.front_rev_id().await {
                    None => Ok(None),
                    Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? {
                        None => Ok(None),
                        Some(revision) => Ok(Some(RevisionRecord::new(revision))),
                    },
                },
                Some((_, record)) => Ok(Some(record)),
            }
        })
    }
}
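
/// Rebuilds a `Doc` from the revisions stored on disk by composing their deltas
/// in order. Fails with `record_not_found` when nothing is stored locally.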
async fn fetch_from_local(doc_id: &str, disk_cache: Arc<DocRevisionDeskCache>) -> DocResult<Doc> {
    let doc_id = doc_id.to_owned();
    spawn_blocking(move || {
        let revisions = disk_cache.read_revisions(&doc_id)?;
        if revisions.is_empty() {
            return Err(DocError::record_not_found().context("Local doesn't have this document"));
        }

        let base_rev_id: RevId = revisions.last().unwrap().base_rev_id.into();
        let rev_id: RevId = revisions.last().unwrap().rev_id.into();

        // Compose every stored delta, in order, into the full document delta.
        let mut delta = RichTextDelta::new();
        for revision in revisions {
            match RichTextDelta::from_bytes(revision.delta_data) {
                Ok(local_delta) => {
                    delta = delta.compose(&local_delta)?;
                },
                Err(e) => {
                    log::error!("Deserialize delta from revision failed: {}", e);
                },
            }
        }

        #[cfg(debug_assertions)]
        validate_delta(&doc_id, disk_cache, &delta);

        // The document delta is expected to end with a newline; append one if it doesn't.
        match delta.ops.last() {
            None => {},
            Some(op) => {
                let data = op.get_data();
                if !data.ends_with('\n') {
                    delta.ops.push(Operation::Insert("\n".into()))
                }
            },
        }

        Result::<Doc, DocError>::Ok(Doc {
            id: doc_id,
            data: delta.to_json(),
            rev_id: rev_id.into(),
            base_rev_id: base_rev_id.into(),
        })
    })
    .await
    .map_err(internal_error)?
}

#[cfg(debug_assertions)]
fn validate_delta(doc_id: &str, disk_cache: Arc<DocRevisionDeskCache>, delta: &RichTextDelta) {
    if delta.ops.last().is_none() {
        return;
    }

    let data = delta.ops.last().unwrap().get_data();
    if !data.ends_with('\n') {
        log::error!("The op must end with newline");
        // Dump every stored revision so the malformed delta can be inspected.
        let result = || {
            let revisions = disk_cache.read_revisions(doc_id)?;
            for revision in revisions {
                let delta = RichTextDelta::from_bytes(revision.delta_data)?;
                log::error!("Invalid revision: {}:{}", revision.rev_id, delta.to_json());
            }
            Ok::<(), DocError>(())
        };
        match result() {
            Ok(_) => {},
            Err(e) => log::error!("{}", e),
        }
    }
}
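
/// Disk cache backed by the database connection pool; all reads and writes are
/// delegated to `RevTableSql`.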
pub(crate) struct Persistence {
    pub(crate) pool: Arc<ConnectionPool>,
}

impl RevisionDiskCache for Persistence {
    type Error = DocError;

    fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error> {
        let conn = &*self.pool.get().map_err(internal_error)?;
        conn.immediate_transaction::<_, DocError, _>(|| {
            let _ = RevTableSql::create_rev_table(revisions, conn)?;
            Ok(())
        })
    }

    fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<Revision>, Self::Error> {
        let conn = &*self.pool.get().map_err(internal_error)?;
        let revisions = RevTableSql::read_rev_tables_with_range(doc_id, range.clone(), conn)?;
        Ok(revisions)
    }

    fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<Revision>, Self::Error> {
        let conn = self.pool.get().map_err(internal_error)?;
        let some = RevTableSql::read_rev_table(doc_id, &rev_id, &*conn)?;
        Ok(some)
    }

    fn read_revisions(&self, doc_id: &str) -> Result<Vec<Revision>, Self::Error> {
        let conn = self.pool.get().map_err(internal_error)?;
        let some = RevTableSql::read_rev_tables(doc_id, &*conn)?;
        Ok(some)
    }
}

impl Persistence {
    pub(crate) fn new(pool: Arc<ConnectionPool>) -> Self { Self { pool } }
}