web_socket.rs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. use crate::services::FOLDER_SYNC_INTERVAL_IN_MILLIS;
  2. use bytes::Bytes;
  3. use flowy_client_sync::client_folder::{FolderOperations, FolderPad};
  4. use flowy_client_sync::make_operations_from_revisions;
  5. use flowy_error::{FlowyError, FlowyResult};
  6. use flowy_revision::*;
  7. use flowy_sqlite::ConnectionPool;
  8. use lib_infra::future::{BoxResultFuture, FutureResult};
  9. use lib_ot::core::OperationTransform;
  10. use parking_lot::RwLock;
  11. use revision_model::{Revision, RevisionRange};
  12. use std::{sync::Arc, time::Duration};
  13. use ws_model::ws_revision::{ClientRevisionWSData, NewDocumentUser};
  14. #[derive(Clone)]
  15. pub struct FolderResolveOperations(pub FolderOperations);
  16. impl OperationsDeserializer<FolderResolveOperations> for FolderResolveOperations {
  17. fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<FolderResolveOperations> {
  18. Ok(FolderResolveOperations(make_operations_from_revisions(revisions)?))
  19. }
  20. }
  21. impl OperationsSerializer for FolderResolveOperations {
  22. fn serialize_operations(&self) -> Bytes {
  23. self.0.json_bytes()
  24. }
  25. }
  26. impl FolderResolveOperations {
  27. pub fn into_inner(self) -> FolderOperations {
  28. self.0
  29. }
  30. }
  31. pub type FolderConflictController = ConflictController<FolderResolveOperations, Arc<ConnectionPool>>;
  32. #[allow(dead_code)]
  33. pub(crate) async fn make_folder_ws_manager(
  34. user_id: &str,
  35. folder_id: &str,
  36. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  37. web_socket: Arc<dyn RevisionWebSocket>,
  38. folder_pad: Arc<RwLock<FolderPad>>,
  39. ) -> Arc<RevisionWebSocketManager> {
  40. let ws_data_provider = Arc::new(WSDataProvider::new(folder_id, Arc::new(rev_manager.clone())));
  41. let resolver = Arc::new(FolderConflictResolver { folder_pad });
  42. let conflict_controller =
  43. FolderConflictController::new(user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
  44. let ws_data_stream = Arc::new(FolderRevisionWSDataStream::new(conflict_controller));
  45. let ws_data_sink = Arc::new(FolderWSDataSink(ws_data_provider));
  46. let ping_duration = Duration::from_millis(FOLDER_SYNC_INTERVAL_IN_MILLIS);
  47. Arc::new(RevisionWebSocketManager::new(
  48. "Folder",
  49. folder_id,
  50. web_socket,
  51. ws_data_sink,
  52. ws_data_stream,
  53. ping_duration,
  54. ))
  55. }
  56. pub(crate) struct FolderWSDataSink(Arc<WSDataProvider>);
  57. impl RevisionWebSocketSink for FolderWSDataSink {
  58. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  59. let sink_provider = self.0.clone();
  60. FutureResult::new(async move { sink_provider.next().await })
  61. }
  62. }
  63. struct FolderConflictResolver {
  64. folder_pad: Arc<RwLock<FolderPad>>,
  65. }
  66. impl ConflictResolver<FolderResolveOperations> for FolderConflictResolver {
  67. fn compose_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture<RevisionMD5, FlowyError> {
  68. let operations = operations.into_inner();
  69. let folder_pad = self.folder_pad.clone();
  70. Box::pin(async move {
  71. let md5 = folder_pad.write().compose_remote_operations(operations)?;
  72. Ok(md5.into())
  73. })
  74. }
  75. fn transform_operations(
  76. &self,
  77. operations: FolderResolveOperations,
  78. ) -> BoxResultFuture<TransformOperations<FolderResolveOperations>, FlowyError> {
  79. let folder_pad = self.folder_pad.clone();
  80. let operations = operations.into_inner();
  81. Box::pin(async move {
  82. let read_guard = folder_pad.read();
  83. let mut server_operations: Option<FolderResolveOperations> = None;
  84. let client_operations: FolderResolveOperations;
  85. if read_guard.is_empty() {
  86. // Do nothing
  87. client_operations = FolderResolveOperations(operations);
  88. } else {
  89. let (s_prime, c_prime) = read_guard.get_operations().transform(&operations)?;
  90. client_operations = FolderResolveOperations(c_prime);
  91. server_operations = Some(FolderResolveOperations(s_prime));
  92. }
  93. drop(read_guard);
  94. Ok(TransformOperations {
  95. client_operations,
  96. server_operations,
  97. })
  98. })
  99. }
  100. fn reset_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture<RevisionMD5, FlowyError> {
  101. let folder_pad = self.folder_pad.clone();
  102. Box::pin(async move {
  103. let md5 = folder_pad.write().reset_folder(operations.into_inner())?;
  104. Ok(md5.into())
  105. })
  106. }
  107. }
  108. struct FolderRevisionWSDataStream {
  109. conflict_controller: Arc<FolderConflictController>,
  110. }
  111. impl FolderRevisionWSDataStream {
  112. pub fn new(conflict_controller: FolderConflictController) -> Self {
  113. Self {
  114. conflict_controller: Arc::new(conflict_controller),
  115. }
  116. }
  117. }
  118. impl RevisionWSDataStream for FolderRevisionWSDataStream {
  119. fn receive_push_revision(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), FlowyError> {
  120. let resolver = self.conflict_controller.clone();
  121. Box::pin(async move { resolver.receive_revisions(revisions).await })
  122. }
  123. fn receive_ack(&self, rev_id: i64) -> BoxResultFuture<(), FlowyError> {
  124. let resolver = self.conflict_controller.clone();
  125. Box::pin(async move { resolver.ack_revision(rev_id).await })
  126. }
  127. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
  128. // Do nothing by now, just a placeholder for future extension.
  129. Box::pin(async move { Ok(()) })
  130. }
  131. fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
  132. let resolver = self.conflict_controller.clone();
  133. Box::pin(async move { resolver.send_revisions(range).await })
  134. }
  135. }