web_socket.rs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. use crate::old_editor::queue::{EditorCommand, EditorCommandSender, TextTransformOperations};
  2. use crate::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS;
  3. use bytes::Bytes;
  4. use flowy_database::ConnectionPool;
  5. use flowy_error::{internal_error, FlowyError, FlowyResult};
  6. use flowy_http_model::{
  7. revision::{Revision, RevisionRange},
  8. ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType},
  9. };
  10. use flowy_revision::*;
  11. use flowy_sync::errors::CollaborateResult;
  12. use flowy_sync::util::make_operations_from_revisions;
  13. use lib_infra::future::{BoxResultFuture, FutureResult};
  14. use lib_ot::text_delta::DeltaTextOperations;
  15. use lib_ws::WSConnectState;
  16. use std::{sync::Arc, time::Duration};
  17. use tokio::sync::{broadcast, oneshot};
  18. #[derive(Clone)]
  19. pub struct DeltaDocumentResolveOperations(pub DeltaTextOperations);
  20. impl OperationsDeserializer<DeltaDocumentResolveOperations> for DeltaDocumentResolveOperations {
  21. fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<DeltaDocumentResolveOperations> {
  22. Ok(DeltaDocumentResolveOperations(make_operations_from_revisions(
  23. revisions,
  24. )?))
  25. }
  26. }
  27. impl OperationsSerializer for DeltaDocumentResolveOperations {
  28. fn serialize_operations(&self) -> Bytes {
  29. self.0.json_bytes()
  30. }
  31. }
  32. impl DeltaDocumentResolveOperations {
  33. pub fn into_inner(self) -> DeltaTextOperations {
  34. self.0
  35. }
  36. }
  37. pub type DocumentConflictController = ConflictController<DeltaDocumentResolveOperations, Arc<ConnectionPool>>;
  38. #[allow(dead_code)]
  39. pub(crate) async fn make_document_ws_manager(
  40. doc_id: String,
  41. user_id: String,
  42. edit_cmd_tx: EditorCommandSender,
  43. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  44. rev_web_socket: Arc<dyn RevisionWebSocket>,
  45. ) -> Arc<RevisionWebSocketManager> {
  46. let ws_data_provider = Arc::new(WSDataProvider::new(&doc_id, Arc::new(rev_manager.clone())));
  47. let resolver = Arc::new(DocumentConflictResolver { edit_cmd_tx });
  48. let conflict_controller =
  49. DocumentConflictController::new(&user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
  50. let ws_data_stream = Arc::new(DocumentRevisionWSDataStream::new(conflict_controller));
  51. let ws_data_sink = Arc::new(DocumentWSDataSink(ws_data_provider));
  52. let ping_duration = Duration::from_millis(TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS);
  53. let ws_manager = Arc::new(RevisionWebSocketManager::new(
  54. "Block",
  55. &doc_id,
  56. rev_web_socket,
  57. ws_data_sink,
  58. ws_data_stream,
  59. ping_duration,
  60. ));
  61. listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state());
  62. ws_manager
  63. }
  64. #[allow(dead_code)]
  65. fn listen_document_ws_state(_user_id: &str, _doc_id: &str, mut subscriber: broadcast::Receiver<WSConnectState>) {
  66. tokio::spawn(async move {
  67. while let Ok(state) = subscriber.recv().await {
  68. match state {
  69. WSConnectState::Init => {}
  70. WSConnectState::Connecting => {}
  71. WSConnectState::Connected => {}
  72. WSConnectState::Disconnected => {}
  73. }
  74. }
  75. });
  76. }
  77. pub(crate) struct DocumentRevisionWSDataStream {
  78. conflict_controller: Arc<DocumentConflictController>,
  79. }
  80. impl DocumentRevisionWSDataStream {
  81. #[allow(dead_code)]
  82. pub fn new(conflict_controller: DocumentConflictController) -> Self {
  83. Self {
  84. conflict_controller: Arc::new(conflict_controller),
  85. }
  86. }
  87. }
  88. impl RevisionWSDataStream for DocumentRevisionWSDataStream {
  89. fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
  90. let resolver = self.conflict_controller.clone();
  91. Box::pin(async move { resolver.receive_bytes(bytes).await })
  92. }
  93. fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> {
  94. let resolver = self.conflict_controller.clone();
  95. Box::pin(async move { resolver.ack_revision(id, ty).await })
  96. }
  97. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> BoxResultFuture<(), FlowyError> {
  98. // Do nothing by now, just a placeholder for future extension.
  99. Box::pin(async move { Ok(()) })
  100. }
  101. fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
  102. let resolver = self.conflict_controller.clone();
  103. Box::pin(async move { resolver.send_revisions(range).await })
  104. }
  105. }
  106. pub(crate) struct DocumentWSDataSink(pub(crate) Arc<WSDataProvider>);
  107. impl RevisionWebSocketSink for DocumentWSDataSink {
  108. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  109. let sink_provider = self.0.clone();
  110. FutureResult::new(async move { sink_provider.next().await })
  111. }
  112. }
  113. struct DocumentConflictResolver {
  114. edit_cmd_tx: EditorCommandSender,
  115. }
  116. impl ConflictResolver<DeltaDocumentResolveOperations> for DocumentConflictResolver {
  117. fn compose_operations(
  118. &self,
  119. operations: DeltaDocumentResolveOperations,
  120. ) -> BoxResultFuture<RevisionMD5, FlowyError> {
  121. let tx = self.edit_cmd_tx.clone();
  122. let operations = operations.into_inner();
  123. Box::pin(async move {
  124. let (ret, rx) = oneshot::channel();
  125. tx.send(EditorCommand::ComposeRemoteOperation {
  126. client_operations: operations,
  127. ret,
  128. })
  129. .await
  130. .map_err(internal_error)?;
  131. let md5 = rx
  132. .await
  133. .map_err(|e| FlowyError::internal().context(format!("Compose operations failed: {}", e)))??;
  134. Ok(md5)
  135. })
  136. }
  137. fn transform_operations(
  138. &self,
  139. operations: DeltaDocumentResolveOperations,
  140. ) -> BoxResultFuture<TransformOperations<DeltaDocumentResolveOperations>, FlowyError> {
  141. let tx = self.edit_cmd_tx.clone();
  142. let operations = operations.into_inner();
  143. Box::pin(async move {
  144. let (ret, rx) = oneshot::channel::<CollaborateResult<TextTransformOperations>>();
  145. tx.send(EditorCommand::TransformOperations { operations, ret })
  146. .await
  147. .map_err(internal_error)?;
  148. let transformed_operations = rx
  149. .await
  150. .map_err(|e| FlowyError::internal().context(format!("Transform operations failed: {}", e)))??;
  151. Ok(transformed_operations)
  152. })
  153. }
  154. fn reset_operations(&self, operations: DeltaDocumentResolveOperations) -> BoxResultFuture<RevisionMD5, FlowyError> {
  155. let tx = self.edit_cmd_tx.clone();
  156. let operations = operations.into_inner();
  157. Box::pin(async move {
  158. let (ret, rx) = oneshot::channel();
  159. let _ = tx
  160. .send(EditorCommand::ResetOperations { operations, ret })
  161. .await
  162. .map_err(internal_error)?;
  163. let md5 = rx
  164. .await
  165. .map_err(|e| FlowyError::internal().context(format!("Reset operations failed: {}", e)))??;
  166. Ok(md5)
  167. })
  168. }
  169. }