web_socket.rs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. use crate::{queue::EditorCommand, TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS};
  2. use bytes::Bytes;
  3. use flowy_error::{internal_error, FlowyError};
  4. use flowy_revision::*;
  5. use flowy_sync::{
  6. entities::{
  7. revision::RevisionRange,
  8. ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType},
  9. },
  10. errors::CollaborateResult,
  11. };
  12. use lib_infra::future::{BoxResultFuture, FutureResult};
  13. use lib_ot::rich_text::RichTextAttributes;
  14. use lib_ot::rich_text::RichTextDelta;
  15. use lib_ws::WSConnectState;
  16. use std::{sync::Arc, time::Duration};
  17. use tokio::sync::{
  18. broadcast,
  19. mpsc::{Receiver, Sender},
  20. oneshot,
  21. };
  22. pub(crate) type EditorCommandSender = Sender<EditorCommand>;
  23. pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
  24. #[allow(dead_code)]
  25. pub(crate) async fn make_block_ws_manager(
  26. doc_id: String,
  27. user_id: String,
  28. edit_cmd_tx: EditorCommandSender,
  29. rev_manager: Arc<RevisionManager>,
  30. rev_web_socket: Arc<dyn RevisionWebSocket>,
  31. ) -> Arc<RevisionWebSocketManager> {
  32. let ws_data_provider = Arc::new(WSDataProvider::new(&doc_id, Arc::new(rev_manager.clone())));
  33. let resolver = Arc::new(TextBlockConflictResolver { edit_cmd_tx });
  34. let conflict_controller =
  35. RichTextConflictController::new(&user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
  36. let ws_data_stream = Arc::new(TextBlockRevisionWSDataStream::new(conflict_controller));
  37. let ws_data_sink = Arc::new(TextBlockWSDataSink(ws_data_provider));
  38. let ping_duration = Duration::from_millis(TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS);
  39. let ws_manager = Arc::new(RevisionWebSocketManager::new(
  40. "Block",
  41. &doc_id,
  42. rev_web_socket,
  43. ws_data_sink,
  44. ws_data_stream,
  45. ping_duration,
  46. ));
  47. listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state());
  48. ws_manager
  49. }
  50. #[allow(dead_code)]
  51. fn listen_document_ws_state(_user_id: &str, _doc_id: &str, mut subscriber: broadcast::Receiver<WSConnectState>) {
  52. tokio::spawn(async move {
  53. while let Ok(state) = subscriber.recv().await {
  54. match state {
  55. WSConnectState::Init => {}
  56. WSConnectState::Connecting => {}
  57. WSConnectState::Connected => {}
  58. WSConnectState::Disconnected => {}
  59. }
  60. }
  61. });
  62. }
  63. pub(crate) struct TextBlockRevisionWSDataStream {
  64. conflict_controller: Arc<RichTextConflictController>,
  65. }
  66. impl TextBlockRevisionWSDataStream {
  67. #[allow(dead_code)]
  68. pub fn new(conflict_controller: RichTextConflictController) -> Self {
  69. Self {
  70. conflict_controller: Arc::new(conflict_controller),
  71. }
  72. }
  73. }
  74. impl RevisionWSDataStream for TextBlockRevisionWSDataStream {
  75. fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
  76. let resolver = self.conflict_controller.clone();
  77. Box::pin(async move { resolver.receive_bytes(bytes).await })
  78. }
  79. fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> {
  80. let resolver = self.conflict_controller.clone();
  81. Box::pin(async move { resolver.ack_revision(id, ty).await })
  82. }
  83. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
  84. // Do nothing by now, just a placeholder for future extension.
  85. Box::pin(async move { Ok(()) })
  86. }
  87. fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
  88. let resolver = self.conflict_controller.clone();
  89. Box::pin(async move { resolver.send_revisions(range).await })
  90. }
  91. }
  92. pub(crate) struct TextBlockWSDataSink(pub(crate) Arc<WSDataProvider>);
  93. impl RevisionWebSocketSink for TextBlockWSDataSink {
  94. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  95. let sink_provider = self.0.clone();
  96. FutureResult::new(async move { sink_provider.next().await })
  97. }
  98. }
  99. struct TextBlockConflictResolver {
  100. edit_cmd_tx: EditorCommandSender,
  101. }
  102. impl ConflictResolver<RichTextAttributes> for TextBlockConflictResolver {
  103. fn compose_delta(&self, delta: RichTextDelta) -> BoxResultFuture<DeltaMD5, FlowyError> {
  104. let tx = self.edit_cmd_tx.clone();
  105. Box::pin(async move {
  106. let (ret, rx) = oneshot::channel();
  107. tx.send(EditorCommand::ComposeRemoteDelta {
  108. client_delta: delta,
  109. ret,
  110. })
  111. .await
  112. .map_err(internal_error)?;
  113. let md5 = rx.await.map_err(|e| {
  114. FlowyError::internal().context(format!("handle EditorCommand::ComposeRemoteDelta failed: {}", e))
  115. })??;
  116. Ok(md5)
  117. })
  118. }
  119. fn transform_delta(
  120. &self,
  121. delta: RichTextDelta,
  122. ) -> BoxResultFuture<flowy_revision::RichTextTransformDeltas, FlowyError> {
  123. let tx = self.edit_cmd_tx.clone();
  124. Box::pin(async move {
  125. let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextTransformDeltas>>();
  126. tx.send(EditorCommand::TransformDelta { delta, ret })
  127. .await
  128. .map_err(internal_error)?;
  129. let transform_delta = rx
  130. .await
  131. .map_err(|e| FlowyError::internal().context(format!("TransformDelta failed: {}", e)))??;
  132. Ok(transform_delta)
  133. })
  134. }
  135. fn reset_delta(&self, delta: RichTextDelta) -> BoxResultFuture<DeltaMD5, FlowyError> {
  136. let tx = self.edit_cmd_tx.clone();
  137. Box::pin(async move {
  138. let (ret, rx) = oneshot::channel();
  139. let _ = tx
  140. .send(EditorCommand::ResetDelta { delta, ret })
  141. .await
  142. .map_err(internal_error)?;
  143. let md5 = rx.await.map_err(|e| {
  144. FlowyError::internal().context(format!("handle EditorCommand::OverrideDelta failed: {}", e))
  145. })??;
  146. Ok(md5)
  147. })
  148. }
  149. }