persistence.rs 12 KB


  1. use crate::{
  2. errors::{internal_error, DocError, DocResult},
  3. services::doc::revision::{model::*, RevisionServer},
  4. sql_tables::RevState,
  5. };
  6. use async_stream::stream;
  7. use dashmap::DashMap;
  8. use flowy_database::{ConnectionPool, SqliteConnection};
  9. use flowy_document_infra::entities::doc::{revision_from_doc, Doc, RevId, RevType, Revision, RevisionRange};
  10. use futures::stream::StreamExt;
  11. use lib_infra::future::ResultFuture;
  12. use lib_ot::core::{Delta, Operation, OperationTransformable};
  13. use std::{collections::VecDeque, sync::Arc, time::Duration};
  14. use tokio::{
  15. sync::{broadcast, mpsc, RwLock},
  16. task::{spawn_blocking, JoinHandle},
  17. };
  18. pub struct RevisionStore {
  19. doc_id: String,
  20. persistence: Arc<Persistence>,
  21. revs_map: Arc<DashMap<i64, RevisionRecord>>,
  22. pending_tx: PendingSender,
  23. pending_revs: Arc<RwLock<VecDeque<PendingRevId>>>,
  24. defer_save: RwLock<Option<JoinHandle<()>>>,
  25. server: Arc<dyn RevisionServer>,
  26. }
  27. impl RevisionStore {
  28. pub fn new(
  29. doc_id: &str,
  30. pool: Arc<ConnectionPool>,
  31. server: Arc<dyn RevisionServer>,
  32. ws_revision_sender: mpsc::UnboundedSender<Revision>,
  33. ) -> Arc<RevisionStore> {
  34. let doc_id = doc_id.to_owned();
  35. let persistence = Arc::new(Persistence::new(pool));
  36. let revs_map = Arc::new(DashMap::new());
  37. let (pending_tx, pending_rx) = mpsc::unbounded_channel();
  38. let pending_revs = Arc::new(RwLock::new(VecDeque::new()));
  39. let store = Arc::new(Self {
  40. doc_id,
  41. persistence,
  42. revs_map,
  43. pending_revs,
  44. pending_tx,
  45. defer_save: RwLock::new(None),
  46. server,
  47. });
  48. tokio::spawn(RevisionStream::new(store.clone(), pending_rx, ws_revision_sender).run());
  49. store
  50. }
  51. #[tracing::instrument(level = "debug", skip(self, revision))]
  52. pub async fn add_revision(&self, revision: Revision) -> DocResult<()> {
  53. if self.revs_map.contains_key(&revision.rev_id) {
  54. return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
  55. }
  56. let (sender, receiver) = broadcast::channel(2);
  57. let revs_map = self.revs_map.clone();
  58. let mut rx = sender.subscribe();
  59. tokio::spawn(async move {
  60. if let Ok(rev_id) = rx.recv().await {
  61. match revs_map.get_mut(&rev_id) {
  62. None => {},
  63. Some(mut rev) => rev.value_mut().state = RevState::Acked,
  64. }
  65. }
  66. });
  67. let pending_rev = PendingRevId::new(revision.rev_id, sender);
  68. self.pending_revs.write().await.push_back(pending_rev);
  69. self.revs_map.insert(revision.rev_id, RevisionRecord::new(revision));
  70. let _ = self.pending_tx.send(PendingMsg::Revision { ret: receiver });
  71. self.save_revisions().await;
  72. Ok(())
  73. }
  74. #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id.as_ref()))]
  75. pub async fn ack_revision(&self, rev_id: RevId) {
  76. let rev_id = rev_id.value;
  77. self.pending_revs
  78. .write()
  79. .await
  80. .retain(|pending| !pending.finish(rev_id));
  81. self.save_revisions().await;
  82. }
  83. async fn save_revisions(&self) {
  84. if let Some(handler) = self.defer_save.write().await.take() {
  85. handler.abort();
  86. }
  87. if self.revs_map.is_empty() {
  88. return;
  89. }
  90. let revs_map = self.revs_map.clone();
  91. let persistence = self.persistence.clone();
  92. *self.defer_save.write().await = Some(tokio::spawn(async move {
  93. tokio::time::sleep(Duration::from_millis(300)).await;
  94. let ids = revs_map.iter().map(|kv| *kv.key()).collect::<Vec<i64>>();
  95. let revisions_state = revs_map
  96. .iter()
  97. .map(|kv| (kv.revision.clone(), kv.state))
  98. .collect::<Vec<(Revision, RevState)>>();
  99. match persistence.create_revs(revisions_state.clone()) {
  100. Ok(_) => {
  101. tracing::debug!(
  102. "Revision State Changed: {:?}",
  103. revisions_state.iter().map(|s| (s.0.rev_id, s.1)).collect::<Vec<_>>()
  104. );
  105. revs_map.retain(|k, _| !ids.contains(k));
  106. },
  107. Err(e) => log::error!("Save revision failed: {:?}", e),
  108. }
  109. }));
  110. }
  111. pub async fn revs_in_range(&self, range: RevisionRange) -> DocResult<Vec<Revision>> {
  112. let revs = range
  113. .iter()
  114. .flat_map(|rev_id| match self.revs_map.get(&rev_id) {
  115. None => None,
  116. Some(rev) => Some(rev.revision.clone()),
  117. })
  118. .collect::<Vec<Revision>>();
  119. if revs.len() == range.len() as usize {
  120. Ok(revs)
  121. } else {
  122. let doc_id = self.doc_id.clone();
  123. let persistence = self.persistence.clone();
  124. let result = spawn_blocking(move || persistence.read_rev_with_range(&doc_id, range))
  125. .await
  126. .map_err(internal_error)?;
  127. result
  128. }
  129. }
  130. pub async fn fetch_document(&self) -> DocResult<Doc> {
  131. let result = fetch_from_local(&self.doc_id, self.persistence.clone()).await;
  132. if result.is_ok() {
  133. return result;
  134. }
  135. let doc = self.server.fetch_document_from_remote(&self.doc_id).await?;
  136. let revision = revision_from_doc(doc.clone(), RevType::Remote);
  137. let _ = self.persistence.create_revs(vec![(revision, RevState::Acked)])?;
  138. Ok(doc)
  139. }
  140. }
  141. impl RevisionIterator for RevisionStore {
  142. fn next(&self) -> ResultFuture<Option<Revision>, DocError> {
  143. let pending_revs = self.pending_revs.clone();
  144. let revs_map = self.revs_map.clone();
  145. let persistence = self.persistence.clone();
  146. let doc_id = self.doc_id.clone();
  147. ResultFuture::new(async move {
  148. match pending_revs.read().await.front() {
  149. None => Ok(None),
  150. Some(pending) => match revs_map.get(&pending.rev_id) {
  151. None => persistence.read_rev(&doc_id, &pending.rev_id),
  152. Some(context) => Ok(Some(context.revision.clone())),
  153. },
  154. }
  155. })
  156. }
  157. }
  158. async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocResult<Doc> {
  159. let doc_id = doc_id.to_owned();
  160. spawn_blocking(move || {
  161. let conn = &*persistence.pool.get().map_err(internal_error)?;
  162. let revisions = persistence.rev_sql.read_rev_tables(&doc_id, conn)?;
  163. if revisions.is_empty() {
  164. return Err(DocError::record_not_found().context("Local doesn't have this document"));
  165. }
  166. let base_rev_id: RevId = revisions.last().unwrap().base_rev_id.into();
  167. let rev_id: RevId = revisions.last().unwrap().rev_id.into();
  168. let mut delta = Delta::new();
  169. for (_, revision) in revisions.into_iter().enumerate() {
  170. match Delta::from_bytes(revision.delta_data) {
  171. Ok(local_delta) => {
  172. delta = delta.compose(&local_delta)?;
  173. },
  174. Err(e) => {
  175. log::error!("Deserialize delta from revision failed: {}", e);
  176. },
  177. }
  178. }
  179. #[cfg(debug_assertions)]
  180. validate_delta(&doc_id, persistence, conn, &delta);
  181. match delta.ops.last() {
  182. None => {},
  183. Some(op) => {
  184. let data = op.get_data();
  185. if !data.ends_with('\n') {
  186. delta.ops.push(Operation::Insert("\n".into()))
  187. }
  188. },
  189. }
  190. Result::<Doc, DocError>::Ok(Doc {
  191. id: doc_id,
  192. data: delta.to_json(),
  193. rev_id: rev_id.into(),
  194. base_rev_id: base_rev_id.into(),
  195. })
  196. })
  197. .await
  198. .map_err(internal_error)?
  199. }
  200. #[cfg(debug_assertions)]
  201. fn validate_delta(doc_id: &str, persistence: Arc<Persistence>, conn: &SqliteConnection, delta: &Delta) {
  202. if delta.ops.last().is_none() {
  203. return;
  204. }
  205. let data = delta.ops.last().as_ref().unwrap().get_data();
  206. if !data.ends_with('\n') {
  207. log::error!("The op must end with newline");
  208. let result = || {
  209. let revisions = persistence.rev_sql.read_rev_tables(&doc_id, conn)?;
  210. for revision in revisions {
  211. let delta = Delta::from_bytes(revision.delta_data)?;
  212. log::error!("Invalid revision: {}:{}", revision.rev_id, delta.to_json());
  213. }
  214. Ok::<(), DocError>(())
  215. };
  216. match result() {
  217. Ok(_) => {},
  218. Err(e) => log::error!("{}", e),
  219. }
  220. }
  221. }
  222. // fn update_revisions(&self) {
  223. // let rev_ids = self
  224. // .revs
  225. // .iter()
  226. // .flat_map(|kv| match kv.state == RevState::Acked {
  227. // true => None,
  228. // false => Some(kv.key().clone()),
  229. // })
  230. // .collect::<Vec<i64>>();
  231. //
  232. // if rev_ids.is_empty() {
  233. // return;
  234. // }
  235. //
  236. // tracing::debug!("Try to update {:?} state", rev_ids);
  237. // match self.update(&rev_ids) {
  238. // Ok(_) => {
  239. // self.revs.retain(|k, _| !rev_ids.contains(k));
  240. // },
  241. // Err(e) => log::error!("Save revision failed: {:?}", e),
  242. // }
  243. // }
  244. //
  245. // fn update(&self, rev_ids: &Vec<i64>) -> Result<(), DocError> {
  246. // let conn = &*self.pool.get().map_err(internal_error).unwrap();
  247. // let result = conn.immediate_transaction::<_, DocError, _>(|| {
  248. // for rev_id in rev_ids {
  249. // let changeset = RevChangeset {
  250. // doc_id: self.doc_id.clone(),
  251. // rev_id: rev_id.clone(),
  252. // state: RevState::Acked,
  253. // };
  254. // let _ = self.op_sql.update_rev_table(changeset, conn)?;
  255. // }
  256. // Ok(())
  257. // });
  258. //
  259. // result
  260. // }
  261. // fn delete_revision(&self, rev_id: RevId) {
  262. // let op_sql = self.op_sql.clone();
  263. // let pool = self.pool.clone();
  264. // let doc_id = self.doc_id.clone();
  265. // tokio::spawn(async move {
  266. // let conn = &*pool.get().map_err(internal_error).unwrap();
  267. // let result = conn.immediate_transaction::<_, DocError, _>(|| {
  268. // let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?;
  269. // Ok(())
  270. // });
  271. //
  272. // match result {
  273. // Ok(_) => {},
  274. // Err(e) => log::error!("Delete revision failed: {:?}", e),
  275. // }
  276. // });
  277. // }
  278. pub(crate) enum PendingMsg {
  279. Revision { ret: RevIdReceiver },
  280. }
  281. pub(crate) type PendingSender = mpsc::UnboundedSender<PendingMsg>;
  282. pub(crate) type PendingReceiver = mpsc::UnboundedReceiver<PendingMsg>;
  283. pub(crate) struct RevisionStream {
  284. revisions: Arc<dyn RevisionIterator>,
  285. receiver: Option<PendingReceiver>,
  286. ws_revision_sender: mpsc::UnboundedSender<Revision>,
  287. }
  288. impl RevisionStream {
  289. pub(crate) fn new(
  290. revisions: Arc<dyn RevisionIterator>,
  291. pending_rx: PendingReceiver,
  292. ws_revision_sender: mpsc::UnboundedSender<Revision>,
  293. ) -> Self {
  294. Self {
  295. revisions,
  296. receiver: Some(pending_rx),
  297. ws_revision_sender,
  298. }
  299. }
  300. pub async fn run(mut self) {
  301. let mut receiver = self.receiver.take().expect("Should only call once");
  302. let stream = stream! {
  303. loop {
  304. match receiver.recv().await {
  305. Some(msg) => yield msg,
  306. None => break,
  307. }
  308. }
  309. };
  310. stream
  311. .for_each(|msg| async {
  312. match self.handle_msg(msg).await {
  313. Ok(_) => {},
  314. Err(e) => log::error!("{:?}", e),
  315. }
  316. })
  317. .await;
  318. }
  319. async fn handle_msg(&self, msg: PendingMsg) -> DocResult<()> {
  320. match msg {
  321. PendingMsg::Revision { ret } => self.prepare_next_pending_rev(ret).await,
  322. }
  323. }
  324. async fn prepare_next_pending_rev(&self, mut ret: RevIdReceiver) -> DocResult<()> {
  325. match self.revisions.next().await? {
  326. None => Ok(()),
  327. Some(revision) => {
  328. let _ = self.ws_revision_sender.send(revision).map_err(internal_error);
  329. let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
  330. Ok(())
  331. },
  332. }
  333. }
  334. }