// web_socket.rs
  1. use crate::services::FOLDER_SYNC_INTERVAL_IN_MILLIS;
  2. use bytes::Bytes;
  3. use flowy_collaboration::{
  4. client_folder::FolderPad,
  5. entities::{
  6. revision::RevisionRange,
  7. ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType},
  8. },
  9. };
  10. use flowy_error::FlowyError;
  11. use flowy_sync::*;
  12. use lib_infra::future::{BoxResultFuture, FutureResult};
  13. use lib_ot::core::{OperationTransformable, PlainTextAttributes, PlainTextDelta};
  14. use parking_lot::RwLock;
  15. use std::{sync::Arc, time::Duration};
  16. pub(crate) async fn make_folder_ws_manager(
  17. user_id: &str,
  18. folder_id: &str,
  19. rev_manager: Arc<RevisionManager>,
  20. web_socket: Arc<dyn RevisionWebSocket>,
  21. folder_pad: Arc<RwLock<FolderPad>>,
  22. ) -> Arc<RevisionWebSocketManager> {
  23. let ws_data_provider = Arc::new(WSDataProvider::new(folder_id, Arc::new(rev_manager.clone())));
  24. let resolver = Arc::new(FolderConflictResolver { folder_pad });
  25. let conflict_controller = ConflictController::<PlainTextAttributes>::new(
  26. user_id,
  27. resolver,
  28. Arc::new(ws_data_provider.clone()),
  29. rev_manager,
  30. );
  31. let ws_data_stream = Arc::new(FolderRevisionWSDataStream::new(conflict_controller));
  32. let ws_data_sink = Arc::new(FolderWSDataSink(ws_data_provider));
  33. let ping_duration = Duration::from_millis(FOLDER_SYNC_INTERVAL_IN_MILLIS);
  34. Arc::new(RevisionWebSocketManager::new(
  35. "Folder",
  36. folder_id,
  37. web_socket,
  38. ws_data_sink,
  39. ws_data_stream,
  40. ping_duration,
  41. ))
  42. }
  43. pub(crate) struct FolderWSDataSink(Arc<WSDataProvider>);
  44. impl RevisionWSDataIterator for FolderWSDataSink {
  45. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  46. let sink_provider = self.0.clone();
  47. FutureResult::new(async move { sink_provider.next().await })
  48. }
  49. }
  50. struct FolderConflictResolver {
  51. folder_pad: Arc<RwLock<FolderPad>>,
  52. }
  53. impl ConflictResolver<PlainTextAttributes> for FolderConflictResolver {
  54. fn compose_delta(&self, delta: PlainTextDelta) -> BoxResultFuture<DeltaMD5, FlowyError> {
  55. let folder_pad = self.folder_pad.clone();
  56. Box::pin(async move {
  57. let md5 = folder_pad.write().compose_remote_delta(delta)?;
  58. Ok(md5)
  59. })
  60. }
  61. fn transform_delta(
  62. &self,
  63. delta: PlainTextDelta,
  64. ) -> BoxResultFuture<TransformDeltas<PlainTextAttributes>, FlowyError> {
  65. let folder_pad = self.folder_pad.clone();
  66. Box::pin(async move {
  67. let read_guard = folder_pad.read();
  68. let mut server_prime: Option<PlainTextDelta> = None;
  69. let client_prime: PlainTextDelta;
  70. if read_guard.is_empty() {
  71. // Do nothing
  72. client_prime = delta;
  73. } else {
  74. let (s_prime, c_prime) = read_guard.delta().transform(&delta)?;
  75. client_prime = c_prime;
  76. server_prime = Some(s_prime);
  77. }
  78. drop(read_guard);
  79. Ok(TransformDeltas {
  80. client_prime,
  81. server_prime,
  82. })
  83. })
  84. }
  85. fn reset_delta(&self, delta: PlainTextDelta) -> BoxResultFuture<DeltaMD5, FlowyError> {
  86. let folder_pad = self.folder_pad.clone();
  87. Box::pin(async move {
  88. let md5 = folder_pad.write().reset_folder(delta)?;
  89. Ok(md5)
  90. })
  91. }
  92. }
  93. struct FolderRevisionWSDataStream {
  94. conflict_controller: Arc<PlainTextConflictController>,
  95. }
  96. impl FolderRevisionWSDataStream {
  97. pub fn new(conflict_controller: PlainTextConflictController) -> Self {
  98. Self {
  99. conflict_controller: Arc::new(conflict_controller),
  100. }
  101. }
  102. }
  103. impl RevisionWSDataStream for FolderRevisionWSDataStream {
  104. fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
  105. let resolver = self.conflict_controller.clone();
  106. Box::pin(async move { resolver.receive_bytes(bytes).await })
  107. }
  108. fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> {
  109. let resolver = self.conflict_controller.clone();
  110. Box::pin(async move { resolver.ack_revision(id, ty).await })
  111. }
  112. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
  113. // Do nothing by now, just a placeholder for future extension.
  114. Box::pin(async move { Ok(()) })
  115. }
  116. fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
  117. let resolver = self.conflict_controller.clone();
  118. Box::pin(async move { resolver.send_revisions(range).await })
  119. }
  120. }