|
@@ -7,7 +7,11 @@ use crate::{
|
|
|
module::DocumentUser,
|
|
|
services::{
|
|
|
doc::{
|
|
|
- edit::{edit_actor::DocumentEditActor, message::EditMsg},
|
|
|
+ edit::{
|
|
|
+ edit_actor::DocumentEditActor,
|
|
|
+ message::{EditMsg, TransformDeltas},
|
|
|
+ model::NotifyOpenDocAction,
|
|
|
+ },
|
|
|
revision::{DocRevision, RevisionCmd, RevisionManager, RevisionServer, RevisionStoreActor},
|
|
|
UndoResult,
|
|
|
},
|
|
@@ -16,6 +20,7 @@ use crate::{
|
|
|
};
|
|
|
use bytes::Bytes;
|
|
|
use flowy_database::ConnectionPool;
|
|
|
+use flowy_infra::retry::{ExponentialBackoff, Retry};
|
|
|
use flowy_ot::core::{Attribute, Delta, Interval};
|
|
|
use flowy_ws::WsState;
|
|
|
use std::{convert::TryFrom, sync::Arc};
|
|
@@ -27,7 +32,9 @@ pub struct ClientEditDoc {
|
|
|
pub doc_id: DocId,
|
|
|
rev_manager: Arc<RevisionManager>,
|
|
|
document: UnboundedSender<EditMsg>,
|
|
|
+ ws: Arc<dyn DocumentWebSocket>,
|
|
|
pool: Arc<ConnectionPool>,
|
|
|
+ user: Arc<dyn DocumentUser>,
|
|
|
}
|
|
|
|
|
|
impl ClientEditDoc {
|
|
@@ -38,21 +45,23 @@ impl ClientEditDoc {
|
|
|
server: Arc<dyn RevisionServer>,
|
|
|
user: Arc<dyn DocumentUser>,
|
|
|
) -> DocResult<Self> {
|
|
|
- let user_id = user.user_id()?;
|
|
|
let rev_store = spawn_rev_store_actor(doc_id, pool.clone(), server.clone());
|
|
|
let DocRevision { rev_id, delta } = fetch_document(rev_store.clone()).await?;
|
|
|
-
|
|
|
log::info!("😁 Document delta: {:?}", delta);
|
|
|
|
|
|
- let rev_manager = Arc::new(RevisionManager::new(doc_id, &user_id, rev_id, ws, rev_store));
|
|
|
+ let rev_manager = Arc::new(RevisionManager::new(doc_id, rev_id, rev_store));
|
|
|
let document = spawn_doc_edit_actor(doc_id, delta, pool.clone());
|
|
|
let doc_id = doc_id.to_string();
|
|
|
- Ok(Self {
|
|
|
+ let edit_doc = Self {
|
|
|
doc_id,
|
|
|
rev_manager,
|
|
|
document,
|
|
|
pool,
|
|
|
- })
|
|
|
+ ws,
|
|
|
+ user,
|
|
|
+ };
|
|
|
+ edit_doc.notify_open_doc();
|
|
|
+ Ok(edit_doc)
|
|
|
}
|
|
|
|
|
|
pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), DocError> {
|
|
@@ -146,7 +155,12 @@ impl ClientEditDoc {
|
|
|
let (base_rev_id, rev_id) = self.rev_manager.next_rev_id();
|
|
|
let delta_data = delta_data.to_vec();
|
|
|
let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local);
|
|
|
- let _ = self.rev_manager.add_revision(revision).await?;
|
|
|
+ let _ = self.rev_manager.add_revision(&revision).await?;
|
|
|
+ match self.ws.send(revision.into()) {
|
|
|
+ Ok(_) => {},
|
|
|
+ Err(e) => log::error!("Send delta failed: {:?}", e),
|
|
|
+ };
|
|
|
+
|
|
|
Ok(rev_id.into())
|
|
|
}
|
|
|
|
|
@@ -169,39 +183,117 @@ impl ClientEditDoc {
|
|
|
let _ = self.document.send(msg);
|
|
|
rx.await.map_err(internal_error)?
|
|
|
}
|
|
|
-}
|
|
|
|
|
|
-impl WsDocumentHandler for ClientEditDoc {
|
|
|
- fn receive(&self, doc_data: WsDocumentData) {
|
|
|
- let document = self.document.clone();
|
|
|
- let rev_manager = self.rev_manager.clone();
|
|
|
- let handle_ws_message = |doc_data: WsDocumentData| async move {
|
|
|
- let bytes = Bytes::from(doc_data.data);
|
|
|
- match doc_data.ty {
|
|
|
- WsDataType::PushRev => {
|
|
|
- let _ = handle_push_rev(bytes, rev_manager, document).await?;
|
|
|
- },
|
|
|
- WsDataType::PullRev => {
|
|
|
- let range = RevisionRange::try_from(bytes)?;
|
|
|
- let _ = rev_manager.send_revisions(range).await?;
|
|
|
- },
|
|
|
- WsDataType::NewDocUser => {},
|
|
|
- WsDataType::Acked => {
|
|
|
- let rev_id = RevId::try_from(bytes)?;
|
|
|
- let _ = rev_manager.ack_rev(rev_id);
|
|
|
- },
|
|
|
- WsDataType::Conflict => {},
|
|
|
- }
|
|
|
- Result::<(), DocError>::Ok(())
|
|
|
+ #[tracing::instrument(level = "debug", skip(self))]
|
|
|
+ fn notify_open_doc(&self) {
|
|
|
+ let rev_id: RevId = self.rev_manager.rev_id().into();
|
|
|
+
|
|
|
+ if let Ok(user_id) = self.user.user_id() {
|
|
|
+ let action = NotifyOpenDocAction::new(&user_id, &self.doc_id, &rev_id, &self.ws);
|
|
|
+ let strategy = ExponentialBackoff::from_millis(50).take(3);
|
|
|
+ let retry = Retry::spawn(strategy, action);
|
|
|
+ tokio::spawn(async move {
|
|
|
+ match retry.await {
|
|
|
+ Ok(_) => {},
|
|
|
+ Err(e) => log::error!("Notify open doc failed: {}", e),
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ #[tracing::instrument(level = "debug", skip(self))]
|
|
|
+ async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> {
|
|
|
+ // Transform the revision
|
|
|
+ let (ret, rx) = oneshot::channel::<DocResult<TransformDeltas>>();
|
|
|
+ let _ = self.document.send(EditMsg::RemoteRevision { bytes, ret });
|
|
|
+ let TransformDeltas {
|
|
|
+ client_prime,
|
|
|
+ server_prime,
|
|
|
+ server_rev_id,
|
|
|
+ } = rx.await.map_err(internal_error)??;
|
|
|
+
|
|
|
+ if self.rev_manager.rev_id() >= server_rev_id.0 {
|
|
|
+ // Ignore this push revision if local_rev_id >= server_rev_id
|
|
|
+ return Ok(());
|
|
|
+ }
|
|
|
+
|
|
|
+ // compose delta
|
|
|
+ let (ret, rx) = oneshot::channel::<DocResult<()>>();
|
|
|
+ let msg = EditMsg::Delta {
|
|
|
+ delta: client_prime.clone(),
|
|
|
+ ret,
|
|
|
};
|
|
|
+ let _ = self.document.send(msg);
|
|
|
+ let _ = rx.await.map_err(internal_error)??;
|
|
|
+
|
|
|
+ // update rev id
|
|
|
+ self.rev_manager.update_rev_id(server_rev_id.clone().into());
|
|
|
+ let (_, local_rev_id) = self.rev_manager.next_rev_id();
|
|
|
+
|
|
|
+ // save the revision
|
|
|
+ let revision = Revision::new(
|
|
|
+ server_rev_id.0,
|
|
|
+ local_rev_id,
|
|
|
+ client_prime.to_bytes().to_vec(),
|
|
|
+ &self.doc_id,
|
|
|
+ RevType::Remote,
|
|
|
+ );
|
|
|
+ let _ = self.rev_manager.add_revision(&revision).await?;
|
|
|
+
|
|
|
+ // send the server_prime delta
|
|
|
+ let revision = Revision::new(
|
|
|
+ server_rev_id.0,
|
|
|
+ local_rev_id,
|
|
|
+ server_prime.to_bytes().to_vec(),
|
|
|
+ &self.doc_id,
|
|
|
+ RevType::Remote,
|
|
|
+ );
|
|
|
+ self.ws.send(revision.into());
|
|
|
|
|
|
+ save_document(self.document.clone(), local_rev_id.into()).await;
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn handle_ws_message(&self, doc_data: WsDocumentData) -> DocResult<()> {
|
|
|
+ let bytes = Bytes::from(doc_data.data);
|
|
|
+ match doc_data.ty {
|
|
|
+ WsDataType::PushRev => {
|
|
|
+ let _ = self.handle_push_rev(bytes).await?;
|
|
|
+ },
|
|
|
+ WsDataType::PullRev => {
|
|
|
+ let range = RevisionRange::try_from(bytes)?;
|
|
|
+ let _ = self.rev_manager.send_revisions(range).await?;
|
|
|
+ },
|
|
|
+ WsDataType::NewDocUser => {},
|
|
|
+ WsDataType::Acked => {
|
|
|
+ let rev_id = RevId::try_from(bytes)?;
|
|
|
+ let _ = self.rev_manager.ack_rev(rev_id);
|
|
|
+ },
|
|
|
+ WsDataType::Conflict => {},
|
|
|
+ }
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+pub struct EditDocWsHandler(pub Arc<ClientEditDoc>);
|
|
|
+
|
|
|
+impl WsDocumentHandler for EditDocWsHandler {
|
|
|
+ fn receive(&self, doc_data: WsDocumentData) {
|
|
|
+ let edit_doc = self.0.clone();
|
|
|
tokio::spawn(async move {
|
|
|
- if let Err(e) = handle_ws_message(doc_data).await {
|
|
|
+ if let Err(e) = edit_doc.handle_ws_message(doc_data).await {
|
|
|
log::error!("{:?}", e);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
- fn state_changed(&self, state: &WsState) { let _ = self.rev_manager.handle_ws_state_changed(state); }
|
|
|
+
|
|
|
+ fn state_changed(&self, state: &WsState) {
|
|
|
+ match state {
|
|
|
+ WsState::Init => {},
|
|
|
+ WsState::Connected(_) => self.0.notify_open_doc(),
|
|
|
+ WsState::Disconnected(_) => {},
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
async fn save_document(document: UnboundedSender<EditMsg>, rev_id: RevId) -> DocResult<()> {
|
|
@@ -211,24 +303,6 @@ async fn save_document(document: UnboundedSender<EditMsg>, rev_id: RevId) -> Doc
|
|
|
result
|
|
|
}
|
|
|
|
|
|
-async fn handle_push_rev(
|
|
|
- rev_bytes: Bytes,
|
|
|
- rev_manager: Arc<RevisionManager>,
|
|
|
- document: UnboundedSender<EditMsg>,
|
|
|
-) -> DocResult<()> {
|
|
|
- let revision = Revision::try_from(rev_bytes)?;
|
|
|
- let _ = rev_manager.add_revision(revision.clone()).await?;
|
|
|
-
|
|
|
- let delta = Delta::from_bytes(&revision.delta_data)?;
|
|
|
- let (ret, rx) = oneshot::channel::<DocResult<()>>();
|
|
|
- let msg = EditMsg::Delta { delta, ret };
|
|
|
- let _ = document.send(msg);
|
|
|
- let _ = rx.await.map_err(internal_error)??;
|
|
|
-
|
|
|
- save_document(document, revision.rev_id.into()).await;
|
|
|
- Ok(())
|
|
|
-}
|
|
|
-
|
|
|
fn spawn_rev_store_actor(
|
|
|
doc_id: &str,
|
|
|
pool: Arc<ConnectionPool>,
|