conflict_resolve.rs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. use crate::RevisionManager;
  2. use bytes::Bytes;
  3. use flowy_collaboration::{
  4. entities::{
  5. revision::{RepeatedRevision, Revision, RevisionRange},
  6. ws_data::{ClientRevisionWSData, ServerRevisionWSDataType},
  7. },
  8. util::make_delta_from_revisions,
  9. };
  10. use flowy_error::{FlowyError, FlowyResult};
  11. use lib_infra::future::BoxResultFuture;
  12. use lib_ot::core::{Attributes, Delta};
  13. use serde::de::DeserializeOwned;
  14. use std::{convert::TryFrom, sync::Arc};
  15. use tokio::sync::oneshot;
  16. pub type DeltaMD5 = String;
  17. pub trait ResolverTarget<T>
  18. where
  19. T: Attributes + Send + Sync,
  20. {
  21. fn compose_delta(&self, delta: Delta<T>) -> BoxResultFuture<DeltaMD5, FlowyError>;
  22. fn transform_delta(&self, delta: Delta<T>) -> BoxResultFuture<TransformDeltas<T>, FlowyError>;
  23. fn reset_delta(&self, delta: Delta<T>) -> BoxResultFuture<DeltaMD5, FlowyError>;
  24. }
  25. pub trait ResolverRevisionSink: Send + Sync {
  26. fn send(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), FlowyError>;
  27. fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>;
  28. }
  29. pub struct RevisionConflictResolver<T>
  30. where
  31. T: Attributes + Send + Sync,
  32. {
  33. user_id: String,
  34. target: Arc<dyn ResolverTarget<T>>,
  35. rev_sink: Arc<dyn ResolverRevisionSink>,
  36. rev_manager: Arc<RevisionManager>,
  37. }
  38. impl<T> RevisionConflictResolver<T>
  39. where
  40. T: Attributes + Send + Sync,
  41. // DeserializeOwned + serde::Serialize,
  42. {
  43. pub fn new(
  44. user_id: &str,
  45. target: Arc<dyn ResolverTarget<T>>,
  46. rev_sink: Arc<dyn ResolverRevisionSink>,
  47. rev_manager: Arc<RevisionManager>,
  48. ) -> Self {
  49. let user_id = user_id.to_owned();
  50. Self {
  51. user_id,
  52. target,
  53. rev_sink,
  54. rev_manager,
  55. }
  56. }
  57. pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> {
  58. let repeated_revision = RepeatedRevision::try_from(bytes)?;
  59. if repeated_revision.is_empty() {
  60. return Ok(());
  61. }
  62. // match self.handle_revision(repeated_revision).await? {
  63. // None => {},
  64. // Some(server_revision) => {
  65. // self.rev_sink.send(vec![server_revision]);
  66. // },
  67. // }
  68. Ok(())
  69. }
  70. pub async fn ack_revision(&self, rev_id: String, ty: ServerRevisionWSDataType) -> FlowyResult<()> {
  71. self.rev_sink.ack(rev_id, ty).await
  72. }
  73. pub async fn send_revisions(&self, range: RevisionRange) -> FlowyResult<()> {
  74. let revisions = self.rev_manager.get_revisions_in_range(range).await?;
  75. self.rev_sink.send(revisions).await;
  76. Ok(())
  77. }
  78. // async fn handle_revision(&self, repeated_revision: RepeatedRevision) ->
  79. // FlowyResult<Option<Revision>> { let mut revisions =
  80. // repeated_revision.into_inner(); let first_revision =
  81. // revisions.first().unwrap(); if let Some(local_revision) =
  82. // self.rev_manager.get_revision(first_revision.rev_id).await { if
  83. // local_revision.md5 == first_revision.md5 { // The local
  84. // revision is equal to the pushed revision. Just ignore it.
  85. // revisions = revisions.split_off(1); if revisions.is_empty() {
  86. // return Ok(None);
  87. // }
  88. // } else {
  89. // return Ok(None);
  90. // }
  91. // }
  92. //
  93. // let new_delta = make_delta_from_revisions(revisions.clone())?;
  94. //
  95. // let TransformDeltas {
  96. // client_prime,
  97. // server_prime,
  98. // } = self.target.transform_delta(new_delta).await?;
  99. //
  100. // match server_prime {
  101. // None => {
  102. // // The server_prime is None means the client local revisions
  103. // conflict with the // server, and it needs to override the
  104. // client delta. let md5 =
  105. // self.target.reset_delta(client_prime).await?; let
  106. // repeated_revision = RepeatedRevision::new(revisions);
  107. // assert_eq!(repeated_revision.last().unwrap().md5, md5); let _
  108. // = self.rev_manager.reset_object(repeated_revision).await?;
  109. // Ok(None) },
  110. // Some(server_prime) => {
  111. // let md5 = self.target.compose_delta(client_prime.clone()).await?;
  112. // for revision in &revisions {
  113. // let _ =
  114. // self.rev_manager.add_remote_revision(revision).await?; }
  115. // let (client_revision, server_revision) =
  116. // make_client_and_server_revision( &self.user_id,
  117. // &self.rev_manager,
  118. // client_prime,
  119. // Some(server_prime),
  120. // md5,
  121. // );
  122. // let _ =
  123. // self.rev_manager.add_remote_revision(&client_revision).await?;
  124. // Ok(server_revision)
  125. // },
  126. // }
  127. // }
  128. }
  129. fn make_client_and_server_revision<T>(
  130. user_id: &str,
  131. rev_manager: &Arc<RevisionManager>,
  132. client_delta: Delta<T>,
  133. server_delta: Option<Delta<T>>,
  134. md5: String,
  135. ) -> (Revision, Option<Revision>)
  136. where
  137. T: Attributes + serde::Serialize,
  138. {
  139. let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair();
  140. let client_revision = Revision::new(
  141. &rev_manager.object_id,
  142. base_rev_id,
  143. rev_id,
  144. client_delta.to_bytes(),
  145. &user_id,
  146. md5.clone(),
  147. );
  148. match server_delta {
  149. None => (client_revision, None),
  150. Some(server_delta) => {
  151. let server_revision = Revision::new(
  152. &rev_manager.object_id,
  153. base_rev_id,
  154. rev_id,
  155. server_delta.to_bytes(),
  156. &user_id,
  157. md5,
  158. );
  159. (client_revision, Some(server_revision))
  160. },
  161. }
  162. }
  163. pub struct TransformDeltas<T>
  164. where
  165. T: Attributes,
  166. {
  167. pub client_prime: Delta<T>,
  168. pub server_prime: Option<Delta<T>>,
  169. }