|
@@ -6,9 +6,9 @@ use crate::{
|
|
|
};
|
|
|
use async_stream::stream;
|
|
|
use dashmap::DashMap;
|
|
|
-use flowy_database::ConnectionPool;
|
|
|
+use flowy_database::{ConnectionPool, SqliteConnection};
|
|
|
use flowy_infra::future::ResultFuture;
|
|
|
-use flowy_ot::core::{Delta, OperationTransformable};
|
|
|
+use flowy_ot::core::{Delta, Operation, OperationTransformable};
|
|
|
use futures::stream::StreamExt;
|
|
|
use std::{collections::VecDeque, sync::Arc, time::Duration};
|
|
|
use tokio::{
|
|
@@ -22,7 +22,7 @@ pub struct RevisionStore {
|
|
|
revs_map: Arc<DashMap<i64, RevisionRecord>>,
|
|
|
pending_tx: PendingSender,
|
|
|
pending_revs: Arc<RwLock<VecDeque<PendingRevId>>>,
|
|
|
- defer_save_oper: RwLock<Option<JoinHandle<()>>>,
|
|
|
+ defer_save: RwLock<Option<JoinHandle<()>>>,
|
|
|
server: Arc<dyn RevisionServer>,
|
|
|
}
|
|
|
|
|
@@ -45,7 +45,7 @@ impl RevisionStore {
|
|
|
revs_map,
|
|
|
pending_revs,
|
|
|
pending_tx,
|
|
|
- defer_save_oper: RwLock::new(None),
|
|
|
+ defer_save: RwLock::new(None),
|
|
|
server,
|
|
|
});
|
|
|
|
|
@@ -94,7 +94,7 @@ impl RevisionStore {
|
|
|
}
|
|
|
|
|
|
async fn save_revisions(&self) {
|
|
|
- if let Some(handler) = self.defer_save_oper.write().await.take() {
|
|
|
+ if let Some(handler) = self.defer_save.write().await.take() {
|
|
|
handler.abort();
|
|
|
}
|
|
|
|
|
@@ -105,7 +105,7 @@ impl RevisionStore {
|
|
|
let revs_map = self.revs_map.clone();
|
|
|
let persistence = self.persistence.clone();
|
|
|
|
|
|
- *self.defer_save_oper.write().await = Some(tokio::spawn(async move {
|
|
|
+ *self.defer_save.write().await = Some(tokio::spawn(async move {
|
|
|
tokio::time::sleep(Duration::from_millis(300)).await;
|
|
|
let ids = revs_map.iter().map(|kv| kv.key().clone()).collect::<Vec<i64>>();
|
|
|
let revisions_state = revs_map
|
|
@@ -182,7 +182,7 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocRes
|
|
|
let doc_id = doc_id.to_owned();
|
|
|
spawn_blocking(move || {
|
|
|
let conn = &*persistence.pool.get().map_err(internal_error)?;
|
|
|
- let revisions = persistence.rev_sql.read_rev_tables(&doc_id, None, conn)?;
|
|
|
+ let revisions = persistence.rev_sql.read_rev_tables(&doc_id, conn)?;
|
|
|
if revisions.is_empty() {
|
|
|
return Err(DocError::record_not_found().context("Local doesn't have this document"));
|
|
|
}
|
|
@@ -190,7 +190,16 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocRes
|
|
|
let base_rev_id: RevId = revisions.last().unwrap().base_rev_id.into();
|
|
|
let rev_id: RevId = revisions.last().unwrap().rev_id.into();
|
|
|
let mut delta = Delta::new();
|
|
|
- for revision in revisions {
|
|
|
+ let mut pre_rev_id = 0;
|
|
|
+ for (index, revision) in revisions.into_iter().enumerate() {
|
|
|
+ if cfg!(debug_assertions) {
|
|
|
+ if index == 0 {
|
|
|
+ pre_rev_id = revision.rev_id;
|
|
|
+ } else {
|
|
|
+ validate_rev_id(pre_rev_id, revision.rev_id);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
match Delta::from_bytes(revision.delta_data) {
|
|
|
Ok(local_delta) => {
|
|
|
delta = delta.compose(&local_delta)?;
|
|
@@ -200,13 +209,17 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocRes
|
|
|
},
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if cfg!(debug_assertions) {
|
|
|
+ validate_delta(&doc_id, persistence, conn, &delta);
|
|
|
+ }
|
|
|
+
|
|
|
match delta.ops.last() {
|
|
|
None => {},
|
|
|
Some(op) => {
|
|
|
let data = op.get_data();
|
|
|
if !data.ends_with("\n") {
|
|
|
- log::error!("The op must end with newline");
|
|
|
- log::debug!("Invalid delta: {}", delta.to_json());
|
|
|
+ delta.ops.push(Operation::Insert("\n".into()))
|
|
|
}
|
|
|
},
|
|
|
}
|
|
@@ -222,6 +235,37 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocRes
|
|
|
.map_err(internal_error)?
|
|
|
}
|
|
|
|
|
|
+#[cfg(debug_assertions)]
|
|
|
+fn validate_rev_id(current: i64, next: i64) {
|
|
|
+ if current >= next {
|
|
|
+ log::error!("The next revision id should be greater than the previous");
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+#[cfg(debug_assertions)]
|
|
|
+fn validate_delta(doc_id: &str, persistence: Arc<Persistence>, conn: &SqliteConnection, delta: &Delta) {
|
|
|
+ if delta.ops.last().is_none() {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ let data = delta.ops.last().as_ref().unwrap().get_data();
|
|
|
+ if !data.ends_with("\n") {
|
|
|
+ log::error!("The op must end with newline");
|
|
|
+ let result = || {
|
|
|
+ let revisions = persistence.rev_sql.read_rev_tables(&doc_id, conn)?;
|
|
|
+ for revision in revisions {
|
|
|
+ let delta = Delta::from_bytes(revision.delta_data)?;
|
|
|
+ log::error!("Invalid revision: {}:{}", revision.rev_id, delta.to_json());
|
|
|
+ }
|
|
|
+ Ok::<(), DocError>(())
|
|
|
+ };
|
|
|
+ match result() {
|
|
|
+ Ok(_) => {},
|
|
|
+ Err(e) => log::error!("{}", e),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// fn update_revisions(&self) {
|
|
|
// let rev_ids = self
|
|
|
// .revs
|