conflict_resolve.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. use crate::RevisionManager;
  2. use bytes::Bytes;
  3. use flowy_error::{FlowyError, FlowyResult};
  4. use flowy_sync::{
  5. entities::{
  6. revision::{RepeatedRevision, Revision, RevisionRange},
  7. ws_data::ServerRevisionWSDataType,
  8. },
  9. util::make_delta_from_revisions,
  10. };
  11. use lib_infra::future::BoxResultFuture;
  12. use lib_ot::core::{Attributes, Delta, PhantomAttributes};
  13. use lib_ot::rich_text::RichTextAttributes;
  14. use serde::de::DeserializeOwned;
  15. use std::{convert::TryFrom, sync::Arc};
  /// String alias for the MD5 value that [`ConflictResolver::compose_delta`]
  /// and [`ConflictResolver::reset_delta`] resolve to.
  pub type DeltaMD5 = String;
  17. pub trait ConflictResolver<T>
  18. where
  19. T: Attributes + Send + Sync,
  20. {
  21. fn compose_delta(&self, delta: Delta<T>) -> BoxResultFuture<DeltaMD5, FlowyError>;
  22. fn transform_delta(&self, delta: Delta<T>) -> BoxResultFuture<TransformDeltas<T>, FlowyError>;
  23. fn reset_delta(&self, delta: Delta<T>) -> BoxResultFuture<DeltaMD5, FlowyError>;
  24. }
  25. pub trait ConflictRevisionSink: Send + Sync + 'static {
  26. fn send(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), FlowyError>;
  27. fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>;
  28. }
  29. pub type RichTextConflictController = ConflictController<RichTextAttributes>;
  30. pub type PlainTextConflictController = ConflictController<PhantomAttributes>;
  31. pub struct ConflictController<T>
  32. where
  33. T: Attributes + Send + Sync,
  34. {
  35. user_id: String,
  36. resolver: Arc<dyn ConflictResolver<T> + Send + Sync>,
  37. rev_sink: Arc<dyn ConflictRevisionSink>,
  38. rev_manager: Arc<RevisionManager>,
  39. }
  40. impl<T> ConflictController<T>
  41. where
  42. T: Attributes + Send + Sync + DeserializeOwned + serde::Serialize,
  43. {
  44. pub fn new(
  45. user_id: &str,
  46. resolver: Arc<dyn ConflictResolver<T> + Send + Sync>,
  47. rev_sink: Arc<dyn ConflictRevisionSink>,
  48. rev_manager: Arc<RevisionManager>,
  49. ) -> Self {
  50. let user_id = user_id.to_owned();
  51. Self {
  52. user_id,
  53. resolver,
  54. rev_sink,
  55. rev_manager,
  56. }
  57. }
  58. pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> {
  59. let repeated_revision = RepeatedRevision::try_from(bytes)?;
  60. if repeated_revision.is_empty() {
  61. return Ok(());
  62. }
  63. match self.handle_revision(repeated_revision).await? {
  64. None => {}
  65. Some(server_revision) => {
  66. self.rev_sink.send(vec![server_revision]).await?;
  67. }
  68. }
  69. Ok(())
  70. }
  71. pub async fn ack_revision(&self, rev_id: String, ty: ServerRevisionWSDataType) -> FlowyResult<()> {
  72. let _ = self.rev_sink.ack(rev_id, ty).await?;
  73. Ok(())
  74. }
  75. pub async fn send_revisions(&self, range: RevisionRange) -> FlowyResult<()> {
  76. let revisions = self.rev_manager.get_revisions_in_range(range).await?;
  77. let _ = self.rev_sink.send(revisions).await?;
  78. Ok(())
  79. }
  80. async fn handle_revision(&self, repeated_revision: RepeatedRevision) -> FlowyResult<Option<Revision>> {
  81. let mut revisions = repeated_revision.into_inner();
  82. let first_revision = revisions.first().unwrap();
  83. if let Some(local_revision) = self.rev_manager.get_revision(first_revision.rev_id).await {
  84. if local_revision.md5 == first_revision.md5 {
  85. // The local revision is equal to the pushed revision. Just ignore it.
  86. revisions = revisions.split_off(1);
  87. if revisions.is_empty() {
  88. return Ok(None);
  89. }
  90. } else {
  91. return Ok(None);
  92. }
  93. }
  94. let new_delta = make_delta_from_revisions(revisions.clone())?;
  95. let TransformDeltas {
  96. client_prime,
  97. server_prime,
  98. } = self.resolver.transform_delta(new_delta).await?;
  99. match server_prime {
  100. None => {
  101. // The server_prime is None means the client local revisions conflict with the
  102. // // server, and it needs to override the client delta.
  103. let md5 = self.resolver.reset_delta(client_prime).await?;
  104. let repeated_revision = RepeatedRevision::new(revisions);
  105. assert_eq!(repeated_revision.last().unwrap().md5, md5);
  106. let _ = self.rev_manager.reset_object(repeated_revision).await?;
  107. Ok(None)
  108. }
  109. Some(server_prime) => {
  110. let md5 = self.resolver.compose_delta(client_prime.clone()).await?;
  111. for revision in &revisions {
  112. let _ = self.rev_manager.add_remote_revision(revision).await?;
  113. }
  114. let (client_revision, server_revision) = make_client_and_server_revision(
  115. &self.user_id,
  116. &self.rev_manager,
  117. client_prime,
  118. Some(server_prime),
  119. md5,
  120. );
  121. let _ = self.rev_manager.add_remote_revision(&client_revision).await?;
  122. Ok(server_revision)
  123. }
  124. }
  125. }
  126. }
  127. fn make_client_and_server_revision<T>(
  128. user_id: &str,
  129. rev_manager: &Arc<RevisionManager>,
  130. client_delta: Delta<T>,
  131. server_delta: Option<Delta<T>>,
  132. md5: String,
  133. ) -> (Revision, Option<Revision>)
  134. where
  135. T: Attributes + serde::Serialize,
  136. {
  137. let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair();
  138. let client_revision = Revision::new(
  139. &rev_manager.object_id,
  140. base_rev_id,
  141. rev_id,
  142. client_delta.json_bytes(),
  143. user_id,
  144. md5.clone(),
  145. );
  146. match server_delta {
  147. None => (client_revision, None),
  148. Some(server_delta) => {
  149. let server_revision = Revision::new(
  150. &rev_manager.object_id,
  151. base_rev_id,
  152. rev_id,
  153. server_delta.json_bytes(),
  154. user_id,
  155. md5,
  156. );
  157. (client_revision, Some(server_revision))
  158. }
  159. }
  160. }
  161. pub type RichTextTransformDeltas = TransformDeltas<RichTextAttributes>;
  162. pub struct TransformDeltas<T>
  163. where
  164. T: Attributes,
  165. {
  166. pub client_prime: Delta<T>,
  167. pub server_prime: Option<Delta<T>>,
  168. }