// conflict_resolve.rs
  1. use crate::{RevisionMD5, RevisionManager};
  2. use bytes::Bytes;
  3. use flowy_error::{FlowyError, FlowyResult};
  4. use flowy_http_model::{
  5. revision::{RepeatedRevision, Revision, RevisionRange},
  6. ws_data::ServerRevisionWSDataType,
  7. };
  8. use lib_infra::future::BoxResultFuture;
  9. use std::{convert::TryFrom, sync::Arc};
  10. pub struct TransformOperations<Operations> {
  11. pub client_operations: Operations,
  12. pub server_operations: Option<Operations>,
  13. }
  14. pub trait OperationsDeserializer<T>: Send + Sync {
  15. fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<T>;
  16. }
  17. pub trait OperationsSerializer: Send + Sync {
  18. fn serialize_operations(&self) -> Bytes;
  19. }
  20. pub struct ConflictOperations<T>(T);
  21. pub trait ConflictResolver<Operations>
  22. where
  23. Operations: Send + Sync,
  24. {
  25. fn compose_operations(&self, operations: Operations) -> BoxResultFuture<RevisionMD5, FlowyError>;
  26. fn transform_operations(
  27. &self,
  28. operations: Operations,
  29. ) -> BoxResultFuture<TransformOperations<Operations>, FlowyError>;
  30. fn reset_operations(&self, operations: Operations) -> BoxResultFuture<RevisionMD5, FlowyError>;
  31. }
  32. pub trait ConflictRevisionSink: Send + Sync + 'static {
  33. fn send(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), FlowyError>;
  34. fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>;
  35. }
  36. pub struct ConflictController<Operations, Connection>
  37. where
  38. Operations: Send + Sync,
  39. {
  40. user_id: String,
  41. resolver: Arc<dyn ConflictResolver<Operations> + Send + Sync>,
  42. rev_sink: Arc<dyn ConflictRevisionSink>,
  43. rev_manager: Arc<RevisionManager<Connection>>,
  44. }
  45. impl<Operations, Connection> ConflictController<Operations, Connection>
  46. where
  47. Operations: Clone + Send + Sync,
  48. Connection: 'static,
  49. {
  50. pub fn new(
  51. user_id: &str,
  52. resolver: Arc<dyn ConflictResolver<Operations> + Send + Sync>,
  53. rev_sink: Arc<dyn ConflictRevisionSink>,
  54. rev_manager: Arc<RevisionManager<Connection>>,
  55. ) -> Self {
  56. let user_id = user_id.to_owned();
  57. Self {
  58. user_id,
  59. resolver,
  60. rev_sink,
  61. rev_manager,
  62. }
  63. }
  64. }
  65. impl<Operations, Connection> ConflictController<Operations, Connection>
  66. where
  67. Operations: OperationsSerializer + OperationsDeserializer<Operations> + Clone + Send + Sync,
  68. Connection: Send + Sync + 'static,
  69. {
  70. pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> {
  71. let repeated_revision = RepeatedRevision::try_from(bytes)?;
  72. if repeated_revision.is_empty() {
  73. return Ok(());
  74. }
  75. match self.handle_revision(repeated_revision).await? {
  76. None => {}
  77. Some(server_revision) => {
  78. self.rev_sink.send(vec![server_revision]).await?;
  79. }
  80. }
  81. Ok(())
  82. }
  83. pub async fn ack_revision(&self, rev_id: String, ty: ServerRevisionWSDataType) -> FlowyResult<()> {
  84. let _ = self.rev_sink.ack(rev_id, ty).await?;
  85. Ok(())
  86. }
  87. pub async fn send_revisions(&self, range: RevisionRange) -> FlowyResult<()> {
  88. let revisions = self.rev_manager.get_revisions_in_range(range).await?;
  89. let _ = self.rev_sink.send(revisions).await?;
  90. Ok(())
  91. }
  92. async fn handle_revision(&self, repeated_revision: RepeatedRevision) -> FlowyResult<Option<Revision>> {
  93. let mut revisions = repeated_revision.into_inner();
  94. let first_revision = revisions.first().unwrap();
  95. if let Some(local_revision) = self.rev_manager.get_revision(first_revision.rev_id).await {
  96. if local_revision.md5 == first_revision.md5 {
  97. // The local revision is equal to the pushed revision. Just ignore it.
  98. revisions = revisions.split_off(1);
  99. if revisions.is_empty() {
  100. return Ok(None);
  101. }
  102. } else {
  103. return Ok(None);
  104. }
  105. }
  106. let new_operations = Operations::deserialize_revisions(revisions.clone())?;
  107. let TransformOperations {
  108. client_operations,
  109. server_operations,
  110. } = self.resolver.transform_operations(new_operations).await?;
  111. match server_operations {
  112. None => {
  113. // The server_prime is None means the client local revisions conflict with the
  114. // // server, and it needs to override the client delta.
  115. let md5 = self.resolver.reset_operations(client_operations).await?;
  116. debug_assert!(md5.is_equal(&revisions.last().unwrap().md5));
  117. let _ = self.rev_manager.reset_object(revisions).await?;
  118. Ok(None)
  119. }
  120. Some(server_operations) => {
  121. let md5 = self.resolver.compose_operations(client_operations.clone()).await?;
  122. for revision in &revisions {
  123. let _ = self.rev_manager.add_remote_revision(revision).await?;
  124. }
  125. let (client_revision, server_revision) = make_client_and_server_revision(
  126. &self.user_id,
  127. &self.rev_manager,
  128. client_operations,
  129. Some(server_operations),
  130. md5,
  131. );
  132. let _ = self.rev_manager.add_remote_revision(&client_revision).await?;
  133. Ok(server_revision)
  134. }
  135. }
  136. }
  137. }
  138. fn make_client_and_server_revision<Operations, Connection>(
  139. _user_id: &str,
  140. rev_manager: &Arc<RevisionManager<Connection>>,
  141. client_operations: Operations,
  142. server_operations: Option<Operations>,
  143. md5: RevisionMD5,
  144. ) -> (Revision, Option<Revision>)
  145. where
  146. Operations: OperationsSerializer,
  147. Connection: 'static,
  148. {
  149. let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair();
  150. let bytes = client_operations.serialize_operations();
  151. let client_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, md5.clone());
  152. match server_operations {
  153. None => (client_revision, None),
  154. Some(operations) => {
  155. let bytes = operations.serialize_operations();
  156. let server_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, md5);
  157. (client_revision, Some(server_revision))
  158. }
  159. }
  160. }