conflict_resolve.rs 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. use crate::RevisionManager;
  2. use bytes::Bytes;
  3. use flowy_error::{FlowyError, FlowyResult};
  4. use flowy_sync::entities::{
  5. revision::{RepeatedRevision, Revision, RevisionRange},
  6. ws_data::ServerRevisionWSDataType,
  7. };
  8. use lib_infra::future::BoxResultFuture;
  9. use std::{convert::TryFrom, sync::Arc};
  /// Checksum string describing a set of operations.
  ///
  /// The conflict resolver returns this value after composing or resetting
  /// operations; it is compared against `Revision::md5` and stored in newly
  /// created revisions.
  pub type OperationsMD5 = String;
  /// Result of transforming incoming operations against the local state.
  ///
  /// Produced by `ConflictResolver::transform_operations`.
  pub struct TransformOperations<Operations> {
      /// Operations handed back to the resolver (composed or used for a reset)
      /// and serialized into the client-side revision.
      pub client_operations: Operations,
      /// Operations to be sent back through the revision sink. `None` makes the
      /// controller reset the local object instead of composing.
      pub server_operations: Option<Operations>,
  }
  15. pub trait OperationsDeserializer<T>: Send + Sync {
  16. fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<T>;
  17. }
  18. pub trait OperationsSerializer: Send + Sync {
  19. fn serialize_operations(&self) -> Bytes;
  20. }
  /// Newtype wrapper around a value of type `T`.
  ///
  /// The wrapped field is private; nothing in the visible part of this file
  /// constructs or reads it.
  pub struct ConflictOperations<T>(T);
  22. pub trait ConflictResolver<Operations>
  23. where
  24. Operations: Send + Sync,
  25. {
  26. fn compose_operations(&self, operations: Operations) -> BoxResultFuture<OperationsMD5, FlowyError>;
  27. fn transform_operations(
  28. &self,
  29. operations: Operations,
  30. ) -> BoxResultFuture<TransformOperations<Operations>, FlowyError>;
  31. fn reset_operations(&self, operations: Operations) -> BoxResultFuture<OperationsMD5, FlowyError>;
  32. }
  33. pub trait ConflictRevisionSink: Send + Sync + 'static {
  34. fn send(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), FlowyError>;
  35. fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>;
  36. }
  37. pub struct ConflictController<Operations, Connection>
  38. where
  39. Operations: Send + Sync,
  40. {
  41. user_id: String,
  42. resolver: Arc<dyn ConflictResolver<Operations> + Send + Sync>,
  43. rev_sink: Arc<dyn ConflictRevisionSink>,
  44. rev_manager: Arc<RevisionManager<Connection>>,
  45. }
  46. impl<Operations, Connection> ConflictController<Operations, Connection>
  47. where
  48. Operations: Clone + Send + Sync,
  49. Connection: 'static,
  50. {
  51. pub fn new(
  52. user_id: &str,
  53. resolver: Arc<dyn ConflictResolver<Operations> + Send + Sync>,
  54. rev_sink: Arc<dyn ConflictRevisionSink>,
  55. rev_manager: Arc<RevisionManager<Connection>>,
  56. ) -> Self {
  57. let user_id = user_id.to_owned();
  58. Self {
  59. user_id,
  60. resolver,
  61. rev_sink,
  62. rev_manager,
  63. }
  64. }
  65. }
  66. impl<Operations, Connection> ConflictController<Operations, Connection>
  67. where
  68. Operations: OperationsSerializer + OperationsDeserializer<Operations> + Clone + Send + Sync,
  69. Connection: Send + Sync + 'static,
  70. {
  71. pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> {
  72. let repeated_revision = RepeatedRevision::try_from(bytes)?;
  73. if repeated_revision.is_empty() {
  74. return Ok(());
  75. }
  76. match self.handle_revision(repeated_revision).await? {
  77. None => {}
  78. Some(server_revision) => {
  79. self.rev_sink.send(vec![server_revision]).await?;
  80. }
  81. }
  82. Ok(())
  83. }
  84. pub async fn ack_revision(&self, rev_id: String, ty: ServerRevisionWSDataType) -> FlowyResult<()> {
  85. let _ = self.rev_sink.ack(rev_id, ty).await?;
  86. Ok(())
  87. }
  88. pub async fn send_revisions(&self, range: RevisionRange) -> FlowyResult<()> {
  89. let revisions = self.rev_manager.get_revisions_in_range(range).await?;
  90. let _ = self.rev_sink.send(revisions).await?;
  91. Ok(())
  92. }
  93. async fn handle_revision(&self, repeated_revision: RepeatedRevision) -> FlowyResult<Option<Revision>> {
  94. let mut revisions = repeated_revision.into_inner();
  95. let first_revision = revisions.first().unwrap();
  96. if let Some(local_revision) = self.rev_manager.get_revision(first_revision.rev_id).await {
  97. if local_revision.md5 == first_revision.md5 {
  98. // The local revision is equal to the pushed revision. Just ignore it.
  99. revisions = revisions.split_off(1);
  100. if revisions.is_empty() {
  101. return Ok(None);
  102. }
  103. } else {
  104. return Ok(None);
  105. }
  106. }
  107. let new_operations = Operations::deserialize_revisions(revisions.clone())?;
  108. let TransformOperations {
  109. client_operations,
  110. server_operations,
  111. } = self.resolver.transform_operations(new_operations).await?;
  112. match server_operations {
  113. None => {
  114. // The server_prime is None means the client local revisions conflict with the
  115. // // server, and it needs to override the client delta.
  116. let md5 = self.resolver.reset_operations(client_operations).await?;
  117. let repeated_revision = RepeatedRevision::new(revisions);
  118. assert_eq!(repeated_revision.last().unwrap().md5, md5);
  119. let _ = self.rev_manager.reset_object(repeated_revision).await?;
  120. Ok(None)
  121. }
  122. Some(server_operations) => {
  123. let md5 = self.resolver.compose_operations(client_operations.clone()).await?;
  124. for revision in &revisions {
  125. let _ = self.rev_manager.add_remote_revision(revision).await?;
  126. }
  127. let (client_revision, server_revision) = make_client_and_server_revision(
  128. &self.user_id,
  129. &self.rev_manager,
  130. client_operations,
  131. Some(server_operations),
  132. md5,
  133. );
  134. let _ = self.rev_manager.add_remote_revision(&client_revision).await?;
  135. Ok(server_revision)
  136. }
  137. }
  138. }
  139. }
  140. fn make_client_and_server_revision<Operations, Connection>(
  141. user_id: &str,
  142. rev_manager: &Arc<RevisionManager<Connection>>,
  143. client_operations: Operations,
  144. server_operations: Option<Operations>,
  145. md5: String,
  146. ) -> (Revision, Option<Revision>)
  147. where
  148. Operations: OperationsSerializer,
  149. Connection: 'static,
  150. {
  151. let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair();
  152. let bytes = client_operations.serialize_operations();
  153. let client_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, user_id, md5.clone());
  154. match server_operations {
  155. None => (client_revision, None),
  156. Some(operations) => {
  157. let bytes = operations.serialize_operations();
  158. let server_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, user_id, md5);
  159. (client_revision, Some(server_revision))
  160. }
  161. }
  162. }