web_socket.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. use crate::services::FOLDER_SYNC_INTERVAL_IN_MILLIS;
  2. use bytes::Bytes;
  3. use flowy_error::FlowyError;
  4. use flowy_revision::*;
  5. use flowy_sync::{
  6. client_folder::FolderPad,
  7. entities::{
  8. revision::RevisionRange,
  9. ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType},
  10. },
  11. };
  12. use lib_infra::future::{BoxResultFuture, FutureResult};
  13. use lib_ot::core::{Delta, EmptyAttributes, OperationTransform};
  14. use parking_lot::RwLock;
  15. use std::{sync::Arc, time::Duration};
  16. #[allow(dead_code)]
  17. pub(crate) async fn make_folder_ws_manager(
  18. user_id: &str,
  19. folder_id: &str,
  20. rev_manager: Arc<RevisionManager>,
  21. web_socket: Arc<dyn RevisionWebSocket>,
  22. folder_pad: Arc<RwLock<FolderPad>>,
  23. ) -> Arc<RevisionWebSocketManager> {
  24. let ws_data_provider = Arc::new(WSDataProvider::new(folder_id, Arc::new(rev_manager.clone())));
  25. let resolver = Arc::new(FolderConflictResolver { folder_pad });
  26. let conflict_controller =
  27. ConflictController::<EmptyAttributes>::new(user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
  28. let ws_data_stream = Arc::new(FolderRevisionWSDataStream::new(conflict_controller));
  29. let ws_data_sink = Arc::new(FolderWSDataSink(ws_data_provider));
  30. let ping_duration = Duration::from_millis(FOLDER_SYNC_INTERVAL_IN_MILLIS);
  31. Arc::new(RevisionWebSocketManager::new(
  32. "Folder",
  33. folder_id,
  34. web_socket,
  35. ws_data_sink,
  36. ws_data_stream,
  37. ping_duration,
  38. ))
  39. }
  40. pub(crate) struct FolderWSDataSink(Arc<WSDataProvider>);
  41. impl RevisionWebSocketSink for FolderWSDataSink {
  42. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  43. let sink_provider = self.0.clone();
  44. FutureResult::new(async move { sink_provider.next().await })
  45. }
  46. }
  47. struct FolderConflictResolver {
  48. folder_pad: Arc<RwLock<FolderPad>>,
  49. }
  50. impl ConflictResolver<EmptyAttributes> for FolderConflictResolver {
  51. fn compose_delta(&self, delta: Delta) -> BoxResultFuture<DeltaMD5, FlowyError> {
  52. let folder_pad = self.folder_pad.clone();
  53. Box::pin(async move {
  54. let md5 = folder_pad.write().compose_remote_delta(delta)?;
  55. Ok(md5)
  56. })
  57. }
  58. fn transform_delta(&self, delta: Delta) -> BoxResultFuture<TransformDeltas<EmptyAttributes>, FlowyError> {
  59. let folder_pad = self.folder_pad.clone();
  60. Box::pin(async move {
  61. let read_guard = folder_pad.read();
  62. let mut server_prime: Option<Delta> = None;
  63. let client_prime: Delta;
  64. if read_guard.is_empty() {
  65. // Do nothing
  66. client_prime = delta;
  67. } else {
  68. let (s_prime, c_prime) = read_guard.delta().transform(&delta)?;
  69. client_prime = c_prime;
  70. server_prime = Some(s_prime);
  71. }
  72. drop(read_guard);
  73. Ok(TransformDeltas {
  74. client_prime,
  75. server_prime,
  76. })
  77. })
  78. }
  79. fn reset_delta(&self, delta: Delta) -> BoxResultFuture<DeltaMD5, FlowyError> {
  80. let folder_pad = self.folder_pad.clone();
  81. Box::pin(async move {
  82. let md5 = folder_pad.write().reset_folder(delta)?;
  83. Ok(md5)
  84. })
  85. }
  86. }
  87. struct FolderRevisionWSDataStream {
  88. conflict_controller: Arc<PlainTextConflictController>,
  89. }
  90. impl FolderRevisionWSDataStream {
  91. pub fn new(conflict_controller: PlainTextConflictController) -> Self {
  92. Self {
  93. conflict_controller: Arc::new(conflict_controller),
  94. }
  95. }
  96. }
  97. impl RevisionWSDataStream for FolderRevisionWSDataStream {
  98. fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
  99. let resolver = self.conflict_controller.clone();
  100. Box::pin(async move { resolver.receive_bytes(bytes).await })
  101. }
  102. fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> {
  103. let resolver = self.conflict_controller.clone();
  104. Box::pin(async move { resolver.ack_revision(id, ty).await })
  105. }
  106. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
  107. // Do nothing by now, just a placeholder for future extension.
  108. Box::pin(async move { Ok(()) })
  109. }
  110. fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
  111. let resolver = self.conflict_controller.clone();
  112. Box::pin(async move { resolver.send_revisions(range).await })
  113. }
  114. }