appflowy 3 年 前
コミット
1744938a51

+ 1 - 1
frontend/rust-lib/flowy-core/Flowy.toml

@@ -1,3 +1,3 @@
 
-proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/notify"]
+proto_crates = ["src/entities", "src/event.rs", "src/notify"]
 event_files = ["src/event.rs"]

+ 1 - 1
frontend/rust-lib/flowy-document/Flowy.toml

@@ -1,3 +1,3 @@
 
-proto_crates = ["src/event.rs", "src/errors.rs", "src/notify"]
+proto_crates = ["src/notify"]
 event_files = []

+ 13 - 8
frontend/rust-lib/flowy-document/src/services/doc/controller.rs

@@ -28,7 +28,12 @@ pub(crate) struct DocController {
 impl DocController {
     pub(crate) fn new(server: Server, user: Arc<dyn DocumentUser>, ws_handlers: Arc<DocumentWsHandlers>) -> Self {
         let open_cache = Arc::new(OpenDocCache::new());
-        Self { server, ws_handlers, open_cache, user }
+        Self {
+            server,
+            ws_handlers,
+            open_cache,
+            user,
+        }
     }
 
     pub(crate) fn init(&self) -> FlowyResult<()> {
@@ -93,8 +98,13 @@ impl DocController {
         pool: Arc<ConnectionPool>,
     ) -> Result<Arc<ClientDocEditor>, FlowyError> {
         let user = self.user.clone();
+        let token = self.user.token()?;
         let rev_manager = self.make_rev_manager(doc_id, pool.clone())?;
-        let doc_editor = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_handlers.ws()).await?;
+        let server = Arc::new(RevisionServerImpl {
+            token,
+            server: self.server.clone(),
+        });
+        let doc_editor = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_handlers.ws(), server).await?;
         let ws_handler = doc_editor.ws_handler();
         self.ws_handlers.register_handler(doc_id, ws_handler);
         self.open_cache.insert(&doc_id, &doc_editor);
@@ -105,13 +115,8 @@ impl DocController {
         // Opti: require upgradable_read lock and then upgrade to write lock using
         // RwLockUpgradableReadGuard::upgrade(xx) of ws
         // let doc = self.read_doc(doc_id, pool.clone()).await?;
-        let token = self.user.token()?;
         let user_id = self.user.user_id()?;
-        let server = Arc::new(RevisionServerImpl {
-            token,
-            server: self.server.clone(),
-        });
-        let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool, server));
+        let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool));
         Ok(RevisionManager::new(&user_id, doc_id, cache))
     }
 }

+ 31 - 21
frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs

@@ -13,16 +13,15 @@ use lib_ot::{
     revision::{RevId, RevType, Revision, RevisionRange},
     rich_text::{RichTextAttribute, RichTextDelta},
 };
-use parking_lot::RwLock;
 use std::{collections::VecDeque, sync::Arc};
-use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
-type SinkVec = Arc<RwLock<VecDeque<DocumentWSData>>>;
+use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot, RwLock};
+
 pub struct ClientDocEditor {
     pub doc_id: String,
     rev_manager: Arc<RevisionManager>,
     ws_manager: Arc<WebSocketManager>,
     edit_cmd_tx: UnboundedSender<EditCommand>,
-    sink_vec: SinkVec,
+    sink_data_provider: SinkDataProvider,
     user: Arc<dyn DocumentUser>,
 }
 
@@ -33,22 +32,23 @@ impl ClientDocEditor {
         pool: Arc<ConnectionPool>,
         mut rev_manager: RevisionManager,
         ws: Arc<dyn DocumentWebSocket>,
+        server: Arc<dyn RevisionServer>,
     ) -> FlowyResult<Arc<Self>> {
-        let delta = rev_manager.load_document().await?;
+        let delta = rev_manager.load_document(server).await?;
         let edit_cmd_tx = spawn_edit_queue(doc_id, delta, pool.clone());
         let doc_id = doc_id.to_string();
         let rev_manager = Arc::new(rev_manager);
-        let sink_vec = Arc::new(RwLock::new(VecDeque::new()));
+        let sink_data_provider = Arc::new(RwLock::new(VecDeque::new()));
         let data_provider = Arc::new(DocumentSinkDataProviderAdapter {
             rev_manager: rev_manager.clone(),
-            sink_vec: sink_vec.clone(),
+            data_provider: sink_data_provider.clone(),
         });
         let stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
             doc_id: doc_id.clone(),
             edit_cmd_tx: edit_cmd_tx.clone(),
             rev_manager: rev_manager.clone(),
             user: user.clone(),
-            sink_vec: sink_vec.clone(),
+            sink_data_provider: sink_data_provider.clone(),
         });
         let ws_manager = Arc::new(WebSocketManager::new(&doc_id, ws, data_provider, stream_consumer));
         let editor = Arc::new(Self {
@@ -56,7 +56,7 @@ impl ClientDocEditor {
             rev_manager,
             ws_manager,
             edit_cmd_tx,
-            sink_vec,
+            sink_data_provider,
             user,
         });
         Ok(editor)
@@ -202,7 +202,7 @@ struct DocumentWebSocketSteamConsumerAdapter {
     edit_cmd_tx: UnboundedSender<EditCommand>,
     rev_manager: Arc<RevisionManager>,
     user: Arc<dyn DocumentUser>,
-    sink_vec: SinkVec,
+    sink_data_provider: SinkDataProvider,
 }
 
 impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
@@ -210,43 +210,50 @@ impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
         let user = self.user.clone();
         let rev_manager = self.rev_manager.clone();
         let edit_cmd_tx = self.edit_cmd_tx.clone();
+        let sink_data_provider = self.sink_data_provider.clone();
         let doc_id = self.doc_id.clone();
         FutureResult::new(async move {
             let user_id = user.user_id()?;
-            let _revision = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await?;
+            if let Some(revision) = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? {
+                sink_data_provider.write().await.push_back(revision.into());
+            }
             Ok(())
         })
     }
 
-    fn make_revision_from_range(&self, range: RevisionRange) -> FutureResult<Revision, FlowyError> {
+    fn receive_ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError> {
         let rev_manager = self.rev_manager.clone();
         FutureResult::new(async move {
-            let revision = rev_manager.mk_revisions(range).await?;
-            Ok(revision)
+            let _ = rev_manager.ack_revision(rev_id).await?;
+            Ok(())
         })
     }
 
-    fn ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError> {
+    fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
         let rev_manager = self.rev_manager.clone();
+        let sink_data_provider = self.sink_data_provider.clone();
         FutureResult::new(async move {
-            let _ = rev_manager.ack_revision(rev_id).await?;
+            let revision = rev_manager.mk_revisions(range).await?;
+            sink_data_provider.write().await.push_back(revision.into());
             Ok(())
         })
     }
 }
 
+type SinkDataProvider = Arc<RwLock<VecDeque<DocumentWSData>>>;
+
 struct DocumentSinkDataProviderAdapter {
     rev_manager: Arc<RevisionManager>,
-    sink_vec: SinkVec,
+    data_provider: SinkDataProvider,
 }
 
 impl DocumentSinkDataProvider for DocumentSinkDataProviderAdapter {
     fn next(&self) -> FutureResult<Option<DocumentWSData>, FlowyError> {
         let rev_manager = self.rev_manager.clone();
-        let sink_vec = self.sink_vec.clone();
+        let data_provider = self.data_provider.clone();
 
         FutureResult::new(async move {
-            if sink_vec.read().is_empty() {
+            if data_provider.read().await.is_empty() {
                 match rev_manager.next_sync_revision().await? {
                     Some(rev) => {
                         tracing::debug!("[DocumentSinkDataProvider]: revision: {}:{:?}", rev.doc_id, rev.rev_id);
@@ -255,9 +262,12 @@ impl DocumentSinkDataProvider for DocumentSinkDataProviderAdapter {
                     None => Ok(None),
                 }
             } else {
-                match sink_vec.read().front() {
+                match data_provider.read().await.front() {
                     None => Ok(None),
-                    Some(data) => Ok(Some(data.clone())),
+                    Some(data) => {
+                        tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty);
+                        Ok(Some(data.clone()))
+                    },
                 }
             }
         })

+ 2 - 2
frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs

@@ -1,8 +1,8 @@
 mod edit_queue;
-mod edit_ws;
 mod editor;
 mod model;
+mod web_socket;
 
 pub(crate) use edit_queue::*;
-pub use edit_ws::*;
 pub use editor::*;
+pub use web_socket::*;

+ 5 - 13
frontend/rust-lib/flowy-document/src/services/doc/edit/edit_ws.rs → frontend/rust-lib/flowy-document/src/services/doc/edit/web_socket.rs

@@ -1,10 +1,7 @@
 use crate::services::doc::{DocumentWebSocket, DocumentWsHandler, SYNC_INTERVAL_IN_MILLIS};
 use async_stream::stream;
 use bytes::Bytes;
-use flowy_collaboration::{
-    entities::ws::{DocumentWSData, DocumentWSDataType},
-    Revision,
-};
+use flowy_collaboration::entities::ws::{DocumentWSData, DocumentWSDataType};
 use flowy_error::{internal_error, FlowyError, FlowyResult};
 use futures::stream::StreamExt;
 use lib_infra::future::FutureResult;
@@ -66,7 +63,6 @@ impl WebSocketManager {
             &self.doc_id,
             self.stream_consumer.clone(),
             ws_msg_rx,
-            self.ws.clone(),
             self.stop_sync_tx.subscribe(),
         );
         tokio::spawn(sink.run());
@@ -117,15 +113,14 @@ impl DocumentWsHandler for WebSocketManager {
 
 pub trait DocumentWebSocketSteamConsumer: Send + Sync {
     fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>;
-    fn make_revision_from_range(&self, range: RevisionRange) -> FutureResult<Revision, FlowyError>;
-    fn ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError>;
+    fn receive_ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError>;
+    fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>;
 }
 
 pub(crate) struct DocumentWebSocketStream {
     doc_id: String,
     consumer: Arc<dyn DocumentWebSocketSteamConsumer>,
     ws_msg_rx: Option<mpsc::UnboundedReceiver<DocumentWSData>>,
-    ws_sender: Arc<dyn DocumentWebSocket>,
     stop_rx: Option<SinkStopRx>,
 }
 
@@ -134,14 +129,12 @@ impl DocumentWebSocketStream {
         doc_id: &str,
         consumer: Arc<dyn DocumentWebSocketSteamConsumer>,
         ws_msg_rx: mpsc::UnboundedReceiver<DocumentWSData>,
-        ws_sender: Arc<dyn DocumentWebSocket>,
         stop_rx: SinkStopRx,
     ) -> Self {
         DocumentWebSocketStream {
             doc_id: doc_id.to_owned(),
             consumer,
             ws_msg_rx: Some(ws_msg_rx),
-            ws_sender,
             stop_rx: Some(stop_rx),
         }
     }
@@ -200,12 +193,11 @@ impl DocumentWebSocketStream {
             },
             DocumentWSDataType::PullRev => {
                 let range = RevisionRange::try_from(bytes)?;
-                let revision = self.consumer.make_revision_from_range(range).await?;
-                let _ = self.ws_sender.send(revision.into());
+                let _ = self.consumer.send_revision_in_range(range).await?;
             },
             DocumentWSDataType::Acked => {
                 let rev_id = RevId::try_from(bytes)?;
-                let _ = self.consumer.ack_revision(rev_id.into()).await;
+                let _ = self.consumer.receive_ack_revision(rev_id.into()).await;
             },
             DocumentWSDataType::UserConnect => {},
         }

+ 72 - 199
frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs

@@ -1,56 +1,51 @@
 use crate::{
     errors::FlowyError,
     services::doc::revision::{
-        cache::{disk::RevisionDiskCache, memory::RevisionMemoryCache},
+        cache::{
+            disk::{Persistence, RevisionDiskCache},
+            memory::{RevisionMemoryCache, RevisionMemoryCacheMissing},
+            sync::RevisionSyncSeq,
+        },
         RevisionRecord,
-        RevisionServer,
     },
-    sql_tables::RevTableSql,
 };
-use bytes::Bytes;
-use flowy_collaboration::{entities::doc::Doc, util::md5};
+
 use flowy_database::ConnectionPool;
 use flowy_error::{internal_error, FlowyResult};
 use lib_infra::future::FutureResult;
 use lib_ot::{
-    core::{Operation, OperationTransformable},
-    revision::{RevState, RevType, Revision, RevisionRange},
+    core::Operation,
+    revision::{RevState, Revision, RevisionRange},
     rich_text::RichTextDelta,
 };
-use std::{sync::Arc, time::Duration};
+use std::sync::Arc;
 use tokio::{
-    sync::{mpsc, RwLock},
+    sync::RwLock,
     task::{spawn_blocking, JoinHandle},
 };
 
-type DocRevisionDeskCache = dyn RevisionDiskCache<Error = FlowyError>;
+type DocRevisionDiskCache = dyn RevisionDiskCache<Error = FlowyError>;
 
 pub struct RevisionCache {
-    user_id: String,
     doc_id: String,
-    dish_cache: Arc<DocRevisionDeskCache>,
+    pub disk_cache: Arc<DocRevisionDiskCache>,
     memory_cache: Arc<RevisionMemoryCache>,
+    sync_seq: Arc<RevisionSyncSeq>,
     defer_save: RwLock<Option<JoinHandle<()>>>,
-    server: Arc<dyn RevisionServer>,
 }
 
 impl RevisionCache {
-    pub fn new(
-        user_id: &str,
-        doc_id: &str,
-        pool: Arc<ConnectionPool>,
-        server: Arc<dyn RevisionServer>,
-    ) -> RevisionCache {
+    pub fn new(user_id: &str, doc_id: &str, pool: Arc<ConnectionPool>) -> RevisionCache {
+        let disk_cache = Arc::new(Persistence::new(user_id, pool));
+        let memory_cache = Arc::new(RevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone())));
+        let sync_seq = Arc::new(RevisionSyncSeq::new());
         let doc_id = doc_id.to_owned();
-        let dish_cache = Arc::new(Persistence::new(user_id, pool));
-        let memory_cache = Arc::new(RevisionMemoryCache::new());
         Self {
-            user_id: user_id.to_owned(),
             doc_id,
-            dish_cache,
+            disk_cache,
             memory_cache,
+            sync_seq,
             defer_save: RwLock::new(None),
-            server,
         }
     }
 
@@ -63,7 +58,8 @@ impl RevisionCache {
             revision,
             state: RevState::StateLocal,
         };
-        self.memory_cache.add_revision(record).await?;
+        let _ = self.memory_cache.add_revision(&record).await;
+        self.sync_seq.add_revision(record).await?;
         self.save_revisions().await;
         Ok(())
     }
@@ -77,104 +73,60 @@ impl RevisionCache {
             revision,
             state: RevState::StateLocal,
         };
-        self.memory_cache.add_revision(record).await?;
+        self.memory_cache.add_revision(&record).await;
         self.save_revisions().await;
         Ok(())
     }
 
     #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id))]
     pub async fn ack_revision(&self, rev_id: i64) {
-        self.memory_cache.ack_revision(&rev_id).await;
+        self.sync_seq.ack_revision(&rev_id).await;
         self.save_revisions().await;
     }
 
-    pub async fn query_revision(&self, doc_id: &str, rev_id: i64) -> Option<RevisionRecord> {
-        match self.memory_cache.query_revision(&rev_id).await {
-            None => match self.dish_cache.read_revision(doc_id, rev_id) {
-                Ok(revision) => revision,
-                Err(e) => {
-                    log::error!("query_revision error: {:?}", e);
-                    None
-                },
-            },
-            Some(record) => Some(record),
-        }
+    pub async fn get_revision(&self, _doc_id: &str, rev_id: i64) -> Option<RevisionRecord> {
+        self.memory_cache.get_revision(&rev_id).await
     }
 
     async fn save_revisions(&self) {
+        // https://github.com/async-graphql/async-graphql/blob/ed8449beec3d9c54b94da39bab33cec809903953/src/dataloader/mod.rs#L362
         if let Some(handler) = self.defer_save.write().await.take() {
             handler.abort();
         }
 
-        if self.memory_cache.is_empty() {
-            return;
-        }
+        // if self.sync_seq.is_empty() {
+        //     return;
+        // }
 
-        let memory_cache = self.memory_cache.clone();
-        let disk_cache = self.dish_cache.clone();
-        *self.defer_save.write().await = Some(tokio::spawn(async move {
-            tokio::time::sleep(Duration::from_millis(300)).await;
-            let (ids, records) = memory_cache.revisions();
-            match disk_cache.create_revisions(records) {
-                Ok(_) => {
-                    memory_cache.remove_revisions(ids);
-                },
-                Err(e) => log::error!("Save revision failed: {:?}", e),
-            }
-        }));
+        // let memory_cache = self.sync_seq.clone();
+        // let disk_cache = self.disk_cache.clone();
+        // *self.defer_save.write().await = Some(tokio::spawn(async move {
+        //     tokio::time::sleep(Duration::from_millis(300)).await;
+        //     let (ids, records) = memory_cache.revisions();
+        //     match disk_cache.create_revisions(records) {
+        //         Ok(_) => {
+        //             memory_cache.remove_revisions(ids);
+        //         },
+        //         Err(e) => log::error!("Save revision failed: {:?}", e),
+        //     }
+        // }));
     }
 
     pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult<Vec<Revision>> {
-        let revs = self.memory_cache.revisions_in_range(&range).await?;
-        if revs.len() == range.len() as usize {
-            Ok(revs)
-        } else {
-            let doc_id = self.doc_id.clone();
-            let disk_cache = self.dish_cache.clone();
-            let records = spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range))
-                .await
-                .map_err(internal_error)??;
-
-            let revisions = records
-                .into_iter()
-                .map(|record| record.revision)
-                .collect::<Vec<Revision>>();
-            Ok(revisions)
-        }
-    }
-
-    pub async fn load_document(&self) -> FlowyResult<Doc> {
-        // Loading the document from disk and it will be sync with server.
-        let result = load_from_disk(&self.doc_id, self.memory_cache.clone(), self.dish_cache.clone()).await;
-        if result.is_ok() {
-            return result;
-        }
-
-        // The document doesn't exist in local. Try load from server
-        let doc = self.server.fetch_document(&self.doc_id).await?;
-        let delta_data = Bytes::from(doc.data.clone());
-        let doc_md5 = md5(&delta_data);
-        let revision = Revision::new(
-            &doc.id,
-            doc.base_rev_id,
-            doc.rev_id,
-            delta_data,
-            RevType::Remote,
-            &self.user_id,
-            doc_md5,
-        );
-
-        self.add_remote_revision(revision).await?;
-        Ok(doc)
+        let records = self.memory_cache.get_revisions_in_range(&range).await?;
+        Ok(records
+            .into_iter()
+            .map(|record| record.revision)
+            .collect::<Vec<Revision>>())
     }
 
     pub(crate) fn next_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
-        let memory_cache = self.memory_cache.clone();
-        let disk_cache = self.dish_cache.clone();
+        let sync_seq = self.sync_seq.clone();
+        let disk_cache = self.disk_cache.clone();
         let doc_id = self.doc_id.clone();
         FutureResult::new(async move {
-            match memory_cache.front_local_revision().await {
-                None => match memory_cache.front_local_rev_id().await {
+            match sync_seq.next_sync_revision().await {
+                None => match sync_seq.next_sync_rev_id().await {
                     None => Ok(None),
                     Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? {
                         None => Ok(None),
@@ -187,116 +139,37 @@ impl RevisionCache {
     }
 }
 
-async fn load_from_disk(
-    doc_id: &str,
-    memory_cache: Arc<RevisionMemoryCache>,
-    disk_cache: Arc<DocRevisionDeskCache>,
-) -> FlowyResult<Doc> {
-    let doc_id = doc_id.to_owned();
-    let (tx, mut rx) = mpsc::channel(2);
-    let doc = spawn_blocking(move || {
-        let records = disk_cache.read_revisions(&doc_id)?;
-        if records.is_empty() {
-            return Err(FlowyError::record_not_found().context("Local doesn't have this document"));
-        }
-
-        let (base_rev_id, rev_id) = records.last().unwrap().revision.pair_rev_id();
-        let mut delta = RichTextDelta::new();
-        for (_, record) in records.into_iter().enumerate() {
-            // Opti: revision's clone may cause memory issues
-            match RichTextDelta::from_bytes(record.revision.clone().delta_data) {
-                Ok(local_delta) => {
-                    delta = delta.compose(&local_delta)?;
-                    match tx.blocking_send(record) {
-                        Ok(_) => {},
-                        Err(e) => tracing::error!("❌Load document from disk error: {}", e),
-                    }
-                },
-                Err(e) => {
-                    tracing::error!("Deserialize delta from revision failed: {}", e);
-                },
-            }
-        }
-
-        correct_delta_if_need(&mut delta);
-        Result::<Doc, FlowyError>::Ok(Doc {
-            id: doc_id,
-            data: delta.to_json(),
-            rev_id,
-            base_rev_id,
-        })
-    })
-    .await
-    .map_err(internal_error)?;
-
-    while let Some(record) = rx.recv().await {
-        match memory_cache.add_revision(record).await {
-            Ok(_) => {},
-            Err(e) => log::error!("{:?}", e),
+impl RevisionMemoryCacheMissing for Arc<Persistence> {
+    fn get_revision_record(&self, doc_id: &str, rev_id: i64) -> Result<Option<RevisionRecord>, FlowyError> {
+        match self.read_revision(&doc_id, rev_id)? {
+            None => {
+                tracing::warn!("Can't find revision in {} with rev_id: {}", doc_id, rev_id);
+                Ok(None)
+            },
+            Some(record) => Ok(Some(record)),
         }
     }
-    doc
-}
-
-fn correct_delta_if_need(delta: &mut RichTextDelta) {
-    if delta.ops.last().is_none() {
-        return;
-    }
-
-    let data = delta.ops.last().as_ref().unwrap().get_data();
-    if !data.ends_with('\n') {
-        log::error!("❌The op must end with newline. Correcting it by inserting newline op");
-        delta.ops.push(Operation::Insert("\n".into()));
-    }
-}
-
-pub(crate) struct Persistence {
-    user_id: String,
-    pub(crate) pool: Arc<ConnectionPool>,
-}
 
-impl RevisionDiskCache for Persistence {
-    type Error = FlowyError;
+    fn get_revision_records_with_range(
+        &self,
+        doc_id: &str,
+        range: RevisionRange,
+    ) -> FutureResult<Vec<RevisionRecord>, FlowyError> {
+        let disk_cache = self.clone();
+        let doc_id = doc_id.to_owned();
+        FutureResult::new(async move {
+            let records = spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range))
+                .await
+                .map_err(internal_error)??;
 
-    fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error> {
-        let conn = &*self.pool.get().map_err(internal_error)?;
-        conn.immediate_transaction::<_, FlowyError, _>(|| {
-            let _ = RevTableSql::create_rev_table(revisions, conn)?;
-            Ok(())
+            Ok::<Vec<RevisionRecord>, FlowyError>(records)
         })
     }
-
-    fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<RevisionRecord>, Self::Error> {
-        let conn = &*self.pool.get().map_err(internal_error).unwrap();
-        let revisions = RevTableSql::read_rev_tables_with_range(&self.user_id, doc_id, range.clone(), conn)?;
-        Ok(revisions)
-    }
-
-    fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<RevisionRecord>, Self::Error> {
-        let conn = self.pool.get().map_err(internal_error)?;
-        let some = RevTableSql::read_rev_table(&self.user_id, doc_id, &rev_id, &*conn)?;
-        Ok(some)
-    }
-
-    fn read_revisions(&self, doc_id: &str) -> Result<Vec<RevisionRecord>, Self::Error> {
-        let conn = self.pool.get().map_err(internal_error)?;
-        let some = RevTableSql::read_rev_tables(&self.user_id, doc_id, &*conn)?;
-        Ok(some)
-    }
-}
-
-impl Persistence {
-    pub(crate) fn new(user_id: &str, pool: Arc<ConnectionPool>) -> Self {
-        Self {
-            user_id: user_id.to_owned(),
-            pool,
-        }
-    }
 }
 
 #[cfg(feature = "flowy_unit_test")]
 impl RevisionCache {
-    pub fn dish_cache(&self) -> Arc<DocRevisionDeskCache> { self.dish_cache.clone() }
+    pub fn disk_cache(&self) -> Arc<DocRevisionDiskCache> { self.disk_cache.clone() }
 
-    pub fn memory_cache(&self) -> Arc<RevisionMemoryCache> { self.memory_cache.clone() }
+    pub fn memory_cache(&self) -> Arc<RevisionSyncSeq> { self.sync_seq.clone() }
 }

+ 48 - 1
frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs

@@ -1,7 +1,10 @@
 use crate::services::doc::revision::RevisionRecord;
 
+use crate::sql_tables::RevTableSql;
+use flowy_database::ConnectionPool;
+use flowy_error::{internal_error, FlowyError};
 use lib_ot::revision::RevisionRange;
-use std::fmt::Debug;
+use std::{fmt::Debug, sync::Arc};
 
 pub trait RevisionDiskCache: Sync + Send {
     type Error: Debug;
@@ -10,3 +13,47 @@ pub trait RevisionDiskCache: Sync + Send {
     fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<RevisionRecord>, Self::Error>;
     fn read_revisions(&self, doc_id: &str) -> Result<Vec<RevisionRecord>, Self::Error>;
 }
+
+pub(crate) struct Persistence {
+    user_id: String,
+    pub(crate) pool: Arc<ConnectionPool>,
+}
+
+impl RevisionDiskCache for Persistence {
+    type Error = FlowyError;
+
+    fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error> {
+        let conn = &*self.pool.get().map_err(internal_error)?;
+        conn.immediate_transaction::<_, FlowyError, _>(|| {
+            let _ = RevTableSql::create_rev_table(revisions, conn)?;
+            Ok(())
+        })
+    }
+
+    fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<RevisionRecord>, Self::Error> {
+        let conn = &*self.pool.get().map_err(internal_error).unwrap();
+        let revisions = RevTableSql::read_rev_tables_with_range(&self.user_id, doc_id, range.clone(), conn)?;
+        Ok(revisions)
+    }
+
+    fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<RevisionRecord>, Self::Error> {
+        let conn = self.pool.get().map_err(internal_error)?;
+        let some = RevTableSql::read_rev_table(&self.user_id, doc_id, &rev_id, &*conn)?;
+        Ok(some)
+    }
+
+    fn read_revisions(&self, doc_id: &str) -> Result<Vec<RevisionRecord>, Self::Error> {
+        let conn = self.pool.get().map_err(internal_error)?;
+        let some = RevTableSql::read_rev_tables(&self.user_id, doc_id, &*conn)?;
+        Ok(some)
+    }
+}
+
+impl Persistence {
+    pub(crate) fn new(user_id: &str, pool: Arc<ConnectionPool>) -> Self {
+        Self {
+            user_id: user_id.to_owned(),
+            pool,
+        }
+    }
+}

+ 58 - 97
frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs

@@ -1,126 +1,87 @@
-use crate::services::doc::revision::RevisionRecord;
+use crate::services::doc::RevisionRecord;
 use dashmap::DashMap;
-use lib_ot::{
-    errors::OTError,
-    revision::{RevState, Revision, RevisionRange},
-};
-use std::{collections::VecDeque, sync::Arc};
+use flowy_error::FlowyError;
+use lib_infra::future::FutureResult;
+use lib_ot::revision::RevisionRange;
+use std::sync::Arc;
 use tokio::sync::RwLock;
 
-pub struct RevisionMemoryCache {
+pub(crate) trait RevisionMemoryCacheMissing: Send + Sync {
+    fn get_revision_record(&self, doc_id: &str, rev_id: i64) -> Result<Option<RevisionRecord>, FlowyError>;
+    fn get_revision_records_with_range(
+        &self,
+        doc_id: &str,
+        range: RevisionRange,
+    ) -> FutureResult<Vec<RevisionRecord>, FlowyError>;
+}
+
+pub(crate) struct RevisionMemoryCache {
+    doc_id: String,
     revs_map: Arc<DashMap<i64, RevisionRecord>>,
-    local_revs: Arc<RwLock<VecDeque<i64>>>,
+    rev_loader: Arc<dyn RevisionMemoryCacheMissing>,
+    revs_order: Arc<RwLock<Vec<i64>>>,
 }
 
-impl std::default::Default for RevisionMemoryCache {
-    fn default() -> Self {
-        let local_revs = Arc::new(RwLock::new(VecDeque::new()));
+// TODO: remove outdated revisions to reduce memory usage
+impl RevisionMemoryCache {
+    pub(crate) fn new(doc_id: &str, rev_loader: Arc<dyn RevisionMemoryCacheMissing>) -> Self {
         RevisionMemoryCache {
+            doc_id: doc_id.to_owned(),
             revs_map: Arc::new(DashMap::new()),
-            local_revs,
+            rev_loader,
+            revs_order: Arc::new(RwLock::new(vec![])),
         }
     }
-}
-
-impl RevisionMemoryCache {
-    pub fn new() -> Self { RevisionMemoryCache::default() }
-
-    pub async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
-        // The last revision's rev_id must be greater than the new one.
-        if let Some(rev_id) = self.local_revs.read().await.back() {
-            if *rev_id >= record.revision.rev_id {
-                return Err(OTError::revision_id_conflict()
-                    .context(format!("The new revision's id must be greater than {}", rev_id)));
-            }
-        }
 
-        match record.state {
-            RevState::StateLocal => {
-                tracing::debug!("{}:add revision {}", record.revision.doc_id, record.revision.rev_id);
-                self.local_revs.write().await.push_back(record.revision.rev_id);
-            },
-            RevState::Acked => {},
-        }
-
-        self.revs_map.insert(record.revision.rev_id, record);
-        Ok(())
-    }
+    pub(crate) async fn is_empty(&self) -> bool { self.revs_order.read().await.is_empty() }
 
-    pub fn remove_revisions(&self, ids: Vec<i64>) { self.revs_map.retain(|k, _| !ids.contains(k)); }
+    pub(crate) fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) }
 
-    pub async fn ack_revision(&self, rev_id: &i64) {
-        if let Some(pop_rev_id) = self.front_local_rev_id().await {
-            if &pop_rev_id != rev_id {
+    pub(crate) async fn add_revision(&self, record: &RevisionRecord) {
+        if let Some(rev_id) = self.revs_order.read().await.last() {
+            if *rev_id >= record.revision.rev_id {
+                tracing::error!("Duplicated revision added to memory_cache");
                 return;
             }
         }
+        self.revs_map.insert(record.revision.rev_id, record.clone());
+        self.revs_order.write().await.push(record.revision.rev_id);
+    }
 
-        match self.local_revs.write().await.pop_front() {
-            None => {},
-            Some(pop_rev_id) => {
-                if &pop_rev_id != rev_id {
-                    tracing::error!("The front rev_id:{} not equal to ack rev_id: {}", pop_rev_id, rev_id);
-                    assert_eq!(&pop_rev_id, rev_id);
-                } else {
-                    tracing::debug!("pop revision {}", pop_rev_id);
-                }
+    pub(crate) async fn get_revision(&self, rev_id: &i64) -> Option<RevisionRecord> {
+        match self.revs_map.get(&rev_id).map(|r| r.value().clone()) {
+            None => match self.rev_loader.get_revision_record(&self.doc_id, *rev_id) {
+                Ok(revision) => revision,
+                Err(e) => {
+                    tracing::error!("{}", e);
+                    None
+                },
             },
+            Some(revision) => Some(revision),
         }
     }
 
-    pub async fn revisions_in_range(&self, range: &RevisionRange) -> Result<Vec<Revision>, OTError> {
+    pub(crate) async fn get_revisions_in_range(
+        &self,
+        range: &RevisionRange,
+    ) -> Result<Vec<RevisionRecord>, FlowyError> {
+        let range_len = range.len() as usize;
         let revs = range
             .iter()
-            .flat_map(|rev_id| match self.revs_map.get(&rev_id) {
-                None => None,
-                Some(record) => Some(record.revision.clone()),
-            })
-            .collect::<Vec<Revision>>();
+            .flat_map(|rev_id| self.revs_map.get(&rev_id).map(|record| record.clone()))
+            .collect::<Vec<RevisionRecord>>();
 
-        if revs.len() == range.len() as usize {
+        if revs.len() == range_len {
             Ok(revs)
         } else {
-            Ok(vec![])
-        }
-    }
-
-    pub fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) }
-
-    pub fn is_empty(&self) -> bool { self.revs_map.is_empty() }
-
-    pub fn revisions(&self) -> (Vec<i64>, Vec<RevisionRecord>) {
-        let mut records: Vec<RevisionRecord> = vec![];
-        let mut ids: Vec<i64> = vec![];
-
-        self.revs_map.iter().for_each(|kv| {
-            records.push(kv.value().clone());
-            ids.push(*kv.key());
-        });
-        (ids, records)
-    }
-
-    pub async fn query_revision(&self, rev_id: &i64) -> Option<RevisionRecord> {
-        self.revs_map.get(&rev_id).map(|r| r.value().clone())
-    }
-
-    pub async fn front_local_revision(&self) -> Option<(i64, RevisionRecord)> {
-        match self.local_revs.read().await.front() {
-            None => None,
-            Some(rev_id) => match self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())) {
-                None => None,
-                Some(val) => {
-                    tracing::debug!("{}:try send revision {}", val.1.revision.doc_id, val.1.revision.rev_id);
-                    Some(val)
-                },
-            },
+            let revs = self
+                .rev_loader
+                .get_revision_records_with_range(&self.doc_id, range.clone())
+                .await?;
+            if revs.len() != range_len {
+                log::error!("Revisions len is not equal to range required");
+            }
+            Ok(revs)
         }
     }
-
-    pub async fn front_local_rev_id(&self) -> Option<i64> { self.local_revs.read().await.front().copied() }
-}
-
-#[cfg(feature = "flowy_unit_test")]
-impl RevisionMemoryCache {
-    pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { self.revs_map.clone() }
-    pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { self.local_revs.clone() }
 }

+ 1 - 0
frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs

@@ -3,6 +3,7 @@ mod cache;
 mod disk;
 mod memory;
 mod model;
+mod sync;
 
 pub use cache::*;
 pub use model::*;

+ 71 - 0
frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs

@@ -0,0 +1,71 @@
+use crate::services::doc::revision::RevisionRecord;
+use dashmap::DashMap;
+use lib_ot::errors::OTError;
+use std::{collections::VecDeque, sync::Arc};
+use tokio::sync::RwLock;
+
+pub struct RevisionSyncSeq {
+    revs_map: Arc<DashMap<i64, RevisionRecord>>,
+    local_revs: Arc<RwLock<VecDeque<i64>>>,
+}
+
+impl std::default::Default for RevisionSyncSeq {
+    fn default() -> Self {
+        let local_revs = Arc::new(RwLock::new(VecDeque::new()));
+        RevisionSyncSeq {
+            revs_map: Arc::new(DashMap::new()),
+            local_revs,
+        }
+    }
+}
+
+impl RevisionSyncSeq {
+    pub fn new() -> Self { RevisionSyncSeq::default() }
+
+    pub async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
+        // The new revision's rev_id must be greater than the last pending one.
+        if let Some(rev_id) = self.local_revs.read().await.back() {
+            if *rev_id >= record.revision.rev_id {
+                return Err(OTError::revision_id_conflict()
+                    .context(format!("The new revision's id must be greater than {}", rev_id)));
+            }
+        }
+        self.revs_map.insert(record.revision.rev_id, record);
+        Ok(())
+    }
+
+    pub async fn ack_revision(&self, rev_id: &i64) {
+        if let Some(pop_rev_id) = self.next_sync_rev_id().await {
+            if &pop_rev_id != rev_id {
+                tracing::error!("The next ack rev_id must be equal to the next rev_id");
+                assert_eq!(&pop_rev_id, rev_id);
+                return;
+            }
+
+            tracing::debug!("pop revision {}", pop_rev_id);
+            self.revs_map.remove(&pop_rev_id);
+            let _ = self.local_revs.write().await.pop_front();
+        }
+    }
+
+    pub async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> {
+        match self.local_revs.read().await.front() {
+            None => None,
+            Some(rev_id) => match self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())) {
+                None => None,
+                Some(val) => {
+                    tracing::debug!("{}:try send revision {}", val.1.revision.doc_id, val.1.revision.rev_id);
+                    Some(val)
+                },
+            },
+        }
+    }
+
+    pub async fn next_sync_rev_id(&self) -> Option<i64> { self.local_revs.read().await.front().copied() }
+}
+
+#[cfg(feature = "flowy_unit_test")]
+impl RevisionSyncSeq {
+    pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { self.revs_map.clone() }
+    pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { self.local_revs.clone() }
+}

+ 90 - 4
frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs

@@ -1,4 +1,5 @@
 use crate::{errors::FlowyError, services::doc::revision::RevisionCache};
+use bytes::Bytes;
 use flowy_collaboration::{
     entities::doc::Doc,
     util::{md5, RevIdCounter},
@@ -6,8 +7,8 @@ use flowy_collaboration::{
 use flowy_error::FlowyResult;
 use lib_infra::future::FutureResult;
 use lib_ot::{
-    core::OperationTransformable,
-    revision::{RevType, Revision, RevisionRange},
+    core::{Operation, OperationTransformable},
+    revision::{RevState, RevType, Revision, RevisionRange},
     rich_text::RichTextDelta,
 };
 use std::sync::Arc;
@@ -34,8 +35,16 @@ impl RevisionManager {
         }
     }
 
-    pub async fn load_document(&mut self) -> FlowyResult<RichTextDelta> {
-        let doc = self.cache.load_document().await?;
+    pub async fn load_document(&mut self, server: Arc<dyn RevisionServer>) -> FlowyResult<RichTextDelta> {
+        let revisions = RevisionLoader {
+            doc_id: self.doc_id.clone(),
+            user_id: self.user_id.clone(),
+            server,
+            cache: self.cache.clone(),
+        }
+        .load()
+        .await?;
+        let doc = mk_doc_from_revisions(&self.doc_id, revisions)?;
         self.update_rev_id_counter_value(doc.rev_id);
         Ok(doc.delta()?)
     }
@@ -100,3 +109,80 @@ impl RevisionManager {
 impl RevisionManager {
     pub fn revision_cache(&self) -> Arc<RevisionCache> { self.cache.clone() }
 }
+
+struct RevisionLoader {
+    doc_id: String,
+    user_id: String,
+    server: Arc<dyn RevisionServer>,
+    cache: Arc<RevisionCache>,
+}
+
+impl RevisionLoader {
+    async fn load(&self) -> Result<Vec<Revision>, FlowyError> {
+        let records = self.cache.disk_cache.read_revisions(&self.doc_id)?;
+        let revisions: Vec<Revision>;
+        if records.is_empty() {
+            let doc = self.server.fetch_document(&self.doc_id).await?;
+            let delta_data = Bytes::from(doc.data.clone());
+            let doc_md5 = md5(&delta_data);
+            let revision = Revision::new(
+                &doc.id,
+                doc.base_rev_id,
+                doc.rev_id,
+                delta_data,
+                RevType::Remote,
+                &self.user_id,
+                doc_md5,
+            );
+            let _ = self.cache.add_remote_revision(revision.clone()).await?;
+            revisions = vec![revision];
+        } else {
+            for record in &records {
+                match record.state {
+                    RevState::StateLocal => match self.cache.add_local_revision(record.revision.clone()).await {
+                        Ok(_) => {},
+                        Err(e) => tracing::error!("{}", e),
+                    },
+                    RevState::Acked => {},
+                }
+            }
+            revisions = records.into_iter().map(|record| record.revision).collect::<_>();
+        }
+
+        Ok(revisions)
+    }
+}
+
+fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<Doc> {
+    let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
+    let mut delta = RichTextDelta::new();
+    for (_, revision) in revisions.into_iter().enumerate() {
+        match RichTextDelta::from_bytes(revision.delta_data) {
+            Ok(local_delta) => {
+                delta = delta.compose(&local_delta)?;
+            },
+            Err(e) => {
+                tracing::error!("Deserialize delta from revision failed: {}", e);
+            },
+        }
+    }
+    correct_delta_if_need(&mut delta);
+
+    Result::<Doc, FlowyError>::Ok(Doc {
+        id: doc_id.to_owned(),
+        data: delta.to_json(),
+        rev_id,
+        base_rev_id,
+    })
+}
+fn correct_delta_if_need(delta: &mut RichTextDelta) {
+    if delta.ops.last().is_none() {
+        return;
+    }
+
+    let data = delta.ops.last().as_ref().unwrap().get_data();
+    if !data.ends_with('\n') {
+        log::error!("❌The op must end with newline. Correcting it by inserting newline op");
+        delta.ops.push(Operation::Insert("\n".into()));
+    }
+}

+ 0 - 0
frontend/rust-lib/flowy-document/src/services/doc/ws/mod.rs


+ 2 - 2
frontend/rust-lib/flowy-test/src/doc_script.rs

@@ -47,7 +47,7 @@ impl EditorTest {
         let rev_manager = self.editor.rev_manager();
         let cache = rev_manager.revision_cache();
         let _memory_cache = cache.memory_cache();
-        let _disk_cache = cache.dish_cache();
+        let _disk_cache = cache.disk_cache();
         let doc_id = self.editor.doc_id.clone();
         let _user_id = self.sdk.user_session.user_id().unwrap();
         let ws_manager = self.sdk.ws_manager.clone();
@@ -71,7 +71,7 @@ impl EditorTest {
                 self.editor.replace(interval, s).await.unwrap();
             },
             EditorScript::AssertRevisionState(rev_id, state) => {
-                let record = cache.query_revision(&doc_id, rev_id).await.unwrap();
+                let record = cache.get_revision(&doc_id, rev_id).await.unwrap();
                 assert_eq!(record.state, state);
             },
             EditorScript::AssertCurrentRevId(rev_id) => {

+ 1 - 1
frontend/rust-lib/flowy-user/Flowy.toml

@@ -1,3 +1,3 @@
 
-proto_crates = ["src/event.rs", "src/errors.rs", "src/notify"]
+proto_crates = ["src/event.rs", "src/notify"]
 event_files = ["src/event.rs"]

+ 1 - 5
shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs

@@ -1,20 +1,16 @@
 use crate::{
     core::document::Document,
-    entities::ws::{DocumentWSData, DocumentWSDataType, WsDocumentDataBuilder},
+    entities::ws::{DocumentWSData, WsDocumentDataBuilder},
 };
-use bytes::Bytes;
 use lib_ot::{
     core::OperationTransformable,
     errors::OTError,
-    protobuf::RevId,
     revision::{RevType, Revision, RevisionRange},
     rich_text::RichTextDelta,
 };
 use parking_lot::RwLock;
-use protobuf::Message;
 use std::{
     cmp::Ordering,
-    convert::TryInto,
     fmt::Debug,
     sync::{
         atomic::{AtomicI64, Ordering::SeqCst},

+ 1 - 1
shared-lib/flowy-collaboration/src/entities/ws/ws.rs

@@ -84,7 +84,7 @@ impl WsDocumentDataBuilder {
 
     // DocumentWSDataType::Acked -> RevId
     pub fn build_acked_message(doc_id: &str, rev_id: i64) -> DocumentWSData {
-        let cloned_rev_id = rev_id.clone();
+        let cloned_rev_id = rev_id;
         let rev_id: RevId = rev_id.into();
         let bytes: Bytes = rev_id.try_into().unwrap();