Преглед на файлове

fix bugs & add unit test

appflowy преди 3 години
родител
ревизия
909406bf08

+ 27 - 24
frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs

@@ -4,7 +4,7 @@ use crate::{
     services::{
         doc::{
             edit::{EditCommand, EditCommandQueue, OpenDocAction, TransformDeltas},
-            revision::{RevisionDownStream, RevisionManager, SteamStopRx, SteamStopTx},
+            revision::{RevisionDownStream, RevisionManager, SteamStopTx},
         },
         ws::{DocumentWebSocket, WsDocumentHandler},
     },
@@ -31,7 +31,7 @@ pub type DocId = String;
 pub struct ClientDocEditor {
     pub doc_id: DocId,
     rev_manager: Arc<RevisionManager>,
-    edit_tx: UnboundedSender<EditCommand>,
+    edit_cmd_tx: UnboundedSender<EditCommand>,
     ws_sender: Arc<dyn DocumentWebSocket>,
     user: Arc<dyn DocumentUser>,
     ws_msg_tx: UnboundedSender<WsDocumentData>,
@@ -47,7 +47,7 @@ impl ClientDocEditor {
         ws_sender: Arc<dyn DocumentWebSocket>,
     ) -> DocResult<Arc<Self>> {
         let delta = rev_manager.load_document().await?;
-        let edit_queue_tx = spawn_edit_queue(doc_id, delta, pool.clone());
+        let edit_cmd_tx = spawn_edit_queue(doc_id, delta, pool.clone());
         let doc_id = doc_id.to_string();
         let rev_manager = Arc::new(rev_manager);
         let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel();
@@ -56,12 +56,13 @@ impl ClientDocEditor {
         let edit_doc = Arc::new(Self {
             doc_id,
             rev_manager,
-            edit_tx: edit_queue_tx,
+            edit_cmd_tx,
+            ws_sender,
             user,
             ws_msg_tx,
-            ws_sender,
             stop_sync_tx,
         });
+
         edit_doc.notify_open_doc();
 
         start_sync(edit_doc.clone(), ws_msg_rx, cloned_stop_sync_tx);
@@ -75,7 +76,7 @@ impl ClientDocEditor {
             data: data.to_string(),
             ret,
         };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let delta = rx.await.map_err(internal_error)??;
         let _ = self.save_local_delta(delta).await?;
         Ok(())
@@ -84,7 +85,7 @@ impl ClientDocEditor {
     pub async fn delete(&self, interval: Interval) -> Result<(), DocError> {
         let (ret, rx) = oneshot::channel::<DocumentResult<RichTextDelta>>();
         let msg = EditCommand::Delete { interval, ret };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let delta = rx.await.map_err(internal_error)??;
         let _ = self.save_local_delta(delta).await?;
         Ok(())
@@ -97,7 +98,7 @@ impl ClientDocEditor {
             attribute,
             ret,
         };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let delta = rx.await.map_err(internal_error)??;
         let _ = self.save_local_delta(delta).await?;
         Ok(())
@@ -110,7 +111,7 @@ impl ClientDocEditor {
             data: data.to_string(),
             ret,
         };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let delta = rx.await.map_err(internal_error)??;
         let _ = self.save_local_delta(delta).await?;
         Ok(())
@@ -119,21 +120,21 @@ impl ClientDocEditor {
     pub async fn can_undo(&self) -> bool {
         let (ret, rx) = oneshot::channel::<bool>();
         let msg = EditCommand::CanUndo { ret };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         rx.await.unwrap_or(false)
     }
 
     pub async fn can_redo(&self) -> bool {
         let (ret, rx) = oneshot::channel::<bool>();
         let msg = EditCommand::CanRedo { ret };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         rx.await.unwrap_or(false)
     }
 
     pub async fn undo(&self) -> Result<UndoResult, DocError> {
         let (ret, rx) = oneshot::channel::<DocumentResult<UndoResult>>();
         let msg = EditCommand::Undo { ret };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let r = rx.await.map_err(internal_error)??;
         Ok(r)
     }
@@ -141,7 +142,7 @@ impl ClientDocEditor {
     pub async fn redo(&self) -> Result<UndoResult, DocError> {
         let (ret, rx) = oneshot::channel::<DocumentResult<UndoResult>>();
         let msg = EditCommand::Redo { ret };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let r = rx.await.map_err(internal_error)??;
         Ok(r)
     }
@@ -149,7 +150,7 @@ impl ClientDocEditor {
     pub async fn delta(&self) -> DocResult<DocDelta> {
         let (ret, rx) = oneshot::channel::<DocumentResult<String>>();
         let msg = EditCommand::ReadDoc { ret };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let data = rx.await.map_err(internal_error)??;
 
         Ok(DocDelta {
@@ -175,16 +176,16 @@ impl ClientDocEditor {
             delta: delta.clone(),
             ret,
         };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let _ = rx.await.map_err(internal_error)??;
 
         let _ = self.save_local_delta(delta).await?;
         Ok(())
     }
 
-    #[tracing::instrument(level = "debug", skip(self), fields(doc_id))]
+    #[tracing::instrument(level = "debug", skip(self))]
     pub fn stop_sync(&self) {
-        tracing::Span::current().record("doc_id", &self.doc_id.as_str());
+        tracing::debug!("{} stop sync", self.doc_id);
         let _ = self.stop_sync_tx.send(());
     }
 
@@ -208,7 +209,7 @@ impl ClientDocEditor {
     pub(crate) async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> {
         // Transform the revision
         let (ret, rx) = oneshot::channel::<DocumentResult<TransformDeltas>>();
-        let _ = self.edit_tx.send(EditCommand::ProcessRemoteRevision { bytes, ret });
+        let _ = self.edit_cmd_tx.send(EditCommand::ProcessRemoteRevision { bytes, ret });
         let TransformDeltas {
             client_prime,
             server_prime,
@@ -226,7 +227,7 @@ impl ClientDocEditor {
             delta: client_prime.clone(),
             ret,
         };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let _ = rx.await.map_err(internal_error)??;
 
         // update rev id
@@ -256,10 +257,12 @@ impl ClientDocEditor {
         Ok(())
     }
 
-    async fn handle_ws_message(&self, doc_data: WsDocumentData) -> DocResult<()> {
+    pub async fn handle_ws_message(&self, doc_data: WsDocumentData) -> DocResult<()> {
         match self.ws_msg_tx.send(doc_data) {
-            Ok(_) => {},
-            Err(e) => log::error!("Propagate ws message data failed. {}", e),
+            Ok(_) => {
+                tracing::debug!("Propagate ws message data success")
+            },
+            Err(e) => tracing::error!("Propagate ws message data failed. {}", e),
         }
         Ok(())
     }
@@ -320,7 +323,7 @@ impl ClientDocEditor {
     pub async fn doc_json(&self) -> DocResult<String> {
         let (ret, rx) = oneshot::channel::<DocumentResult<String>>();
         let msg = EditCommand::ReadDoc { ret };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let s = rx.await.map_err(internal_error)??;
         Ok(s)
     }
@@ -328,7 +331,7 @@ impl ClientDocEditor {
     pub async fn doc_delta(&self) -> DocResult<RichTextDelta> {
         let (ret, rx) = oneshot::channel::<DocumentResult<RichTextDelta>>();
         let msg = EditCommand::ReadDocDelta { ret };
-        let _ = self.edit_tx.send(msg);
+        let _ = self.edit_cmd_tx.send(msg);
         let delta = rx.await.map_err(internal_error)??;
         Ok(delta)
     }

+ 8 - 14
frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs

@@ -8,16 +8,7 @@ use flowy_document_infra::entities::doc::Doc;
 use lib_infra::future::ResultFuture;
 use lib_ot::{
     core::{Operation, OperationTransformable},
-    revision::{
-        RevId,
-        RevState,
-        RevType,
-        Revision,
-        RevisionDiskCache,
-        RevisionMemoryCache,
-        RevisionRange,
-        RevisionRecord,
-    },
+    revision::{RevState, RevType, Revision, RevisionDiskCache, RevisionMemoryCache, RevisionRange, RevisionRecord},
     rich_text::RichTextDelta,
 };
 use std::{sync::Arc, time::Duration};
@@ -64,13 +55,16 @@ impl RevisionCache {
         Ok(())
     }
 
-    #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id.as_ref()))]
-    pub async fn ack_revision(&self, rev_id: RevId) {
-        let rev_id = rev_id.value;
-        self.memory_cache.mut_revision(&rev_id, |mut rev| rev.value_mut().ack());
+    #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id))]
+    pub async fn ack_revision(&self, rev_id: i64) {
+        self.memory_cache.ack_revision(&rev_id).await;
         self.save_revisions().await;
     }
 
+    pub async fn query_revision(&self, rev_id: i64) -> Option<RevisionRecord> {
+        self.memory_cache.query_revision(&rev_id).await
+    }
+
     async fn save_revisions(&self) {
         if let Some(handler) = self.defer_save.write().await.take() {
             handler.abort();

+ 2 - 2
frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs

@@ -49,7 +49,7 @@ impl RevisionManager {
     }
 
     pub async fn ack_revision(&self, rev_id: RevId) -> Result<(), DocError> {
-        self.cache.ack_revision(rev_id).await;
+        self.cache.ack_revision(rev_id.into()).await;
         Ok(())
     }
 
@@ -89,7 +89,7 @@ impl RevisionManager {
     }
 
     pub(crate) fn make_up_stream(&self, stop_rx: SteamStopRx) -> RevisionUpStream {
-        RevisionUpStream::new(self.cache.clone(), self.ws_sender.clone(), stop_rx)
+        RevisionUpStream::new(&self.doc_id, self.cache.clone(), self.ws_sender.clone(), stop_rx)
     }
 }
 

+ 21 - 14
frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs

@@ -23,7 +23,7 @@ use tokio::{
 pub(crate) struct RevisionDownStream {
     editor: Arc<ClientDocEditor>,
     rev_manager: Arc<RevisionManager>,
-    receiver: Option<mpsc::UnboundedReceiver<WsDocumentData>>,
+    ws_msg_rx: Option<mpsc::UnboundedReceiver<WsDocumentData>>,
     ws_sender: Arc<dyn DocumentWebSocket>,
     stop_rx: Option<SteamStopRx>,
 }
@@ -32,36 +32,36 @@ impl RevisionDownStream {
     pub(crate) fn new(
         editor: Arc<ClientDocEditor>,
         rev_manager: Arc<RevisionManager>,
-        receiver: mpsc::UnboundedReceiver<WsDocumentData>,
+        ws_msg_rx: mpsc::UnboundedReceiver<WsDocumentData>,
         ws_sender: Arc<dyn DocumentWebSocket>,
         stop_rx: SteamStopRx,
     ) -> Self {
         RevisionDownStream {
             editor,
             rev_manager,
-            receiver: Some(receiver),
+            ws_msg_rx: Some(ws_msg_rx),
             ws_sender,
             stop_rx: Some(stop_rx),
         }
     }
 
     pub async fn run(mut self) {
-        let mut receiver = self.receiver.take().expect("Only take once");
+        let mut receiver = self.ws_msg_rx.take().expect("Only take once");
         let mut stop_rx = self.stop_rx.take().expect("Only take once");
+        let doc_id = self.editor.doc_id.clone();
         let stream = stream! {
             loop {
-                // match receiver.recv().await {
-                //     Some(msg) => yield msg,
-                //     None => break,
-                // }
                 tokio::select! {
                     result = receiver.recv() => {
                         match result {
                             Some(msg) => yield msg,
-                            None => break,
+                            None => {},
                         }
                     },
-                    _ = stop_rx.recv() => break,
+                    _ = stop_rx.recv() => {
+                        tracing::debug!("[RevisionDownStream:{}] loop exit", doc_id);
+                        break
+                    },
                 };
             }
         };
@@ -80,8 +80,8 @@ impl RevisionDownStream {
         let bytes = spawn_blocking(move || Bytes::from(data))
             .await
             .map_err(internal_error)?;
-        log::debug!("[RevisionDownStream]: receives new message: {:?}", ty);
 
+        log::debug!("[RevisionDownStream]: receives new message: {:?}", ty);
         match ty {
             WsDataType::PushRev => {
                 let _ = self.editor.handle_push_rev(bytes).await?;
@@ -115,10 +115,12 @@ pub(crate) struct RevisionUpStream {
     revisions: Arc<dyn RevisionIterator>,
     ws_sender: Arc<dyn DocumentWebSocket>,
     stop_rx: Option<SteamStopRx>,
+    doc_id: String,
 }
 
 impl RevisionUpStream {
     pub(crate) fn new(
+        doc_id: &str,
         revisions: Arc<dyn RevisionIterator>,
         ws_sender: Arc<dyn DocumentWebSocket>,
         stop_rx: SteamStopRx,
@@ -127,12 +129,14 @@ impl RevisionUpStream {
             revisions,
             ws_sender,
             stop_rx: Some(stop_rx),
+            doc_id: doc_id.to_owned(),
         }
     }
 
     pub async fn run(mut self) {
         let (tx, mut rx) = mpsc::unbounded_channel();
         let mut stop_rx = self.stop_rx.take().expect("Only take once");
+        let doc_id = self.doc_id.clone();
         tokio::spawn(tick(tx));
         let stream = stream! {
             loop {
@@ -140,10 +144,13 @@ impl RevisionUpStream {
                     result = rx.recv() => {
                         match result {
                             Some(msg) => yield msg,
-                            None => break,
+                            None => {},
                         }
                     },
-                    _ = stop_rx.recv() => break,
+                    _ = stop_rx.recv() => {
+                        tracing::debug!("[RevisionUpStream:{}] loop exit", doc_id);
+                        break
+                    },
                 };
             }
         };
@@ -167,7 +174,7 @@ impl RevisionUpStream {
         match self.revisions.next().await? {
             None => Ok(()),
             Some(record) => {
-                log::debug!(
+                tracing::debug!(
                     "[RevisionUpStream]: processes revision: {}:{:?}",
                     record.revision.doc_id,
                     record.revision.rev_id

+ 31 - 5
frontend/rust-lib/flowy-document/tests/editor/revision_test.rs

@@ -1,13 +1,39 @@
 use flowy_test::editor::{EditorScript::*, *};
+use lib_ot::revision::RevState;
 
 #[tokio::test]
-async fn create_doc() {
+async fn doc_rev_state_test1() {
     let scripts = vec![
         InsertText("123", 0),
-        AssertRevId(1),
-        InsertText("456", 3),
-        AssertRevId(2),
-        AssertJson(r#"[{"insert":"123456\n"}]"#),
+        AssertCurrentRevId(1),
+        AssertRevisionState(1, RevState::Local),
+        SimulateAckedMessage(1),
+        AssertRevisionState(1, RevState::Acked),
+        AssertNextSendingRevision(None),
+        AssertJson(r#"[{"insert":"123\n"}]"#),
+    ];
+    EditorTest::new().await.run_scripts(scripts).await;
+}
+
+#[tokio::test]
+async fn doc_rev_state_test2() {
+    let scripts = vec![
+        InsertText("1", 0),
+        InsertText("2", 1),
+        InsertText("3", 2),
+        AssertCurrentRevId(3),
+        AssertRevisionState(1, RevState::Local),
+        AssertRevisionState(2, RevState::Local),
+        AssertRevisionState(3, RevState::Local),
+        SimulateAckedMessage(1),
+        AssertRevisionState(1, RevState::Acked),
+        AssertNextSendingRevision(Some(2)),
+        SimulateAckedMessage(2),
+        AssertRevisionState(2, RevState::Acked),
+        //
+        AssertNextSendingRevision(Some(3)),
+        AssertRevisionState(3, RevState::Local),
+        AssertJson(r#"[{"insert":"123\n"}]"#),
     ];
     EditorTest::new().await.run_scripts(scripts).await;
 }

+ 37 - 11
frontend/rust-lib/flowy-test/src/editor.rs

@@ -1,7 +1,11 @@
 use crate::{helper::ViewTest, FlowySDKTest};
-use flowy_document::services::doc::edit::ClientDocEditor;
-use flowy_document_infra::entities::doc::DocIdentifier;
-use lib_ot::{core::Interval, revision::RevState, rich_text::RichTextDelta};
+use flowy_document::services::doc::{edit::ClientDocEditor, revision::RevisionIterator};
+use flowy_document_infra::entities::{doc::DocIdentifier, ws::WsDocumentDataBuilder};
+use lib_ot::{
+    core::Interval,
+    revision::{RevState, Revision, RevisionRange},
+    rich_text::RichTextDelta,
+};
 use std::sync::Arc;
 use tokio::time::{sleep, Duration};
 
@@ -25,14 +29,15 @@ impl EditorTest {
             self.run_script(script).await;
         }
 
-        sleep(Duration::from_secs(10)).await;
+        sleep(Duration::from_secs(5)).await;
     }
 
     async fn run_script(&mut self, script: EditorScript) {
         let rev_manager = self.editor.rev_manager();
         let cache = rev_manager.revision_cache();
-        let memory_cache = cache.memory_cache();
-        let disk_cache = cache.dish_cache();
+        let _memory_cache = cache.memory_cache();
+        let _disk_cache = cache.dish_cache();
+        let doc_id = self.editor.doc_id.clone();
 
         match script {
             EditorScript::InsertText(s, offset) => {
@@ -50,11 +55,29 @@ impl EditorTest {
             EditorScript::Redo() => {
                 self.editor.redo().await.unwrap();
             },
-            EditorScript::AssertRevisionState(rev_id, state) => {},
-            EditorScript::AssertNextSentRevision(rev_id, state) => {},
-            EditorScript::AssertRevId(rev_id) => {
+            EditorScript::AssertRevisionState(rev_id, state) => {
+                let record = cache.query_revision(rev_id).await.unwrap();
+                assert_eq!(record.state, state);
+            },
+            EditorScript::AssertCurrentRevId(rev_id) => {
                 assert_eq!(self.editor.rev_manager().rev_id(), rev_id);
             },
+            EditorScript::AssertNextSendingRevision(rev_id) => {
+                let next_revision = cache.next().await.unwrap();
+                if rev_id.is_none() {
+                    assert_eq!(next_revision.is_none(), true);
+                }
+
+                let next_revision = next_revision.unwrap();
+                assert_eq!(next_revision.revision.rev_id, rev_id.unwrap());
+            },
+            EditorScript::SimulatePushRevisionMessage(_revision) => {},
+            EditorScript::SimulatePullRevisionMessage(_range) => {},
+            EditorScript::SimulateAckedMessage(i64) => {
+                let data = WsDocumentDataBuilder::build_acked_message(&doc_id, i64);
+                self.editor.handle_ws_message(data).await.unwrap();
+                sleep(Duration::from_millis(200)).await;
+            },
             EditorScript::AssertJson(expected) => {
                 let expected_delta: RichTextDelta = serde_json::from_str(expected).unwrap();
                 let delta = self.editor.doc_delta().await.unwrap();
@@ -75,8 +98,11 @@ pub enum EditorScript {
     Replace(Interval, &'static str),
     Undo(),
     Redo(),
+    SimulatePushRevisionMessage(Revision),
+    SimulatePullRevisionMessage(RevisionRange),
+    SimulateAckedMessage(i64),
     AssertRevisionState(i64, RevState),
-    AssertNextSentRevision(i64, RevState),
-    AssertRevId(i64),
+    AssertNextSendingRevision(Option<i64>),
+    AssertCurrentRevId(i64),
     AssertJson(&'static str),
 }

+ 36 - 3
shared-lib/flowy-document-infra/src/entities/ws/ws.rs

@@ -1,7 +1,7 @@
 use crate::{entities::doc::NewDocUser, errors::DocumentError};
 use bytes::Bytes;
 use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
-use lib_ot::revision::Revision;
+use lib_ot::revision::{RevId, Revision, RevisionRange};
 use std::convert::{TryFrom, TryInto};
 
 #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)]
@@ -11,7 +11,7 @@ pub enum WsDataType {
     // The frontend receives the PushRev event means the backend is pushing the new revision to frontend
     PushRev    = 1,
     // The fronted receives the PullRev event means the backend try to pull the revision from frontend
-    PullRev    = 2, // data should be Revision
+    PullRev    = 2,
     Conflict   = 3,
     NewDocUser = 4,
 }
@@ -37,7 +37,6 @@ pub struct WsDocumentData {
     #[pb(index = 2)]
     pub ty: WsDataType,
 
-    // Opti: parse the data with  type constraints
     #[pb(index = 3)]
     pub data: Vec<u8>,
 }
@@ -65,3 +64,37 @@ impl std::convert::From<NewDocUser> for WsDocumentData {
         }
     }
 }
+
+pub struct WsDocumentDataBuilder();
+impl WsDocumentDataBuilder {
+    // WsDataType::PushRev -> Revision
+    pub fn build_push_rev_message(doc_id: &str, revision: Revision) -> WsDocumentData {
+        let bytes: Bytes = revision.try_into().unwrap();
+        WsDocumentData {
+            doc_id: doc_id.to_string(),
+            ty: WsDataType::PushRev,
+            data: bytes.to_vec(),
+        }
+    }
+
+    // WsDataType::PullRev -> RevisionRange
+    pub fn build_push_pull_message(doc_id: &str, range: RevisionRange) -> WsDocumentData {
+        let bytes: Bytes = range.try_into().unwrap();
+        WsDocumentData {
+            doc_id: doc_id.to_string(),
+            ty: WsDataType::PullRev,
+            data: bytes.to_vec(),
+        }
+    }
+
+    // WsDataType::Acked -> RevId
+    pub fn build_acked_message(doc_id: &str, rev_id: i64) -> WsDocumentData {
+        let rev_id: RevId = rev_id.into();
+        let bytes: Bytes = rev_id.try_into().unwrap();
+        WsDocumentData {
+            doc_id: doc_id.to_string(),
+            ty: WsDataType::Acked,
+            data: bytes.to_vec(),
+        }
+    }
+}

+ 15 - 8
shared-lib/lib-ot/src/revision/cache.rs

@@ -2,7 +2,7 @@ use crate::{
     errors::OTError,
     revision::{Revision, RevisionRange},
 };
-use dashmap::{mapref::one::RefMut, DashMap};
+use dashmap::DashMap;
 use std::{collections::VecDeque, fmt::Debug, sync::Arc};
 use tokio::sync::{broadcast, RwLock};
 
@@ -48,12 +48,15 @@ impl RevisionMemoryCache {
 
     pub fn remove_revisions(&self, ids: Vec<i64>) { self.revs_map.retain(|k, _| !ids.contains(k)); }
 
-    pub fn mut_revision<F>(&self, rev_id: &i64, f: F)
-    where
-        F: Fn(RefMut<i64, RevisionRecord>),
-    {
-        if let Some(m_revision) = self.revs_map.get_mut(rev_id) {
-            f(m_revision)
+    pub async fn ack_revision(&self, rev_id: &i64) {
+        if let Some(mut m_revision) = self.revs_map.get_mut(rev_id) {
+            m_revision.value_mut().ack();
+            match self.pending_revs.write().await.pop_front() {
+                None => log::error!("The pending_revs should not be empty"),
+                Some(cache_rev_id) => {
+                    assert_eq!(&cache_rev_id, rev_id);
+                },
+            }
         } else {
             log::error!("Can't find revision with id {}", rev_id);
         }
@@ -90,6 +93,10 @@ impl RevisionMemoryCache {
         (ids, records)
     }
 
+    pub async fn query_revision(&self, rev_id: &i64) -> Option<RevisionRecord> {
+        self.revs_map.get(&rev_id).map(|r| r.value().clone())
+    }
+
     pub async fn front_revision(&self) -> Option<(i64, RevisionRecord)> {
         match self.pending_revs.read().await.front() {
             None => None,
@@ -103,7 +110,7 @@ impl RevisionMemoryCache {
 pub type RevIdReceiver = broadcast::Receiver<i64>;
 pub type RevIdSender = broadcast::Sender<i64>;
 
-#[derive(Clone, Eq, PartialEq)]
+#[derive(Debug, Clone, Eq, PartialEq)]
 pub enum RevState {
     Local = 0,
     Acked = 1,