web_socket.rs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. use crate::old_editor::queue::{EditorCommand, EditorCommandSender, TextTransformOperations};
  2. use crate::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS;
  3. use bytes::Bytes;
  4. use flowy_error::{internal_error, FlowyError, FlowyResult};
  5. use flowy_revision::*;
  6. use flowy_sync::entities::revision::Revision;
  7. use flowy_sync::util::make_operations_from_revisions;
  8. use flowy_sync::{
  9. entities::{
  10. revision::RevisionRange,
  11. ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType},
  12. },
  13. errors::CollaborateResult,
  14. };
  15. use lib_infra::future::{BoxResultFuture, FutureResult};
  16. use lib_ot::text_delta::DeltaTextOperations;
  17. use lib_ws::WSConnectState;
  18. use std::{sync::Arc, time::Duration};
  19. use tokio::sync::{broadcast, oneshot};
  20. #[derive(Clone)]
  21. pub struct DeltaDocumentResolveOperations(pub DeltaTextOperations);
  22. impl OperationsDeserializer<DeltaDocumentResolveOperations> for DeltaDocumentResolveOperations {
  23. fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<DeltaDocumentResolveOperations> {
  24. Ok(DeltaDocumentResolveOperations(make_operations_from_revisions(
  25. revisions,
  26. )?))
  27. }
  28. }
  29. impl OperationsSerializer for DeltaDocumentResolveOperations {
  30. fn serialize_operations(&self) -> Bytes {
  31. self.0.json_bytes()
  32. }
  33. }
  34. impl DeltaDocumentResolveOperations {
  35. pub fn into_inner(self) -> DeltaTextOperations {
  36. self.0
  37. }
  38. }
  39. pub type DocumentConflictController = ConflictController<DeltaDocumentResolveOperations>;
  40. #[allow(dead_code)]
  41. pub(crate) async fn make_document_ws_manager(
  42. doc_id: String,
  43. user_id: String,
  44. edit_cmd_tx: EditorCommandSender,
  45. rev_manager: Arc<RevisionManager>,
  46. rev_web_socket: Arc<dyn RevisionWebSocket>,
  47. ) -> Arc<RevisionWebSocketManager> {
  48. let ws_data_provider = Arc::new(WSDataProvider::new(&doc_id, Arc::new(rev_manager.clone())));
  49. let resolver = Arc::new(DocumentConflictResolver { edit_cmd_tx });
  50. let conflict_controller =
  51. DocumentConflictController::new(&user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
  52. let ws_data_stream = Arc::new(DocumentRevisionWSDataStream::new(conflict_controller));
  53. let ws_data_sink = Arc::new(DocumentWSDataSink(ws_data_provider));
  54. let ping_duration = Duration::from_millis(TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS);
  55. let ws_manager = Arc::new(RevisionWebSocketManager::new(
  56. "Block",
  57. &doc_id,
  58. rev_web_socket,
  59. ws_data_sink,
  60. ws_data_stream,
  61. ping_duration,
  62. ));
  63. listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state());
  64. ws_manager
  65. }
  66. #[allow(dead_code)]
  67. fn listen_document_ws_state(_user_id: &str, _doc_id: &str, mut subscriber: broadcast::Receiver<WSConnectState>) {
  68. tokio::spawn(async move {
  69. while let Ok(state) = subscriber.recv().await {
  70. match state {
  71. WSConnectState::Init => {}
  72. WSConnectState::Connecting => {}
  73. WSConnectState::Connected => {}
  74. WSConnectState::Disconnected => {}
  75. }
  76. }
  77. });
  78. }
  79. pub(crate) struct DocumentRevisionWSDataStream {
  80. conflict_controller: Arc<DocumentConflictController>,
  81. }
  82. impl DocumentRevisionWSDataStream {
  83. #[allow(dead_code)]
  84. pub fn new(conflict_controller: DocumentConflictController) -> Self {
  85. Self {
  86. conflict_controller: Arc::new(conflict_controller),
  87. }
  88. }
  89. }
  90. impl RevisionWSDataStream for DocumentRevisionWSDataStream {
  91. fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
  92. let resolver = self.conflict_controller.clone();
  93. Box::pin(async move { resolver.receive_bytes(bytes).await })
  94. }
  95. fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> {
  96. let resolver = self.conflict_controller.clone();
  97. Box::pin(async move { resolver.ack_revision(id, ty).await })
  98. }
  99. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
  100. // Do nothing by now, just a placeholder for future extension.
  101. Box::pin(async move { Ok(()) })
  102. }
  103. fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
  104. let resolver = self.conflict_controller.clone();
  105. Box::pin(async move { resolver.send_revisions(range).await })
  106. }
  107. }
  108. pub(crate) struct DocumentWSDataSink(pub(crate) Arc<WSDataProvider>);
  109. impl RevisionWebSocketSink for DocumentWSDataSink {
  110. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  111. let sink_provider = self.0.clone();
  112. FutureResult::new(async move { sink_provider.next().await })
  113. }
  114. }
  115. struct DocumentConflictResolver {
  116. edit_cmd_tx: EditorCommandSender,
  117. }
  118. impl ConflictResolver<DeltaDocumentResolveOperations> for DocumentConflictResolver {
  119. fn compose_operations(
  120. &self,
  121. operations: DeltaDocumentResolveOperations,
  122. ) -> BoxResultFuture<OperationsMD5, FlowyError> {
  123. let tx = self.edit_cmd_tx.clone();
  124. let operations = operations.into_inner();
  125. Box::pin(async move {
  126. let (ret, rx) = oneshot::channel();
  127. tx.send(EditorCommand::ComposeRemoteOperation {
  128. client_operations: operations,
  129. ret,
  130. })
  131. .await
  132. .map_err(internal_error)?;
  133. let md5 = rx
  134. .await
  135. .map_err(|e| FlowyError::internal().context(format!("Compose operations failed: {}", e)))??;
  136. Ok(md5)
  137. })
  138. }
  139. fn transform_operations(
  140. &self,
  141. operations: DeltaDocumentResolveOperations,
  142. ) -> BoxResultFuture<TransformOperations<DeltaDocumentResolveOperations>, FlowyError> {
  143. let tx = self.edit_cmd_tx.clone();
  144. let operations = operations.into_inner();
  145. Box::pin(async move {
  146. let (ret, rx) = oneshot::channel::<CollaborateResult<TextTransformOperations>>();
  147. tx.send(EditorCommand::TransformOperations { operations, ret })
  148. .await
  149. .map_err(internal_error)?;
  150. let transformed_operations = rx
  151. .await
  152. .map_err(|e| FlowyError::internal().context(format!("Transform operations failed: {}", e)))??;
  153. Ok(transformed_operations)
  154. })
  155. }
  156. fn reset_operations(
  157. &self,
  158. operations: DeltaDocumentResolveOperations,
  159. ) -> BoxResultFuture<OperationsMD5, FlowyError> {
  160. let tx = self.edit_cmd_tx.clone();
  161. let operations = operations.into_inner();
  162. Box::pin(async move {
  163. let (ret, rx) = oneshot::channel();
  164. let _ = tx
  165. .send(EditorCommand::ResetOperations { operations, ret })
  166. .await
  167. .map_err(internal_error)?;
  168. let md5 = rx
  169. .await
  170. .map_err(|e| FlowyError::internal().context(format!("Reset operations failed: {}", e)))??;
  171. Ok(md5)
  172. })
  173. }
  174. }