web_socket.rs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. use crate::queue::TextTransformOperations;
  2. use crate::{queue::EditorCommand, TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS};
  3. use bytes::Bytes;
  4. use flowy_error::{internal_error, FlowyError, FlowyResult};
  5. use flowy_revision::*;
  6. use flowy_sync::entities::revision::Revision;
  7. use flowy_sync::{
  8. entities::{
  9. revision::RevisionRange,
  10. ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType},
  11. },
  12. errors::CollaborateResult,
  13. };
  14. use lib_infra::future::{BoxResultFuture, FutureResult};
  15. use flowy_sync::util::make_operations_from_revisions;
  16. use lib_ot::text_delta::TextOperations;
  17. use lib_ws::WSConnectState;
  18. use std::{sync::Arc, time::Duration};
  19. use tokio::sync::{
  20. broadcast,
  21. mpsc::{Receiver, Sender},
  22. oneshot,
  23. };
  24. pub(crate) type EditorCommandSender = Sender<EditorCommand>;
  25. pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
  26. #[derive(Clone)]
  27. pub struct DocumentResolveOperations(pub TextOperations);
  28. impl OperationsDeserializer<DocumentResolveOperations> for DocumentResolveOperations {
  29. fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<DocumentResolveOperations> {
  30. Ok(DocumentResolveOperations(make_operations_from_revisions(revisions)?))
  31. }
  32. }
  33. impl OperationsSerializer for DocumentResolveOperations {
  34. fn serialize_operations(&self) -> Bytes {
  35. self.0.json_bytes()
  36. }
  37. }
  38. impl DocumentResolveOperations {
  39. pub fn into_inner(self) -> TextOperations {
  40. self.0
  41. }
  42. }
  43. pub type DocumentConflictController = ConflictController<DocumentResolveOperations>;
  44. #[allow(dead_code)]
  45. pub(crate) async fn make_document_ws_manager(
  46. doc_id: String,
  47. user_id: String,
  48. edit_cmd_tx: EditorCommandSender,
  49. rev_manager: Arc<RevisionManager>,
  50. rev_web_socket: Arc<dyn RevisionWebSocket>,
  51. ) -> Arc<RevisionWebSocketManager> {
  52. let ws_data_provider = Arc::new(WSDataProvider::new(&doc_id, Arc::new(rev_manager.clone())));
  53. let resolver = Arc::new(DocumentConflictResolver { edit_cmd_tx });
  54. let conflict_controller =
  55. DocumentConflictController::new(&user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
  56. let ws_data_stream = Arc::new(DocumentRevisionWSDataStream::new(conflict_controller));
  57. let ws_data_sink = Arc::new(DocumentWSDataSink(ws_data_provider));
  58. let ping_duration = Duration::from_millis(TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS);
  59. let ws_manager = Arc::new(RevisionWebSocketManager::new(
  60. "Block",
  61. &doc_id,
  62. rev_web_socket,
  63. ws_data_sink,
  64. ws_data_stream,
  65. ping_duration,
  66. ));
  67. listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state());
  68. ws_manager
  69. }
  70. #[allow(dead_code)]
  71. fn listen_document_ws_state(_user_id: &str, _doc_id: &str, mut subscriber: broadcast::Receiver<WSConnectState>) {
  72. tokio::spawn(async move {
  73. while let Ok(state) = subscriber.recv().await {
  74. match state {
  75. WSConnectState::Init => {}
  76. WSConnectState::Connecting => {}
  77. WSConnectState::Connected => {}
  78. WSConnectState::Disconnected => {}
  79. }
  80. }
  81. });
  82. }
  83. pub(crate) struct DocumentRevisionWSDataStream {
  84. conflict_controller: Arc<DocumentConflictController>,
  85. }
  86. impl DocumentRevisionWSDataStream {
  87. #[allow(dead_code)]
  88. pub fn new(conflict_controller: DocumentConflictController) -> Self {
  89. Self {
  90. conflict_controller: Arc::new(conflict_controller),
  91. }
  92. }
  93. }
  94. impl RevisionWSDataStream for DocumentRevisionWSDataStream {
  95. fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
  96. let resolver = self.conflict_controller.clone();
  97. Box::pin(async move { resolver.receive_bytes(bytes).await })
  98. }
  99. fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> {
  100. let resolver = self.conflict_controller.clone();
  101. Box::pin(async move { resolver.ack_revision(id, ty).await })
  102. }
  103. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
  104. // Do nothing by now, just a placeholder for future extension.
  105. Box::pin(async move { Ok(()) })
  106. }
  107. fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
  108. let resolver = self.conflict_controller.clone();
  109. Box::pin(async move { resolver.send_revisions(range).await })
  110. }
  111. }
  112. pub(crate) struct DocumentWSDataSink(pub(crate) Arc<WSDataProvider>);
  113. impl RevisionWebSocketSink for DocumentWSDataSink {
  114. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  115. let sink_provider = self.0.clone();
  116. FutureResult::new(async move { sink_provider.next().await })
  117. }
  118. }
  119. struct DocumentConflictResolver {
  120. edit_cmd_tx: EditorCommandSender,
  121. }
  122. impl ConflictResolver<DocumentResolveOperations> for DocumentConflictResolver {
  123. fn compose_operations(&self, operations: DocumentResolveOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
  124. let tx = self.edit_cmd_tx.clone();
  125. let operations = operations.into_inner();
  126. Box::pin(async move {
  127. let (ret, rx) = oneshot::channel();
  128. tx.send(EditorCommand::ComposeRemoteOperation {
  129. client_operations: operations,
  130. ret,
  131. })
  132. .await
  133. .map_err(internal_error)?;
  134. let md5 = rx
  135. .await
  136. .map_err(|e| FlowyError::internal().context(format!("Compose operations failed: {}", e)))??;
  137. Ok(md5)
  138. })
  139. }
  140. fn transform_operations(
  141. &self,
  142. operations: DocumentResolveOperations,
  143. ) -> BoxResultFuture<TransformOperations<DocumentResolveOperations>, FlowyError> {
  144. let tx = self.edit_cmd_tx.clone();
  145. let operations = operations.into_inner();
  146. Box::pin(async move {
  147. let (ret, rx) = oneshot::channel::<CollaborateResult<TextTransformOperations>>();
  148. tx.send(EditorCommand::TransformOperations { operations, ret })
  149. .await
  150. .map_err(internal_error)?;
  151. let transformed_operations = rx
  152. .await
  153. .map_err(|e| FlowyError::internal().context(format!("Transform operations failed: {}", e)))??;
  154. Ok(transformed_operations)
  155. })
  156. }
  157. fn reset_operations(&self, operations: DocumentResolveOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
  158. let tx = self.edit_cmd_tx.clone();
  159. let operations = operations.into_inner();
  160. Box::pin(async move {
  161. let (ret, rx) = oneshot::channel();
  162. let _ = tx
  163. .send(EditorCommand::ResetOperations { operations, ret })
  164. .await
  165. .map_err(internal_error)?;
  166. let md5 = rx
  167. .await
  168. .map_err(|e| FlowyError::internal().context(format!("Reset operations failed: {}", e)))??;
  169. Ok(md5)
  170. })
  171. }
  172. }