conflict_resolve.rs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. use crate::RevisionManager;
  2. use bytes::Bytes;
  3. use flowy_error::{FlowyError, FlowyResult};
  4. use flowy_sync::entities::{
  5. revision::{RepeatedRevision, Revision, RevisionRange},
  6. ws_data::ServerRevisionWSDataType,
  7. };
  8. use lib_infra::future::BoxResultFuture;
  9. use std::{convert::TryFrom, sync::Arc};
  /// String alias used for the MD5 values returned by [`ConflictResolver`] methods
  /// and stamped onto the revisions built from them.
  pub type OperationsMD5 = String;
  /// Pair of operations produced by [`ConflictResolver::transform_operations`].
  pub struct TransformOperations<Operations> {
      /// Operations to apply on the client side.
      pub client_operations: Operations,
      /// Operations to send back to the server, if any. `ConflictController`
      /// treats `None` as a conflict and resets the local object instead.
      pub server_operations: Option<Operations>,
  }
  /// Builds a value of type `T` from a list of revisions.
  pub trait OperationsDeserializer<T>: Send + Sync {
      /// Converts `revisions` into a `T`, or returns an error if that fails.
      fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<T>;
  }
  /// Turns a set of operations into the byte payload stored in a `Revision`.
  pub trait OperationsSerializer: Send + Sync {
      /// Returns the serialized form of `self`.
      fn serialize_operations(&self) -> Bytes;
  }
  /// Newtype wrapper around a value of type `T`.
  // NOTE(review): not referenced anywhere else in the visible part of this file.
  pub struct ConflictOperations<T>(T);
  22. pub trait ConflictResolver<Operations>
  23. where
  24. Operations: Send + Sync,
  25. {
  26. fn compose_operations(&self, operations: Operations) -> BoxResultFuture<OperationsMD5, FlowyError>;
  27. fn transform_operations(
  28. &self,
  29. operations: Operations,
  30. ) -> BoxResultFuture<TransformOperations<Operations>, FlowyError>;
  31. fn reset_operations(&self, operations: Operations) -> BoxResultFuture<OperationsMD5, FlowyError>;
  32. }
  /// Outbound channel used by `ConflictController` to push revisions and acks.
  pub trait ConflictRevisionSink: Send + Sync + 'static {
      /// Sends `revisions` through the sink. The controller uses this both for
      /// the server revision produced by conflict resolution and for a
      /// requested range of local revisions.
      fn send(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), FlowyError>;

      /// Forwards an acknowledgement for `rev_id` with the given data type.
      fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>;
  }
  37. pub struct ConflictController<Operations>
  38. where
  39. Operations: Send + Sync,
  40. {
  41. user_id: String,
  42. resolver: Arc<dyn ConflictResolver<Operations> + Send + Sync>,
  43. rev_sink: Arc<dyn ConflictRevisionSink>,
  44. rev_manager: Arc<RevisionManager>,
  45. }
  46. impl<Operations> ConflictController<Operations>
  47. where
  48. Operations: Clone + Send + Sync,
  49. {
  50. pub fn new(
  51. user_id: &str,
  52. resolver: Arc<dyn ConflictResolver<Operations> + Send + Sync>,
  53. rev_sink: Arc<dyn ConflictRevisionSink>,
  54. rev_manager: Arc<RevisionManager>,
  55. ) -> Self {
  56. let user_id = user_id.to_owned();
  57. Self {
  58. user_id,
  59. resolver,
  60. rev_sink,
  61. rev_manager,
  62. }
  63. }
  64. }
  65. impl<Operations> ConflictController<Operations>
  66. where
  67. Operations: OperationsSerializer + OperationsDeserializer<Operations> + Clone + Send + Sync,
  68. {
  69. pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> {
  70. let repeated_revision = RepeatedRevision::try_from(bytes)?;
  71. if repeated_revision.is_empty() {
  72. return Ok(());
  73. }
  74. match self.handle_revision(repeated_revision).await? {
  75. None => {}
  76. Some(server_revision) => {
  77. self.rev_sink.send(vec![server_revision]).await?;
  78. }
  79. }
  80. Ok(())
  81. }
  82. pub async fn ack_revision(&self, rev_id: String, ty: ServerRevisionWSDataType) -> FlowyResult<()> {
  83. let _ = self.rev_sink.ack(rev_id, ty).await?;
  84. Ok(())
  85. }
  86. pub async fn send_revisions(&self, range: RevisionRange) -> FlowyResult<()> {
  87. let revisions = self.rev_manager.get_revisions_in_range(range).await?;
  88. let _ = self.rev_sink.send(revisions).await?;
  89. Ok(())
  90. }
  91. async fn handle_revision(&self, repeated_revision: RepeatedRevision) -> FlowyResult<Option<Revision>> {
  92. let mut revisions = repeated_revision.into_inner();
  93. let first_revision = revisions.first().unwrap();
  94. if let Some(local_revision) = self.rev_manager.get_revision(first_revision.rev_id).await {
  95. if local_revision.md5 == first_revision.md5 {
  96. // The local revision is equal to the pushed revision. Just ignore it.
  97. revisions = revisions.split_off(1);
  98. if revisions.is_empty() {
  99. return Ok(None);
  100. }
  101. } else {
  102. return Ok(None);
  103. }
  104. }
  105. let new_operations = Operations::deserialize_revisions(revisions.clone())?;
  106. let TransformOperations {
  107. client_operations,
  108. server_operations,
  109. } = self.resolver.transform_operations(new_operations).await?;
  110. match server_operations {
  111. None => {
  112. // The server_prime is None means the client local revisions conflict with the
  113. // // server, and it needs to override the client delta.
  114. let md5 = self.resolver.reset_operations(client_operations).await?;
  115. let repeated_revision = RepeatedRevision::new(revisions);
  116. assert_eq!(repeated_revision.last().unwrap().md5, md5);
  117. let _ = self.rev_manager.reset_object(repeated_revision).await?;
  118. Ok(None)
  119. }
  120. Some(server_operations) => {
  121. let md5 = self.resolver.compose_operations(client_operations.clone()).await?;
  122. for revision in &revisions {
  123. let _ = self.rev_manager.add_remote_revision(revision).await?;
  124. }
  125. let (client_revision, server_revision) = make_client_and_server_revision(
  126. &self.user_id,
  127. &self.rev_manager,
  128. client_operations,
  129. Some(server_operations),
  130. md5,
  131. );
  132. let _ = self.rev_manager.add_remote_revision(&client_revision).await?;
  133. Ok(server_revision)
  134. }
  135. }
  136. }
  137. }
  138. fn make_client_and_server_revision<Operations>(
  139. user_id: &str,
  140. rev_manager: &Arc<RevisionManager>,
  141. client_operations: Operations,
  142. server_operations: Option<Operations>,
  143. md5: String,
  144. ) -> (Revision, Option<Revision>)
  145. where
  146. Operations: OperationsSerializer,
  147. {
  148. let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair();
  149. let bytes = client_operations.serialize_operations();
  150. let client_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, user_id, md5.clone());
  151. match server_operations {
  152. None => (client_revision, None),
  153. Some(operations) => {
  154. let bytes = operations.serialize_operations();
  155. let server_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, user_id, md5);
  156. (client_revision, Some(server_revision))
  157. }
  158. }
  159. }