|
@@ -72,7 +72,7 @@ impl ClientEditDoc {
|
|
};
|
|
};
|
|
let _ = self.document.send(msg);
|
|
let _ = self.document.send(msg);
|
|
let delta = rx.await.map_err(internal_error)??;
|
|
let delta = rx.await.map_err(internal_error)??;
|
|
- let rev_id = self.save_revision(delta).await?;
|
|
|
|
|
|
+ let rev_id = self.save_local_delta(delta).await?;
|
|
save_document(self.document.clone(), rev_id.into()).await
|
|
save_document(self.document.clone(), rev_id.into()).await
|
|
}
|
|
}
|
|
|
|
|
|
@@ -81,7 +81,7 @@ impl ClientEditDoc {
|
|
let msg = DocumentMsg::Delete { interval, ret };
|
|
let msg = DocumentMsg::Delete { interval, ret };
|
|
let _ = self.document.send(msg);
|
|
let _ = self.document.send(msg);
|
|
let delta = rx.await.map_err(internal_error)??;
|
|
let delta = rx.await.map_err(internal_error)??;
|
|
- let _ = self.save_revision(delta).await?;
|
|
|
|
|
|
+ let _ = self.save_local_delta(delta).await?;
|
|
Ok(())
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
|
|
@@ -94,7 +94,7 @@ impl ClientEditDoc {
|
|
};
|
|
};
|
|
let _ = self.document.send(msg);
|
|
let _ = self.document.send(msg);
|
|
let delta = rx.await.map_err(internal_error)??;
|
|
let delta = rx.await.map_err(internal_error)??;
|
|
- let _ = self.save_revision(delta).await?;
|
|
|
|
|
|
+ let _ = self.save_local_delta(delta).await?;
|
|
Ok(())
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
|
|
@@ -107,7 +107,7 @@ impl ClientEditDoc {
|
|
};
|
|
};
|
|
let _ = self.document.send(msg);
|
|
let _ = self.document.send(msg);
|
|
let delta = rx.await.map_err(internal_error)??;
|
|
let delta = rx.await.map_err(internal_error)??;
|
|
- let _ = self.save_revision(delta).await?;
|
|
|
|
|
|
+ let _ = self.save_local_delta(delta).await?;
|
|
Ok(())
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
|
|
@@ -151,13 +151,9 @@ impl ClientEditDoc {
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|
|
- #[tracing::instrument(level = "debug", skip(self, delta), fields(revision_delta = %delta.to_json(), send_state, base_rev_id, rev_id))]
|
|
|
|
- async fn save_revision(&self, delta: Delta) -> Result<RevId, DocError> {
|
|
|
|
|
|
+ async fn save_local_delta(&self, delta: Delta) -> Result<RevId, DocError> {
|
|
let delta_data = delta.to_bytes();
|
|
let delta_data = delta.to_bytes();
|
|
let (base_rev_id, rev_id) = self.rev_manager.next_rev_id();
|
|
let (base_rev_id, rev_id) = self.rev_manager.next_rev_id();
|
|
- tracing::Span::current().record("base_rev_id", &base_rev_id);
|
|
|
|
- tracing::Span::current().record("rev_id", &rev_id);
|
|
|
|
-
|
|
|
|
let delta_data = delta_data.to_vec();
|
|
let delta_data = delta_data.to_vec();
|
|
let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local);
|
|
let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local);
|
|
let _ = self.rev_manager.add_revision(&revision).await?;
|
|
let _ = self.rev_manager.add_revision(&revision).await?;
|
|
@@ -165,7 +161,7 @@ impl ClientEditDoc {
|
|
}
|
|
}
|
|
|
|
|
|
#[tracing::instrument(level = "debug", skip(self, data), err)]
|
|
#[tracing::instrument(level = "debug", skip(self, data), err)]
|
|
- pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> {
|
|
|
|
|
|
+ pub(crate) async fn composing_local_delta(&self, data: Bytes) -> Result<(), DocError> {
|
|
let delta = Delta::from_bytes(&data)?;
|
|
let delta = Delta::from_bytes(&data)?;
|
|
let (ret, rx) = oneshot::channel::<DocResult<()>>();
|
|
let (ret, rx) = oneshot::channel::<DocResult<()>>();
|
|
let msg = DocumentMsg::Delta {
|
|
let msg = DocumentMsg::Delta {
|
|
@@ -175,7 +171,7 @@ impl ClientEditDoc {
|
|
let _ = self.document.send(msg);
|
|
let _ = self.document.send(msg);
|
|
let _ = rx.await.map_err(internal_error)??;
|
|
let _ = rx.await.map_err(internal_error)??;
|
|
|
|
|
|
- let rev_id = self.save_revision(delta).await?;
|
|
|
|
|
|
+ let rev_id = self.save_local_delta(delta).await?;
|
|
save_document(self.document.clone(), rev_id).await
|
|
save_document(self.document.clone(), rev_id).await
|
|
}
|
|
}
|
|
|
|
|
|
@@ -271,7 +267,7 @@ impl ClientEditDoc {
|
|
WsDataType::NewDocUser => {},
|
|
WsDataType::NewDocUser => {},
|
|
WsDataType::Acked => {
|
|
WsDataType::Acked => {
|
|
let rev_id = RevId::try_from(bytes)?;
|
|
let rev_id = RevId::try_from(bytes)?;
|
|
- let _ = self.rev_manager.ack_rev(rev_id).await?;
|
|
|
|
|
|
+ let _ = self.rev_manager.ack_revision(rev_id).await?;
|
|
},
|
|
},
|
|
WsDataType::Conflict => {},
|
|
WsDataType::Conflict => {},
|
|
}
|
|
}
|
|
@@ -304,7 +300,7 @@ fn spawn_rev_receiver(mut receiver: mpsc::UnboundedReceiver<Revision>, ws: Arc<d
|
|
tokio::spawn(async move {
|
|
tokio::spawn(async move {
|
|
loop {
|
|
loop {
|
|
while let Some(revision) = receiver.recv().await {
|
|
while let Some(revision) = receiver.recv().await {
|
|
- tracing::debug!("Send revision:{} to server", revision.rev_id);
|
|
|
|
|
|
+ // tracing::debug!("Send revision:{} to server", revision.rev_id);
|
|
match ws.send(revision.into()) {
|
|
match ws.send(revision.into()) {
|
|
Ok(_) => {},
|
|
Ok(_) => {},
|
|
Err(e) => log::error!("Send revision failed: {:?}", e),
|
|
Err(e) => log::error!("Send revision failed: {:?}", e),
|