web_socket.rs 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. use crate::old_editor::queue::{EditorCommand, EditorCommandSender, TextTransformOperations};
  2. use crate::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS;
  3. use bytes::Bytes;
  4. use flowy_client_sync::errors::SyncResult;
  5. use flowy_client_sync::make_operations_from_revisions;
  6. use flowy_error::{internal_error, FlowyError, FlowyResult};
  7. use flowy_revision::*;
  8. use flowy_sqlite::ConnectionPool;
  9. use lib_infra::future::{BoxResultFuture, FutureResult};
  10. use lib_ot::text_delta::DeltaTextOperations;
  11. use lib_ws::WSConnectState;
  12. use revision_model::{Revision, RevisionRange};
  13. use std::{sync::Arc, time::Duration};
  14. use tokio::sync::{broadcast, oneshot};
  15. use ws_model::ws_revision::{ClientRevisionWSData, NewDocumentUser};
  16. #[derive(Clone)]
  17. pub struct DeltaDocumentResolveOperations(pub DeltaTextOperations);
  18. impl OperationsDeserializer<DeltaDocumentResolveOperations> for DeltaDocumentResolveOperations {
  19. fn deserialize_revisions(
  20. revisions: Vec<Revision>,
  21. ) -> FlowyResult<DeltaDocumentResolveOperations> {
  22. Ok(DeltaDocumentResolveOperations(
  23. make_operations_from_revisions(revisions)?,
  24. ))
  25. }
  26. }
  27. impl OperationsSerializer for DeltaDocumentResolveOperations {
  28. fn serialize_operations(&self) -> Bytes {
  29. self.0.json_bytes()
  30. }
  31. }
  32. impl DeltaDocumentResolveOperations {
  33. pub fn into_inner(self) -> DeltaTextOperations {
  34. self.0
  35. }
  36. }
  37. pub type DocumentConflictController =
  38. ConflictController<DeltaDocumentResolveOperations, Arc<ConnectionPool>>;
  39. #[allow(dead_code)]
  40. pub(crate) async fn make_document_ws_manager(
  41. doc_id: String,
  42. user_id: String,
  43. edit_cmd_tx: EditorCommandSender,
  44. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  45. rev_web_socket: Arc<dyn RevisionWebSocket>,
  46. ) -> Arc<RevisionWebSocketManager> {
  47. let ws_data_provider = Arc::new(WSDataProvider::new(&doc_id, Arc::new(rev_manager.clone())));
  48. let resolver = Arc::new(DocumentConflictResolver { edit_cmd_tx });
  49. let conflict_controller = DocumentConflictController::new(
  50. &user_id,
  51. resolver,
  52. Arc::new(ws_data_provider.clone()),
  53. rev_manager,
  54. );
  55. let ws_data_stream = Arc::new(DocumentRevisionWSDataStream::new(conflict_controller));
  56. let ws_data_sink = Arc::new(DocumentWSDataSink(ws_data_provider));
  57. let ping_duration = Duration::from_millis(TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS);
  58. let ws_manager = Arc::new(RevisionWebSocketManager::new(
  59. "Block",
  60. &doc_id,
  61. rev_web_socket,
  62. ws_data_sink,
  63. ws_data_stream,
  64. ping_duration,
  65. ));
  66. listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state());
  67. ws_manager
  68. }
  69. #[allow(dead_code)]
  70. fn listen_document_ws_state(
  71. _user_id: &str,
  72. _doc_id: &str,
  73. mut subscriber: broadcast::Receiver<WSConnectState>,
  74. ) {
  75. tokio::spawn(async move {
  76. while let Ok(state) = subscriber.recv().await {
  77. match state {
  78. WSConnectState::Init => {},
  79. WSConnectState::Connecting => {},
  80. WSConnectState::Connected => {},
  81. WSConnectState::Disconnected => {},
  82. }
  83. }
  84. });
  85. }
  86. pub(crate) struct DocumentRevisionWSDataStream {
  87. conflict_controller: Arc<DocumentConflictController>,
  88. }
  89. impl DocumentRevisionWSDataStream {
  90. #[allow(dead_code)]
  91. pub fn new(conflict_controller: DocumentConflictController) -> Self {
  92. Self {
  93. conflict_controller: Arc::new(conflict_controller),
  94. }
  95. }
  96. }
  97. impl RevisionWSDataStream for DocumentRevisionWSDataStream {
  98. fn receive_push_revision(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), FlowyError> {
  99. let resolver = self.conflict_controller.clone();
  100. Box::pin(async move { resolver.receive_revisions(revisions).await })
  101. }
  102. fn receive_ack(&self, rev_id: i64) -> BoxResultFuture<(), FlowyError> {
  103. let resolver = self.conflict_controller.clone();
  104. Box::pin(async move { resolver.ack_revision(rev_id).await })
  105. }
  106. fn receive_new_user_connect(
  107. &self,
  108. _new_user: NewDocumentUser,
  109. ) -> BoxResultFuture<(), FlowyError> {
  110. // Do nothing by now, just a placeholder for future extension.
  111. Box::pin(async move { Ok(()) })
  112. }
  113. fn pull_revisions_in_range(&self, range: RevisionRange) -> BoxResultFuture<(), FlowyError> {
  114. let resolver = self.conflict_controller.clone();
  115. Box::pin(async move { resolver.send_revisions(range).await })
  116. }
  117. }
  118. pub(crate) struct DocumentWSDataSink(pub(crate) Arc<WSDataProvider>);
  119. impl RevisionWebSocketSink for DocumentWSDataSink {
  120. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  121. let sink_provider = self.0.clone();
  122. FutureResult::new(async move { sink_provider.next().await })
  123. }
  124. }
  125. struct DocumentConflictResolver {
  126. edit_cmd_tx: EditorCommandSender,
  127. }
  128. impl ConflictResolver<DeltaDocumentResolveOperations> for DocumentConflictResolver {
  129. fn compose_operations(
  130. &self,
  131. operations: DeltaDocumentResolveOperations,
  132. ) -> BoxResultFuture<RevisionMD5, FlowyError> {
  133. let tx = self.edit_cmd_tx.clone();
  134. let operations = operations.into_inner();
  135. Box::pin(async move {
  136. let (ret, rx) = oneshot::channel();
  137. tx.send(EditorCommand::ComposeRemoteOperation {
  138. client_operations: operations,
  139. ret,
  140. })
  141. .await
  142. .map_err(internal_error)?;
  143. let md5 = rx.await.map_err(|e| {
  144. FlowyError::internal().context(format!("Compose operations failed: {}", e))
  145. })??;
  146. Ok(md5)
  147. })
  148. }
  149. fn transform_operations(
  150. &self,
  151. operations: DeltaDocumentResolveOperations,
  152. ) -> BoxResultFuture<TransformOperations<DeltaDocumentResolveOperations>, FlowyError> {
  153. let tx = self.edit_cmd_tx.clone();
  154. let operations = operations.into_inner();
  155. Box::pin(async move {
  156. let (ret, rx) = oneshot::channel::<SyncResult<TextTransformOperations>>();
  157. tx.send(EditorCommand::TransformOperations { operations, ret })
  158. .await
  159. .map_err(internal_error)?;
  160. let transformed_operations = rx.await.map_err(|e| {
  161. FlowyError::internal().context(format!("Transform operations failed: {}", e))
  162. })??;
  163. Ok(transformed_operations)
  164. })
  165. }
  166. fn reset_operations(
  167. &self,
  168. operations: DeltaDocumentResolveOperations,
  169. ) -> BoxResultFuture<RevisionMD5, FlowyError> {
  170. let tx = self.edit_cmd_tx.clone();
  171. let operations = operations.into_inner();
  172. Box::pin(async move {
  173. let (ret, rx) = oneshot::channel();
  174. tx.send(EditorCommand::ResetOperations { operations, ret })
  175. .await
  176. .map_err(internal_error)?;
  177. let md5 = rx
  178. .await
  179. .map_err(|e| FlowyError::internal().context(format!("Reset operations failed: {}", e)))??;
  180. Ok(md5)
  181. })
  182. }
  183. }