store_actor.rs 9.3 KB


  1. use crate::{
  2. entities::doc::{RevId, Revision, RevisionRange},
  3. errors::{internal_error, DocError, DocResult},
  4. services::doc::revision::{model::RevisionOperation, DocRevision, RevisionServer},
  5. sql_tables::{RevState, RevTableSql},
  6. };
  7. use async_stream::stream;
  8. use dashmap::DashMap;
  9. use flowy_database::ConnectionPool;
  10. use flowy_ot::core::{Attributes, Delta, OperationTransformable};
  11. use futures::{stream::StreamExt, TryFutureExt};
  12. use std::{sync::Arc, time::Duration};
  13. use tokio::{
  14. sync::{mpsc, oneshot, RwLock},
  15. task::{spawn_blocking, JoinHandle},
  16. };
  17. pub enum RevisionCmd {
  18. Revision {
  19. revision: Revision,
  20. ret: oneshot::Sender<DocResult<()>>,
  21. },
  22. AckRevision {
  23. rev_id: RevId,
  24. },
  25. SendRevisions {
  26. range: RevisionRange,
  27. ret: oneshot::Sender<DocResult<Vec<Revision>>>,
  28. },
  29. DocumentDelta {
  30. ret: oneshot::Sender<DocResult<DocRevision>>,
  31. },
  32. }
  33. pub struct RevisionStoreActor {
  34. doc_id: String,
  35. persistence: Arc<Persistence>,
  36. revs: Arc<DashMap<i64, RevisionOperation>>,
  37. delay_save: RwLock<Option<JoinHandle<()>>>,
  38. receiver: Option<mpsc::Receiver<RevisionCmd>>,
  39. server: Arc<dyn RevisionServer>,
  40. }
  41. impl RevisionStoreActor {
  42. pub fn new(
  43. doc_id: &str,
  44. pool: Arc<ConnectionPool>,
  45. receiver: mpsc::Receiver<RevisionCmd>,
  46. server: Arc<dyn RevisionServer>,
  47. ) -> RevisionStoreActor {
  48. let persistence = Arc::new(Persistence::new(pool));
  49. let revs = Arc::new(DashMap::new());
  50. let doc_id = doc_id.to_owned();
  51. Self {
  52. doc_id,
  53. persistence,
  54. revs,
  55. delay_save: RwLock::new(None),
  56. receiver: Some(receiver),
  57. server,
  58. }
  59. }
  60. pub async fn run(mut self) {
  61. let mut receiver = self.receiver.take().expect("Should only call once");
  62. let stream = stream! {
  63. loop {
  64. match receiver.recv().await {
  65. Some(msg) => yield msg,
  66. None => break,
  67. }
  68. }
  69. };
  70. stream.for_each(|msg| self.handle_message(msg)).await;
  71. }
  72. async fn handle_message(&self, cmd: RevisionCmd) {
  73. match cmd {
  74. RevisionCmd::Revision { revision, ret } => {
  75. let result = self.handle_new_revision(revision).await;
  76. let _ = ret.send(result);
  77. },
  78. RevisionCmd::AckRevision { rev_id } => {
  79. self.handle_revision_acked(rev_id).await;
  80. },
  81. RevisionCmd::SendRevisions { range, ret } => {
  82. let result = revs_in_range(&self.doc_id, self.persistence.clone(), range).await;
  83. let _ = ret.send(result);
  84. },
  85. RevisionCmd::DocumentDelta { ret } => {
  86. let delta = fetch_document(&self.doc_id, self.server.clone(), self.persistence.clone()).await;
  87. let _ = ret.send(delta);
  88. },
  89. }
  90. }
  91. async fn handle_new_revision(&self, revision: Revision) -> DocResult<()> {
  92. if self.revs.contains_key(&revision.rev_id) {
  93. return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
  94. }
  95. let mut operation = RevisionOperation::new(&revision);
  96. let _receiver = operation.receiver();
  97. self.revs.insert(revision.rev_id, operation);
  98. self.save_revisions().await;
  99. Ok(())
  100. }
  101. async fn handle_revision_acked(&self, rev_id: RevId) {
  102. match self.revs.get_mut(rev_id.as_ref()) {
  103. None => {},
  104. Some(mut rev) => rev.value_mut().finish(),
  105. }
  106. self.save_revisions().await;
  107. }
  108. async fn save_revisions(&self) {
  109. if let Some(handler) = self.delay_save.write().await.take() {
  110. handler.abort();
  111. }
  112. if self.revs.is_empty() {
  113. return;
  114. }
  115. let revs = self.revs.clone();
  116. let persistence = self.persistence.clone();
  117. *self.delay_save.write().await = Some(tokio::spawn(async move {
  118. tokio::time::sleep(Duration::from_millis(300)).await;
  119. let ids = revs.iter().map(|kv| kv.key().clone()).collect::<Vec<i64>>();
  120. let revisions = revs
  121. .iter()
  122. .map(|kv| ((*kv.value()).clone(), kv.state))
  123. .collect::<Vec<(Revision, RevState)>>();
  124. // TODO: Ok to unwrap?
  125. let conn = &*persistence.pool.get().map_err(internal_error).unwrap();
  126. let result = conn.immediate_transaction::<_, DocError, _>(|| {
  127. let _ = persistence.rev_sql.create_rev_table(revisions, conn).unwrap();
  128. Ok(())
  129. });
  130. match result {
  131. Ok(_) => revs.retain(|k, _| !ids.contains(k)),
  132. Err(e) => log::error!("Save revision failed: {:?}", e),
  133. }
  134. }));
  135. }
  136. }
  137. async fn fetch_document(
  138. doc_id: &str,
  139. server: Arc<dyn RevisionServer>,
  140. persistence: Arc<Persistence>,
  141. ) -> DocResult<DocRevision> {
  142. let fetch_from_remote = server.fetch_document_from_remote(doc_id).or_else(|result| {
  143. log::error!(
  144. "Fetch document delta from remote failed: {:?}, try to fetch from local",
  145. result
  146. );
  147. fetch_from_local(doc_id, persistence.clone())
  148. });
  149. let fetch_from_local = fetch_from_local(doc_id, persistence.clone()).or_else(|result| async move {
  150. log::error!(
  151. "Fetch document delta from local failed: {:?}, try to fetch from remote",
  152. result
  153. );
  154. server.fetch_document_from_remote(doc_id).await
  155. });
  156. tokio::select! {
  157. result = fetch_from_remote => {
  158. log::debug!("Finish fetching document from remote");
  159. result
  160. },
  161. result = fetch_from_local => {
  162. log::debug!("Finish fetching document from local");
  163. result
  164. },
  165. }
  166. }
  167. async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocResult<DocRevision> {
  168. let doc_id = doc_id.to_owned();
  169. spawn_blocking(move || {
  170. // tokio::time::timeout
  171. let conn = &*persistence.pool.get().map_err(internal_error)?;
  172. let revisions = persistence.rev_sql.read_rev_tables(&doc_id, None, conn)?;
  173. if revisions.is_empty() {
  174. return Err(DocError::not_found());
  175. }
  176. let rev_id: RevId = revisions.last().unwrap().rev_id.into();
  177. let mut delta = Delta::new();
  178. for revision in revisions {
  179. match Delta::from_bytes(revision.delta_data) {
  180. Ok(local_delta) => {
  181. delta = delta.compose(&local_delta)?;
  182. },
  183. Err(e) => {
  184. log::error!("Deserialize delta from revision failed: {}", e);
  185. },
  186. }
  187. }
  188. delta.insert("\n", Attributes::default());
  189. Result::<DocRevision, DocError>::Ok(DocRevision { rev_id, delta })
  190. })
  191. .await
  192. .map_err(internal_error)?
  193. }
  194. async fn revs_in_range(doc_id: &str, persistence: Arc<Persistence>, range: RevisionRange) -> DocResult<Vec<Revision>> {
  195. let doc_id = doc_id.to_owned();
  196. let result = spawn_blocking(move || {
  197. let conn = &*persistence.pool.get().map_err(internal_error)?;
  198. let revisions = persistence.rev_sql.read_rev_tables_with_range(&doc_id, range, conn)?;
  199. Ok(revisions)
  200. })
  201. .await
  202. .map_err(internal_error)?;
  203. result
  204. }
  205. struct Persistence {
  206. rev_sql: Arc<RevTableSql>,
  207. pool: Arc<ConnectionPool>,
  208. }
  209. impl Persistence {
  210. fn new(pool: Arc<ConnectionPool>) -> Self {
  211. let rev_sql = Arc::new(RevTableSql {});
  212. Self { rev_sql, pool }
  213. }
  214. }
  215. // fn update_revisions(&self) {
  216. // let rev_ids = self
  217. // .revs
  218. // .iter()
  219. // .flat_map(|kv| match kv.state == RevState::Acked {
  220. // true => None,
  221. // false => Some(kv.key().clone()),
  222. // })
  223. // .collect::<Vec<i64>>();
  224. //
  225. // if rev_ids.is_empty() {
  226. // return;
  227. // }
  228. //
  229. // log::debug!("Try to update {:?} state", rev_ids);
  230. // match self.update(&rev_ids) {
  231. // Ok(_) => {
  232. // self.revs.retain(|k, _| !rev_ids.contains(k));
  233. // },
  234. // Err(e) => log::error!("Save revision failed: {:?}", e),
  235. // }
  236. // }
  237. //
  238. // fn update(&self, rev_ids: &Vec<i64>) -> Result<(), DocError> {
  239. // let conn = &*self.pool.get().map_err(internal_error).unwrap();
  240. // let result = conn.immediate_transaction::<_, DocError, _>(|| {
  241. // for rev_id in rev_ids {
  242. // let changeset = RevChangeset {
  243. // doc_id: self.doc_id.clone(),
  244. // rev_id: rev_id.clone(),
  245. // state: RevState::Acked,
  246. // };
  247. // let _ = self.op_sql.update_rev_table(changeset, conn)?;
  248. // }
  249. // Ok(())
  250. // });
  251. //
  252. // result
  253. // }
  254. // fn delete_revision(&self, rev_id: RevId) {
  255. // let op_sql = self.op_sql.clone();
  256. // let pool = self.pool.clone();
  257. // let doc_id = self.doc_id.clone();
  258. // tokio::spawn(async move {
  259. // let conn = &*pool.get().map_err(internal_error).unwrap();
  260. // let result = conn.immediate_transaction::<_, DocError, _>(|| {
  261. // let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?;
  262. // Ok(())
  263. // });
  264. //
  265. // match result {
  266. // Ok(_) => {},
  267. // Err(e) => log::error!("Delete revision failed: {:?}", e),
  268. // }
  269. // });
  270. // }