web_socket.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. use crate::services::FOLDER_SYNC_INTERVAL_IN_MILLIS;
  2. use bytes::Bytes;
  3. use flowy_collaboration::{
  4. entities::{
  5. revision::RevisionRange,
  6. ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType},
  7. },
  8. folder::FolderPad,
  9. };
  10. use flowy_error::FlowyError;
  11. use flowy_sync::*;
  12. use lib_infra::future::{BoxResultFuture, FutureResult};
  13. use lib_ot::core::{Delta, OperationTransformable, PlainDelta, PlainTextAttributes};
  14. use parking_lot::RwLock;
  15. use std::{sync::Arc, time::Duration};
  16. pub(crate) async fn make_folder_ws_manager(
  17. user_id: &str,
  18. folder_id: &str,
  19. rev_manager: Arc<RevisionManager>,
  20. web_socket: Arc<dyn RevisionWebSocket>,
  21. folder_pad: Arc<RwLock<FolderPad>>,
  22. ) -> Arc<RevisionWebSocketManager> {
  23. let composite_sink_provider = Arc::new(CompositeWSSinkDataProvider::new(folder_id, rev_manager.clone()));
  24. let resolve_target = Arc::new(FolderRevisionResolveTarget { folder_pad });
  25. let resolver = RevisionConflictResolver::<PlainTextAttributes>::new(
  26. user_id,
  27. resolve_target,
  28. Arc::new(composite_sink_provider.clone()),
  29. rev_manager,
  30. );
  31. let ws_stream_consumer = Arc::new(FolderWSStreamConsumerAdapter {
  32. resolver: Arc::new(resolver),
  33. });
  34. let sink_provider = Arc::new(FolderWSSinkDataProviderAdapter(composite_sink_provider));
  35. let ping_duration = Duration::from_millis(FOLDER_SYNC_INTERVAL_IN_MILLIS);
  36. Arc::new(RevisionWebSocketManager::new(
  37. "Folder",
  38. folder_id,
  39. web_socket,
  40. sink_provider,
  41. ws_stream_consumer,
  42. ping_duration,
  43. ))
  44. }
  45. pub(crate) struct FolderWSSinkDataProviderAdapter(Arc<CompositeWSSinkDataProvider>);
  46. impl RevisionWSSinkDataProvider for FolderWSSinkDataProviderAdapter {
  47. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  48. let sink_provider = self.0.clone();
  49. FutureResult::new(async move { sink_provider.next().await })
  50. }
  51. }
  52. struct FolderRevisionResolveTarget {
  53. folder_pad: Arc<RwLock<FolderPad>>,
  54. }
  55. impl ResolverTarget<PlainTextAttributes> for FolderRevisionResolveTarget {
  56. fn compose_delta(&self, delta: Delta<PlainTextAttributes>) -> BoxResultFuture<DeltaMD5, FlowyError> {
  57. let folder_pad = self.folder_pad.clone();
  58. Box::pin(async move {
  59. let md5 = folder_pad.write().compose_remote_delta(delta)?;
  60. Ok(md5)
  61. })
  62. }
  63. fn transform_delta(
  64. &self,
  65. delta: Delta<PlainTextAttributes>,
  66. ) -> BoxResultFuture<TransformDeltas<PlainTextAttributes>, FlowyError> {
  67. let folder_pad = self.folder_pad.clone();
  68. Box::pin(async move {
  69. let read_guard = folder_pad.read();
  70. let mut server_prime: Option<PlainDelta> = None;
  71. let client_prime: PlainDelta;
  72. if read_guard.is_empty() {
  73. // Do nothing
  74. client_prime = delta;
  75. } else {
  76. let (s_prime, c_prime) = read_guard.delta().transform(&delta)?;
  77. client_prime = c_prime;
  78. server_prime = Some(s_prime);
  79. }
  80. drop(read_guard);
  81. Ok(TransformDeltas {
  82. client_prime,
  83. server_prime,
  84. })
  85. })
  86. }
  87. fn reset_delta(&self, delta: Delta<PlainTextAttributes>) -> BoxResultFuture<DeltaMD5, FlowyError> {
  88. let folder_pad = self.folder_pad.clone();
  89. Box::pin(async move {
  90. let md5 = folder_pad.write().reset_folder(delta)?;
  91. Ok(md5)
  92. })
  93. }
  94. }
  95. struct FolderWSStreamConsumerAdapter {
  96. resolver: Arc<RevisionConflictResolver<PlainTextAttributes>>,
  97. }
  98. impl RevisionWSSteamConsumer for FolderWSStreamConsumerAdapter {
  99. fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
  100. let resolver = self.resolver.clone();
  101. Box::pin(async move { resolver.receive_bytes(bytes).await })
  102. }
  103. fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> {
  104. let resolver = self.resolver.clone();
  105. Box::pin(async move { resolver.ack_revision(id, ty).await })
  106. }
  107. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
  108. // Do nothing by now, just a placeholder for future extension.
  109. Box::pin(async move { Ok(()) })
  110. }
  111. fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
  112. let resolver = self.resolver.clone();
  113. Box::pin(async move { resolver.send_revisions(range).await })
  114. }
  115. }