web_socket.rs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. use crate::{
  2. core::{EditorCommand, DOCUMENT_SYNC_INTERVAL_IN_MILLIS},
  3. DocumentWSReceiver,
  4. };
  5. use async_trait::async_trait;
  6. use bytes::Bytes;
  7. use flowy_collaboration::{
  8. entities::{
  9. revision::RevisionRange,
  10. ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType},
  11. },
  12. errors::CollaborateResult,
  13. };
  14. use flowy_error::{internal_error, FlowyError};
  15. use flowy_sync::*;
  16. use lib_infra::future::{BoxResultFuture, FutureResult};
  17. use lib_ot::{core::Delta, rich_text::RichTextAttributes};
  18. use lib_ws::WSConnectState;
  19. use std::{sync::Arc, time::Duration};
  20. use tokio::sync::{
  21. broadcast,
  22. mpsc::{Receiver, Sender},
  23. oneshot,
  24. };
  25. pub(crate) type EditorCommandSender = Sender<EditorCommand>;
  26. pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
  27. pub(crate) async fn make_document_ws_manager(
  28. doc_id: String,
  29. user_id: String,
  30. edit_cmd_tx: EditorCommandSender,
  31. rev_manager: Arc<RevisionManager>,
  32. web_socket: Arc<dyn RevisionWebSocket>,
  33. ) -> Arc<RevisionWebSocketManager> {
  34. let composite_sink_provider = Arc::new(CompositeWSSinkDataProvider::new(&doc_id, rev_manager.clone()));
  35. let resolve_target = Arc::new(DocumentRevisionResolveTarget { edit_cmd_tx });
  36. let resolver = RevisionConflictResolver::<RichTextAttributes>::new(
  37. &user_id,
  38. resolve_target,
  39. Arc::new(composite_sink_provider.clone()),
  40. rev_manager,
  41. );
  42. let ws_stream_consumer = Arc::new(DocumentWSSteamConsumerAdapter {
  43. resolver: Arc::new(resolver),
  44. });
  45. let sink_provider = Arc::new(DocumentWSSinkDataProviderAdapter(composite_sink_provider));
  46. let ping_duration = Duration::from_millis(DOCUMENT_SYNC_INTERVAL_IN_MILLIS);
  47. let ws_manager = Arc::new(RevisionWebSocketManager::new(
  48. "Document",
  49. &doc_id,
  50. web_socket,
  51. sink_provider,
  52. ws_stream_consumer,
  53. ping_duration,
  54. ));
  55. listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state());
  56. ws_manager
  57. }
  58. fn listen_document_ws_state(_user_id: &str, _doc_id: &str, mut subscriber: broadcast::Receiver<WSConnectState>) {
  59. tokio::spawn(async move {
  60. while let Ok(state) = subscriber.recv().await {
  61. match state {
  62. WSConnectState::Init => {},
  63. WSConnectState::Connecting => {},
  64. WSConnectState::Connected => {},
  65. WSConnectState::Disconnected => {},
  66. }
  67. }
  68. });
  69. }
  70. pub(crate) struct DocumentWSSteamConsumerAdapter {
  71. resolver: Arc<RevisionConflictResolver<RichTextAttributes>>,
  72. }
  73. impl RevisionWSSteamConsumer for DocumentWSSteamConsumerAdapter {
  74. fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
  75. let resolver = self.resolver.clone();
  76. Box::pin(async move { resolver.receive_bytes(bytes).await })
  77. }
  78. fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> {
  79. let resolver = self.resolver.clone();
  80. Box::pin(async move { resolver.ack_revision(id, ty).await })
  81. }
  82. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
  83. // Do nothing by now, just a placeholder for future extension.
  84. Box::pin(async move { Ok(()) })
  85. }
  86. fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
  87. let resolver = self.resolver.clone();
  88. Box::pin(async move { resolver.send_revisions(range).await })
  89. }
  90. }
  91. pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc<CompositeWSSinkDataProvider>);
  92. impl RevisionWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
  93. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  94. let sink_provider = self.0.clone();
  95. FutureResult::new(async move { sink_provider.next().await })
  96. }
  97. }
  98. struct DocumentRevisionResolveTarget {
  99. edit_cmd_tx: EditorCommandSender,
  100. }
  101. impl ResolverTarget<RichTextAttributes> for DocumentRevisionResolveTarget {
  102. fn compose_delta(&self, delta: Delta<RichTextAttributes>) -> BoxResultFuture<DeltaMD5, FlowyError> {
  103. let tx = self.edit_cmd_tx.clone();
  104. Box::pin(async move {
  105. let (ret, rx) = oneshot::channel();
  106. tx.send(EditorCommand::ComposeRemoteDelta {
  107. client_delta: delta,
  108. ret,
  109. })
  110. .await
  111. .map_err(internal_error)?;
  112. let md5 = rx.await.map_err(|e| {
  113. FlowyError::internal().context(format!("handle EditorCommand::ComposeRemoteDelta failed: {}", e))
  114. })??;
  115. Ok(md5)
  116. })
  117. }
  118. fn transform_delta(
  119. &self,
  120. delta: Delta<RichTextAttributes>,
  121. ) -> BoxResultFuture<flowy_sync::TransformDeltas<RichTextAttributes>, FlowyError> {
  122. let tx = self.edit_cmd_tx.clone();
  123. Box::pin(async move {
  124. let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas<RichTextAttributes>>>();
  125. tx.send(EditorCommand::TransformDelta { delta, ret })
  126. .await
  127. .map_err(internal_error)?;
  128. let transform_delta = rx
  129. .await
  130. .map_err(|e| FlowyError::internal().context(format!("TransformDelta failed: {}", e)))??;
  131. Ok(transform_delta)
  132. })
  133. }
  134. fn reset_delta(&self, delta: Delta<RichTextAttributes>) -> BoxResultFuture<DeltaMD5, FlowyError> {
  135. let tx = self.edit_cmd_tx.clone();
  136. Box::pin(async move {
  137. let (ret, rx) = oneshot::channel();
  138. let _ = tx
  139. .send(EditorCommand::ResetDelta { delta, ret })
  140. .await
  141. .map_err(internal_error)?;
  142. let md5 = rx.await.map_err(|e| {
  143. FlowyError::internal().context(format!("handle EditorCommand::OverrideDelta failed: {}", e))
  144. })??;
  145. Ok(md5)
  146. })
  147. }
  148. }
  149. // RevisionWebSocketManager registers itself as a DocumentWSReceiver for each
  150. // opened document.
  151. #[async_trait]
  152. impl DocumentWSReceiver for RevisionWebSocketManager {
  153. #[tracing::instrument(level = "debug", skip(self, data), err)]
  154. async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError> {
  155. let _ = self.ws_passthrough_tx.send(data).await.map_err(|e| {
  156. let err_msg = format!("{} passthrough error: {}", self.object_id, e);
  157. FlowyError::internal().context(err_msg)
  158. })?;
  159. Ok(())
  160. }
  161. fn connect_state_changed(&self, state: WSConnectState) {
  162. match self.state_passthrough_tx.send(state) {
  163. Ok(_) => {},
  164. Err(e) => tracing::error!("{}", e),
  165. }
  166. }
  167. }