conflict_resolve.rs 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. use crate::RevisionManager;
  2. use bytes::Bytes;
  3. use flowy_collaboration::{
  4. entities::{
  5. revision::{RepeatedRevision, Revision, RevisionRange},
  6. ws_data::ServerRevisionWSDataType,
  7. },
  8. util::make_delta_from_revisions,
  9. };
  10. use flowy_error::{FlowyError, FlowyResult};
  11. use lib_infra::future::BoxResultFuture;
  12. use lib_ot::core::{Attributes, Delta};
  13. use serde::de::DeserializeOwned;
  14. use std::{convert::TryFrom, sync::Arc};
  /// String alias returned by `ResolverTarget::compose_delta` and `ResolverTarget::reset_delta`.
  /// NOTE(review): presumably the MD5 digest of the delta after the operation — confirm against
  /// the trait implementors.
  15. pub type DeltaMD5 = String;
  /// Operations `RevisionConflictResolver` invokes on the object whose delta it keeps in sync.
  ///
  /// `T` is the attribute type carried by the deltas. Every method returns a boxed future that
  /// resolves to a `Result` with `FlowyError` as the error type.
  16. pub trait ResolverTarget<T>
  17. where
  18. T: Attributes + Send + Sync,
  19. {
  /// Called by the resolver with `client_prime` when the transform produced a `server_prime`.
  /// The returned value is used as the md5 of the revisions the resolver builds afterwards.
  20. fn compose_delta(&self, delta: Delta<T>) -> BoxResultFuture<DeltaMD5, FlowyError>;
  /// Called by the resolver with the delta built from the incoming revisions; yields the
  /// `TransformDeltas` pair that decides whether the target is composed or reset.
  21. fn transform_delta(&self, delta: Delta<T>) -> BoxResultFuture<TransformDeltas<T>, FlowyError>;
  /// Called by the resolver with `client_prime` when the transform produced no `server_prime`.
  /// The resolver asserts that the returned value equals the md5 of the last incoming revision.
  22. fn reset_delta(&self, delta: Delta<T>) -> BoxResultFuture<DeltaMD5, FlowyError>;
  23. }
  /// Outgoing channel through which `RevisionConflictResolver` emits revisions and
  /// acknowledgements.
  /// NOTE(review): presumably backed by the websocket connection to the server — confirm.
  24. pub trait ResolverRevisionSink: Send + Sync + 'static {
  /// Hands a batch of revisions to the sink.
  25. fn send(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), FlowyError>;
  /// Acknowledges the revision identified by `rev_id` for the given server data type.
  26. fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>;
  27. }
  /// Reconciles revisions received from the server with the local state of one object.
  ///
  /// `T` is the attribute type of the deltas handled by `target`.
  28. pub struct RevisionConflictResolver<T>
  29. where
  30. T: Attributes + Send + Sync,
  31. {
  // User id passed to `Revision::new` for the revisions this resolver creates.
  32. user_id: String,
  // Object whose delta is transformed, composed or reset.
  33. target: Arc<dyn ResolverTarget<T> + Send + Sync>,
  // Receives outgoing revisions and acknowledgements.
  34. rev_sink: Arc<dyn ResolverRevisionSink>,
  // Local revision store: looked up by id, appended to, or reset from incoming revisions.
  35. rev_manager: Arc<RevisionManager>,
  36. }
  37. impl<T> RevisionConflictResolver<T>
  38. where
  39. T: Attributes + Send + Sync + DeserializeOwned + serde::Serialize,
  40. {
  41. pub fn new(
  42. user_id: &str,
  43. target: Arc<dyn ResolverTarget<T> + Send + Sync>,
  44. rev_sink: Arc<dyn ResolverRevisionSink>,
  45. rev_manager: Arc<RevisionManager>,
  46. ) -> Self {
  47. let user_id = user_id.to_owned();
  48. Self {
  49. user_id,
  50. target,
  51. rev_sink,
  52. rev_manager,
  53. }
  54. }
  55. pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> {
  56. let repeated_revision = RepeatedRevision::try_from(bytes)?;
  57. if repeated_revision.is_empty() {
  58. return Ok(());
  59. }
  60. match self.handle_revision(repeated_revision).await? {
  61. None => {}
  62. Some(server_revision) => {
  63. self.rev_sink.send(vec![server_revision]).await?;
  64. }
  65. }
  66. Ok(())
  67. }
  68. pub async fn ack_revision(&self, rev_id: String, ty: ServerRevisionWSDataType) -> FlowyResult<()> {
  69. let _ = self.rev_sink.ack(rev_id, ty).await?;
  70. Ok(())
  71. }
  72. pub async fn send_revisions(&self, range: RevisionRange) -> FlowyResult<()> {
  73. let revisions = self.rev_manager.get_revisions_in_range(range).await?;
  74. let _ = self.rev_sink.send(revisions).await?;
  75. Ok(())
  76. }
  77. async fn handle_revision(&self, repeated_revision: RepeatedRevision) -> FlowyResult<Option<Revision>> {
  78. let mut revisions = repeated_revision.into_inner();
  79. let first_revision = revisions.first().unwrap();
  80. if let Some(local_revision) = self.rev_manager.get_revision(first_revision.rev_id).await {
  81. if local_revision.md5 == first_revision.md5 {
  82. // The local revision is equal to the pushed revision. Just ignore it.
  83. revisions = revisions.split_off(1);
  84. if revisions.is_empty() {
  85. return Ok(None);
  86. }
  87. } else {
  88. return Ok(None);
  89. }
  90. }
  91. let new_delta = make_delta_from_revisions(revisions.clone())?;
  92. let TransformDeltas {
  93. client_prime,
  94. server_prime,
  95. } = self.target.transform_delta(new_delta).await?;
  96. match server_prime {
  97. None => {
  98. // The server_prime is None means the client local revisions conflict with the
  99. // // server, and it needs to override the client delta.
  100. let md5 = self.target.reset_delta(client_prime).await?;
  101. let repeated_revision = RepeatedRevision::new(revisions);
  102. assert_eq!(repeated_revision.last().unwrap().md5, md5);
  103. let _ = self.rev_manager.reset_object(repeated_revision).await?;
  104. Ok(None)
  105. }
  106. Some(server_prime) => {
  107. let md5 = self.target.compose_delta(client_prime.clone()).await?;
  108. for revision in &revisions {
  109. let _ = self.rev_manager.add_remote_revision(revision).await?;
  110. }
  111. let (client_revision, server_revision) = make_client_and_server_revision(
  112. &self.user_id,
  113. &self.rev_manager,
  114. client_prime,
  115. Some(server_prime),
  116. md5,
  117. );
  118. let _ = self.rev_manager.add_remote_revision(&client_revision).await?;
  119. Ok(server_revision)
  120. }
  121. }
  122. }
  123. }
  124. fn make_client_and_server_revision<T>(
  125. user_id: &str,
  126. rev_manager: &Arc<RevisionManager>,
  127. client_delta: Delta<T>,
  128. server_delta: Option<Delta<T>>,
  129. md5: String,
  130. ) -> (Revision, Option<Revision>)
  131. where
  132. T: Attributes + serde::Serialize,
  133. {
  134. let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair();
  135. let client_revision = Revision::new(
  136. &rev_manager.object_id,
  137. base_rev_id,
  138. rev_id,
  139. client_delta.to_bytes(),
  140. user_id,
  141. md5.clone(),
  142. );
  143. match server_delta {
  144. None => (client_revision, None),
  145. Some(server_delta) => {
  146. let server_revision = Revision::new(
  147. &rev_manager.object_id,
  148. base_rev_id,
  149. rev_id,
  150. server_delta.to_bytes(),
  151. user_id,
  152. md5,
  153. );
  154. (client_revision, Some(server_revision))
  155. }
  156. }
  157. }
  /// Pair of deltas produced by `ResolverTarget::transform_delta`.
  158. pub struct TransformDeltas<T>
  159. where
  160. T: Attributes,
  161. {
  /// Delta for the client side: the resolver composes it into the target when `server_prime`
  /// is present and resets the target with it otherwise.
  162. pub client_prime: Delta<T>,
  /// Delta the resolver turns into a revision for the server. The resolver treats `None` as a
  /// conflict in which the client delta must be overridden.
  163. pub server_prime: Option<Delta<T>>,
  164. }