|
@@ -1,28 +1,39 @@
|
|
use crate::RevisionManager;
|
|
use crate::RevisionManager;
|
|
use bytes::Bytes;
|
|
use bytes::Bytes;
|
|
use flowy_error::{FlowyError, FlowyResult};
|
|
use flowy_error::{FlowyError, FlowyResult};
|
|
-use flowy_sync::{
|
|
|
|
- entities::{
|
|
|
|
- revision::{RepeatedRevision, Revision, RevisionRange},
|
|
|
|
- ws_data::ServerRevisionWSDataType,
|
|
|
|
- },
|
|
|
|
- util::make_operations_from_revisions,
|
|
|
|
|
|
+use flowy_sync::entities::{
|
|
|
|
+ revision::{RepeatedRevision, Revision, RevisionRange},
|
|
|
|
+ ws_data::ServerRevisionWSDataType,
|
|
};
|
|
};
|
|
use lib_infra::future::BoxResultFuture;
|
|
use lib_infra::future::BoxResultFuture;
|
|
-use lib_ot::core::{AttributeHashMap, DeltaOperations, EmptyAttributes, OperationAttributes};
|
|
|
|
|
|
|
|
-use serde::de::DeserializeOwned;
|
|
|
|
use std::{convert::TryFrom, sync::Arc};
|
|
use std::{convert::TryFrom, sync::Arc};
|
|
-
|
|
|
|
pub type OperationsMD5 = String;
|
|
pub type OperationsMD5 = String;
|
|
|
|
|
|
-pub trait ConflictResolver<T>
|
|
|
|
|
|
+pub struct TransformOperations<Operations> {
|
|
|
|
+ pub client_operations: Operations,
|
|
|
|
+ pub server_operations: Option<Operations>,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+pub trait OperationsDeserializer<T>: Send + Sync {
|
|
|
|
+ fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<T>;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+pub trait OperationsSerializer: Send + Sync {
|
|
|
|
+ fn serialize_operations(&self) -> Bytes;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+pub struct ConflictOperations<T>(T);
|
|
|
|
+pub trait ConflictResolver<Operations>
|
|
where
|
|
where
|
|
- T: OperationAttributes + Send + Sync,
|
|
|
|
|
|
+ Operations: Send + Sync,
|
|
{
|
|
{
|
|
- fn compose_operations(&self, delta: DeltaOperations<T>) -> BoxResultFuture<OperationsMD5, FlowyError>;
|
|
|
|
- fn transform_operations(&self, delta: DeltaOperations<T>) -> BoxResultFuture<TransformOperations<T>, FlowyError>;
|
|
|
|
- fn reset_operations(&self, delta: DeltaOperations<T>) -> BoxResultFuture<OperationsMD5, FlowyError>;
|
|
|
|
|
|
+ fn compose_operations(&self, operations: Operations) -> BoxResultFuture<OperationsMD5, FlowyError>;
|
|
|
|
+ fn transform_operations(
|
|
|
|
+ &self,
|
|
|
|
+ operations: Operations,
|
|
|
|
+ ) -> BoxResultFuture<TransformOperations<Operations>, FlowyError>;
|
|
|
|
+ fn reset_operations(&self, operations: Operations) -> BoxResultFuture<OperationsMD5, FlowyError>;
|
|
}
|
|
}
|
|
|
|
|
|
pub trait ConflictRevisionSink: Send + Sync + 'static {
|
|
pub trait ConflictRevisionSink: Send + Sync + 'static {
|
|
@@ -30,26 +41,23 @@ pub trait ConflictRevisionSink: Send + Sync + 'static {
|
|
fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>;
|
|
fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>;
|
|
}
|
|
}
|
|
|
|
|
|
-pub type RichTextConflictController = ConflictController<AttributeHashMap>;
|
|
|
|
-pub type PlainTextConflictController = ConflictController<EmptyAttributes>;
|
|
|
|
-
|
|
|
|
-pub struct ConflictController<T>
|
|
|
|
|
|
+pub struct ConflictController<Operations>
|
|
where
|
|
where
|
|
- T: OperationAttributes + Send + Sync,
|
|
|
|
|
|
+ Operations: Send + Sync,
|
|
{
|
|
{
|
|
user_id: String,
|
|
user_id: String,
|
|
- resolver: Arc<dyn ConflictResolver<T> + Send + Sync>,
|
|
|
|
|
|
+ resolver: Arc<dyn ConflictResolver<Operations> + Send + Sync>,
|
|
rev_sink: Arc<dyn ConflictRevisionSink>,
|
|
rev_sink: Arc<dyn ConflictRevisionSink>,
|
|
rev_manager: Arc<RevisionManager>,
|
|
rev_manager: Arc<RevisionManager>,
|
|
}
|
|
}
|
|
|
|
|
|
-impl<T> ConflictController<T>
|
|
|
|
|
|
+impl<Operations> ConflictController<Operations>
|
|
where
|
|
where
|
|
- T: OperationAttributes + Send + Sync + DeserializeOwned + serde::Serialize,
|
|
|
|
|
|
+ Operations: Clone + Send + Sync,
|
|
{
|
|
{
|
|
pub fn new(
|
|
pub fn new(
|
|
user_id: &str,
|
|
user_id: &str,
|
|
- resolver: Arc<dyn ConflictResolver<T> + Send + Sync>,
|
|
|
|
|
|
+ resolver: Arc<dyn ConflictResolver<Operations> + Send + Sync>,
|
|
rev_sink: Arc<dyn ConflictRevisionSink>,
|
|
rev_sink: Arc<dyn ConflictRevisionSink>,
|
|
rev_manager: Arc<RevisionManager>,
|
|
rev_manager: Arc<RevisionManager>,
|
|
) -> Self {
|
|
) -> Self {
|
|
@@ -61,7 +69,12 @@ where
|
|
rev_manager,
|
|
rev_manager,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+}
|
|
|
|
|
|
|
|
+impl<Operations> ConflictController<Operations>
|
|
|
|
+where
|
|
|
|
+ Operations: OperationsSerializer + OperationsDeserializer<Operations> + Clone + Send + Sync,
|
|
|
|
+{
|
|
pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> {
|
|
pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> {
|
|
let repeated_revision = RepeatedRevision::try_from(bytes)?;
|
|
let repeated_revision = RepeatedRevision::try_from(bytes)?;
|
|
if repeated_revision.is_empty() {
|
|
if repeated_revision.is_empty() {
|
|
@@ -103,33 +116,32 @@ where
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- let new_delta = make_operations_from_revisions(revisions.clone())?;
|
|
|
|
-
|
|
|
|
|
|
+ let new_operations = Operations::deserialize_revisions(revisions.clone())?;
|
|
let TransformOperations {
|
|
let TransformOperations {
|
|
- client_prime,
|
|
|
|
- server_prime,
|
|
|
|
- } = self.resolver.transform_operations(new_delta).await?;
|
|
|
|
|
|
+ client_operations,
|
|
|
|
+ server_operations,
|
|
|
|
+ } = self.resolver.transform_operations(new_operations).await?;
|
|
|
|
|
|
- match server_prime {
|
|
|
|
|
|
+ match server_operations {
|
|
None => {
|
|
None => {
|
|
// The server_prime is None means the client local revisions conflict with the
|
|
// The server_prime is None means the client local revisions conflict with the
|
|
// // server, and it needs to override the client delta.
|
|
// // server, and it needs to override the client delta.
|
|
- let md5 = self.resolver.reset_operations(client_prime).await?;
|
|
|
|
|
|
+ let md5 = self.resolver.reset_operations(client_operations).await?;
|
|
let repeated_revision = RepeatedRevision::new(revisions);
|
|
let repeated_revision = RepeatedRevision::new(revisions);
|
|
assert_eq!(repeated_revision.last().unwrap().md5, md5);
|
|
assert_eq!(repeated_revision.last().unwrap().md5, md5);
|
|
let _ = self.rev_manager.reset_object(repeated_revision).await?;
|
|
let _ = self.rev_manager.reset_object(repeated_revision).await?;
|
|
Ok(None)
|
|
Ok(None)
|
|
}
|
|
}
|
|
- Some(server_prime) => {
|
|
|
|
- let md5 = self.resolver.compose_operations(client_prime.clone()).await?;
|
|
|
|
|
|
+ Some(server_operations) => {
|
|
|
|
+ let md5 = self.resolver.compose_operations(client_operations.clone()).await?;
|
|
for revision in &revisions {
|
|
for revision in &revisions {
|
|
let _ = self.rev_manager.add_remote_revision(revision).await?;
|
|
let _ = self.rev_manager.add_remote_revision(revision).await?;
|
|
}
|
|
}
|
|
let (client_revision, server_revision) = make_client_and_server_revision(
|
|
let (client_revision, server_revision) = make_client_and_server_revision(
|
|
&self.user_id,
|
|
&self.user_id,
|
|
&self.rev_manager,
|
|
&self.rev_manager,
|
|
- client_prime,
|
|
|
|
- Some(server_prime),
|
|
|
|
|
|
+ client_operations,
|
|
|
|
+ Some(server_operations),
|
|
md5,
|
|
md5,
|
|
);
|
|
);
|
|
let _ = self.rev_manager.add_remote_revision(&client_revision).await?;
|
|
let _ = self.rev_manager.add_remote_revision(&client_revision).await?;
|
|
@@ -139,48 +151,26 @@ where
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-fn make_client_and_server_revision<T>(
|
|
|
|
|
|
+fn make_client_and_server_revision<Operations>(
|
|
user_id: &str,
|
|
user_id: &str,
|
|
rev_manager: &Arc<RevisionManager>,
|
|
rev_manager: &Arc<RevisionManager>,
|
|
- client_delta: DeltaOperations<T>,
|
|
|
|
- server_delta: Option<DeltaOperations<T>>,
|
|
|
|
|
|
+ client_operations: Operations,
|
|
|
|
+ server_operations: Option<Operations>,
|
|
md5: String,
|
|
md5: String,
|
|
) -> (Revision, Option<Revision>)
|
|
) -> (Revision, Option<Revision>)
|
|
where
|
|
where
|
|
- T: OperationAttributes + serde::Serialize,
|
|
|
|
|
|
+ Operations: OperationsSerializer,
|
|
{
|
|
{
|
|
let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair();
|
|
let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair();
|
|
- let client_revision = Revision::new(
|
|
|
|
- &rev_manager.object_id,
|
|
|
|
- base_rev_id,
|
|
|
|
- rev_id,
|
|
|
|
- client_delta.json_bytes(),
|
|
|
|
- user_id,
|
|
|
|
- md5.clone(),
|
|
|
|
- );
|
|
|
|
-
|
|
|
|
- match server_delta {
|
|
|
|
|
|
+ let bytes = client_operations.serialize_operations();
|
|
|
|
+ let client_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, user_id, md5.clone());
|
|
|
|
+
|
|
|
|
+ match server_operations {
|
|
None => (client_revision, None),
|
|
None => (client_revision, None),
|
|
- Some(server_delta) => {
|
|
|
|
- let server_revision = Revision::new(
|
|
|
|
- &rev_manager.object_id,
|
|
|
|
- base_rev_id,
|
|
|
|
- rev_id,
|
|
|
|
- server_delta.json_bytes(),
|
|
|
|
- user_id,
|
|
|
|
- md5,
|
|
|
|
- );
|
|
|
|
|
|
+ Some(operations) => {
|
|
|
|
+ let bytes = operations.serialize_operations();
|
|
|
|
+ let server_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, user_id, md5);
|
|
(client_revision, Some(server_revision))
|
|
(client_revision, Some(server_revision))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
-pub type TextTransformOperations = TransformOperations<AttributeHashMap>;
|
|
|
|
-
|
|
|
|
-pub struct TransformOperations<T>
|
|
|
|
-where
|
|
|
|
- T: OperationAttributes,
|
|
|
|
-{
|
|
|
|
- pub client_prime: DeltaOperations<T>,
|
|
|
|
- pub server_prime: Option<DeltaOperations<T>>,
|
|
|
|
-}
|
|
|