|
@@ -9,21 +9,48 @@ use flowy_sync::{
|
|
|
use lib_infra::future::FutureResult;
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
-pub type SyncObject = lib_ot::text_delta::TextOperations;
|
|
|
-
|
|
|
pub trait RevisionCloudService: Send + Sync {
|
|
|
+ /// Read the object's revision from remote
|
|
|
+ /// Returns a list of revisions that used to build the object
|
|
|
+ /// # Arguments
|
|
|
+ ///
|
|
|
+ /// * `user_id`: the id of the user
|
|
|
+ /// * `object_id`: the id of the object
|
|
|
+ ///
|
|
|
fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError>;
|
|
|
}
|
|
|
|
|
|
-pub trait RevisionObjectBuilder: Send + Sync {
|
|
|
+pub trait RevisionObjectDeserializer: Send + Sync {
|
|
|
type Output;
|
|
|
- fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
|
|
|
+ /// Deserialize the list of revisions into an concrete object type.
|
|
|
+ ///
|
|
|
+ /// # Arguments
|
|
|
+ ///
|
|
|
+ /// * `object_id`: the id of the object
|
|
|
+ /// * `revisions`: a list of revisions that represent the object
|
|
|
+ ///
|
|
|
+ fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
|
|
|
+}
|
|
|
+
|
|
|
+pub trait RevisionObjectSerializer: Send + Sync {
|
|
|
+ /// Serialize the list of revisions to `Bytes`
|
|
|
+ ///
|
|
|
+ /// * `revisions`: a list of revisions will be serialized to `Bytes`
|
|
|
+ ///
|
|
|
+ fn serialize_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes>;
|
|
|
}
|
|
|
|
|
|
-pub trait RevisionCompactor: Send + Sync {
|
|
|
- fn compact(&self, user_id: &str, object_id: &str, mut revisions: Vec<Revision>) -> FlowyResult<Revision> {
|
|
|
+/// `RevisionCompress` is used to compress multiple revisions into one revision
|
|
|
+///
|
|
|
+pub trait RevisionCompress: Send + Sync {
|
|
|
+ fn compress_revisions(
|
|
|
+ &self,
|
|
|
+ user_id: &str,
|
|
|
+ object_id: &str,
|
|
|
+ mut revisions: Vec<Revision>,
|
|
|
+ ) -> FlowyResult<Revision> {
|
|
|
if revisions.is_empty() {
|
|
|
- return Err(FlowyError::internal().context("Can't compact the empty folder's revisions"));
|
|
|
+ return Err(FlowyError::internal().context("Can't compact the empty revisions"));
|
|
|
}
|
|
|
|
|
|
if revisions.len() == 1 {
|
|
@@ -35,11 +62,11 @@ pub trait RevisionCompactor: Send + Sync {
|
|
|
|
|
|
let (base_rev_id, rev_id) = first_revision.pair_rev_id();
|
|
|
let md5 = last_revision.md5.clone();
|
|
|
- let bytes = self.bytes_from_revisions(revisions)?;
|
|
|
+ let bytes = self.serialize_revisions(revisions)?;
|
|
|
Ok(Revision::new(object_id, base_rev_id, rev_id, bytes, user_id, md5))
|
|
|
}
|
|
|
|
|
|
- fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes>;
|
|
|
+ fn serialize_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes>;
|
|
|
}
|
|
|
|
|
|
pub struct RevisionManager {
|
|
@@ -49,7 +76,7 @@ pub struct RevisionManager {
|
|
|
rev_persistence: Arc<RevisionPersistence>,
|
|
|
#[allow(dead_code)]
|
|
|
rev_snapshot: Arc<RevisionSnapshotManager>,
|
|
|
- rev_compactor: Arc<dyn RevisionCompactor>,
|
|
|
+ rev_compress: Arc<dyn RevisionCompress>,
|
|
|
#[cfg(feature = "flowy_unit_test")]
|
|
|
rev_ack_notifier: tokio::sync::broadcast::Sender<i64>,
|
|
|
}
|
|
@@ -64,13 +91,11 @@ impl RevisionManager {
|
|
|
) -> Self
|
|
|
where
|
|
|
SP: 'static + RevisionSnapshotDiskCache,
|
|
|
- C: 'static + RevisionCompactor,
|
|
|
+ C: 'static + RevisionCompress,
|
|
|
{
|
|
|
let rev_id_counter = RevIdCounter::new(0);
|
|
|
let rev_compactor = Arc::new(rev_compactor);
|
|
|
-
|
|
|
let rev_persistence = Arc::new(rev_persistence);
|
|
|
-
|
|
|
let rev_snapshot = Arc::new(RevisionSnapshotManager::new(user_id, object_id, snapshot_persistence));
|
|
|
#[cfg(feature = "flowy_unit_test")]
|
|
|
let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1);
|
|
@@ -81,7 +106,7 @@ impl RevisionManager {
|
|
|
rev_id_counter,
|
|
|
rev_persistence,
|
|
|
rev_snapshot,
|
|
|
- rev_compactor,
|
|
|
+ rev_compress: rev_compactor,
|
|
|
#[cfg(feature = "flowy_unit_test")]
|
|
|
rev_ack_notifier: revision_ack_notifier,
|
|
|
}
|
|
@@ -90,7 +115,7 @@ impl RevisionManager {
|
|
|
#[tracing::instrument(level = "debug", skip_all, fields(object_id) err)]
|
|
|
pub async fn load<B>(&mut self, cloud: Option<Arc<dyn RevisionCloudService>>) -> FlowyResult<B::Output>
|
|
|
where
|
|
|
- B: RevisionObjectBuilder,
|
|
|
+ B: RevisionObjectDeserializer,
|
|
|
{
|
|
|
let (revisions, rev_id) = RevisionLoader {
|
|
|
object_id: self.object_id.clone(),
|
|
@@ -102,7 +127,7 @@ impl RevisionManager {
|
|
|
.await?;
|
|
|
self.rev_id_counter.set(rev_id);
|
|
|
tracing::Span::current().record("object_id", &self.object_id.as_str());
|
|
|
- B::build_object(&self.object_id, revisions)
|
|
|
+ B::deserialize_revisions(&self.object_id, revisions)
|
|
|
}
|
|
|
|
|
|
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
|
|
@@ -116,7 +141,7 @@ impl RevisionManager {
|
|
|
#[tracing::instrument(level = "debug", skip(self, revision), err)]
|
|
|
pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
|
|
|
if revision.bytes.is_empty() {
|
|
|
- return Err(FlowyError::internal().context("Delta data should be empty"));
|
|
|
+ return Err(FlowyError::internal().context("Remote revisions is empty"));
|
|
|
}
|
|
|
|
|
|
let _ = self.rev_persistence.add_ack_revision(revision).await?;
|
|
@@ -128,11 +153,11 @@ impl RevisionManager {
|
|
|
#[tracing::instrument(level = "debug", skip_all, err)]
|
|
|
pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
|
|
|
if revision.bytes.is_empty() {
|
|
|
- return Err(FlowyError::internal().context("Delta data should be empty"));
|
|
|
+ return Err(FlowyError::internal().context("Local revisions is empty"));
|
|
|
}
|
|
|
let rev_id = self
|
|
|
.rev_persistence
|
|
|
- .add_sync_revision(revision, &self.rev_compactor)
|
|
|
+ .add_sync_revision(revision, &self.rev_compress)
|
|
|
.await?;
|
|
|
// self.rev_history.add_revision(revision).await;
|
|
|
self.rev_id_counter.set(rev_id);
|