web_socket.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. use crate::{
  2. core::{EditorCommand, TransformDeltas, SYNC_INTERVAL_IN_MILLIS},
  3. ws_receivers::DocumentWSReceiver,
  4. };
  5. use async_trait::async_trait;
  6. use bytes::Bytes;
  7. use flowy_collaboration::{
  8. entities::{
  9. revision::{RepeatedRevision, Revision, RevisionRange},
  10. ws::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType},
  11. },
  12. errors::CollaborateResult,
  13. };
  14. use flowy_error::{internal_error, FlowyError, FlowyResult};
  15. use flowy_sync::{
  16. RevisionManager,
  17. RevisionWSSinkDataProvider,
  18. RevisionWSSteamConsumer,
  19. RevisionWebSocket,
  20. RevisionWebSocketManager,
  21. };
  22. use lib_infra::future::FutureResult;
  23. use lib_ws::WSConnectState;
  24. use std::{collections::VecDeque, convert::TryFrom, sync::Arc, time::Duration};
  25. use tokio::sync::{
  26. broadcast,
  27. mpsc::{Receiver, Sender},
  28. oneshot,
  29. RwLock,
  30. };
  31. pub(crate) type EditorCommandSender = Sender<EditorCommand>;
  32. pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
  33. pub(crate) async fn make_document_ws_manager(
  34. doc_id: String,
  35. user_id: String,
  36. edit_cmd_tx: EditorCommandSender,
  37. rev_manager: Arc<RevisionManager>,
  38. ws_conn: Arc<dyn RevisionWebSocket>,
  39. ) -> Arc<RevisionWebSocketManager> {
  40. let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
  41. let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
  42. object_id: doc_id.clone(),
  43. edit_cmd_tx,
  44. rev_manager: rev_manager.clone(),
  45. shared_sink: shared_sink.clone(),
  46. });
  47. let data_provider = Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink));
  48. let ping_duration = Duration::from_millis(SYNC_INTERVAL_IN_MILLIS);
  49. let ws_manager = Arc::new(RevisionWebSocketManager::new(
  50. &doc_id,
  51. ws_conn,
  52. data_provider,
  53. ws_stream_consumer,
  54. ping_duration,
  55. ));
  56. listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager);
  57. ws_manager
  58. }
  59. fn listen_document_ws_state(
  60. _user_id: &str,
  61. _doc_id: &str,
  62. mut subscriber: broadcast::Receiver<WSConnectState>,
  63. _rev_manager: Arc<RevisionManager>,
  64. ) {
  65. tokio::spawn(async move {
  66. while let Ok(state) = subscriber.recv().await {
  67. match state {
  68. WSConnectState::Init => {},
  69. WSConnectState::Connecting => {},
  70. WSConnectState::Connected => {},
  71. WSConnectState::Disconnected => {},
  72. }
  73. }
  74. });
  75. }
  76. pub(crate) struct DocumentWebSocketSteamConsumerAdapter {
  77. pub(crate) object_id: String,
  78. pub(crate) edit_cmd_tx: EditorCommandSender,
  79. pub(crate) rev_manager: Arc<RevisionManager>,
  80. pub(crate) shared_sink: Arc<SharedWSSinkDataProvider>,
  81. }
  82. impl RevisionWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
  83. fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> {
  84. let rev_manager = self.rev_manager.clone();
  85. let edit_cmd_tx = self.edit_cmd_tx.clone();
  86. let shared_sink = self.shared_sink.clone();
  87. let object_id = self.object_id.clone();
  88. FutureResult::new(async move {
  89. if let Some(server_composed_revision) = handle_remote_revision(edit_cmd_tx, rev_manager, bytes).await? {
  90. let data = ClientRevisionWSData::from_revisions(&object_id, vec![server_composed_revision]);
  91. shared_sink.push_back(data).await;
  92. }
  93. Ok(())
  94. })
  95. }
  96. fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> FutureResult<(), FlowyError> {
  97. let shared_sink = self.shared_sink.clone();
  98. FutureResult::new(async move { shared_sink.ack(id, ty).await })
  99. }
  100. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> {
  101. // Do nothing by now, just a placeholder for future extension.
  102. FutureResult::new(async move { Ok(()) })
  103. }
  104. fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
  105. let rev_manager = self.rev_manager.clone();
  106. let shared_sink = self.shared_sink.clone();
  107. let object_id = self.object_id.clone();
  108. FutureResult::new(async move {
  109. let revisions = rev_manager.get_revisions_in_range(range).await?;
  110. let data = ClientRevisionWSData::from_revisions(&object_id, revisions);
  111. shared_sink.push_back(data).await;
  112. Ok(())
  113. })
  114. }
  115. }
  116. pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc<SharedWSSinkDataProvider>);
  117. impl RevisionWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
  118. fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
  119. let shared_sink = self.0.clone();
  120. FutureResult::new(async move { shared_sink.next().await })
  121. }
  122. }
  123. async fn transform_pushed_revisions(
  124. revisions: Vec<Revision>,
  125. edit_cmd: &EditorCommandSender,
  126. ) -> FlowyResult<TransformDeltas> {
  127. let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
  128. let _ = edit_cmd.send(EditorCommand::TransformRevision { revisions, ret });
  129. Ok(rx.await.map_err(internal_error)??)
  130. }
  131. #[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
  132. pub(crate) async fn handle_remote_revision(
  133. edit_cmd_tx: EditorCommandSender,
  134. rev_manager: Arc<RevisionManager>,
  135. bytes: Bytes,
  136. ) -> FlowyResult<Option<Revision>> {
  137. let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner();
  138. if revisions.is_empty() {
  139. return Ok(None);
  140. }
  141. let first_revision = revisions.first().unwrap();
  142. if let Some(local_revision) = rev_manager.get_revision(first_revision.rev_id).await {
  143. if local_revision.md5 == first_revision.md5 {
  144. // The local revision is equal to the pushed revision. Just ignore it.
  145. revisions = revisions.split_off(1);
  146. if revisions.is_empty() {
  147. return Ok(None);
  148. }
  149. } else {
  150. return Ok(None);
  151. }
  152. }
  153. let TransformDeltas {
  154. client_prime,
  155. server_prime,
  156. } = transform_pushed_revisions(revisions.clone(), &edit_cmd_tx).await?;
  157. match server_prime {
  158. None => {
  159. // The server_prime is None means the client local revisions conflict with the
  160. // server, and it needs to override the client delta.
  161. let (ret, rx) = oneshot::channel();
  162. let _ = edit_cmd_tx.send(EditorCommand::OverrideDelta {
  163. revisions,
  164. delta: client_prime,
  165. ret,
  166. });
  167. let _ = rx.await.map_err(internal_error)??;
  168. Ok(None)
  169. },
  170. Some(server_prime) => {
  171. let (ret, rx) = oneshot::channel();
  172. let _ = edit_cmd_tx.send(EditorCommand::ComposeRemoteDelta {
  173. revisions,
  174. client_delta: client_prime,
  175. server_delta: server_prime,
  176. ret,
  177. });
  178. Ok(rx.await.map_err(internal_error)??)
  179. },
  180. }
  181. }
  /// Selects where `SharedWSSinkDataProvider` takes its next item from.
  #[derive(Clone)]
  enum SourceType {
      /// The in-memory `shared` queue.
      Shared,
      /// The revision manager's next sync revision, or a ping when it has none.
      Revision,
  }
  187. #[derive(Clone)]
  188. pub(crate) struct SharedWSSinkDataProvider {
  189. shared: Arc<RwLock<VecDeque<ClientRevisionWSData>>>,
  190. rev_manager: Arc<RevisionManager>,
  191. source_ty: Arc<RwLock<SourceType>>,
  192. }
  193. impl SharedWSSinkDataProvider {
  194. pub(crate) fn new(rev_manager: Arc<RevisionManager>) -> Self {
  195. SharedWSSinkDataProvider {
  196. shared: Arc::new(RwLock::new(VecDeque::new())),
  197. rev_manager,
  198. source_ty: Arc::new(RwLock::new(SourceType::Shared)),
  199. }
  200. }
  201. #[allow(dead_code)]
  202. pub(crate) async fn push_front(&self, data: ClientRevisionWSData) { self.shared.write().await.push_front(data); }
  203. async fn push_back(&self, data: ClientRevisionWSData) { self.shared.write().await.push_back(data); }
  204. async fn next(&self) -> FlowyResult<Option<ClientRevisionWSData>> {
  205. let source_ty = self.source_ty.read().await.clone();
  206. match source_ty {
  207. SourceType::Shared => match self.shared.read().await.front() {
  208. None => {
  209. *self.source_ty.write().await = SourceType::Revision;
  210. Ok(None)
  211. },
  212. Some(data) => {
  213. tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.object_id, data.ty);
  214. Ok(Some(data.clone()))
  215. },
  216. },
  217. SourceType::Revision => {
  218. if !self.shared.read().await.is_empty() {
  219. *self.source_ty.write().await = SourceType::Shared;
  220. return Ok(None);
  221. }
  222. match self.rev_manager.next_sync_revision().await? {
  223. Some(rev) => {
  224. let doc_id = rev.object_id.clone();
  225. Ok(Some(ClientRevisionWSData::from_revisions(&doc_id, vec![rev])))
  226. },
  227. None => {
  228. //
  229. let doc_id = self.rev_manager.object_id.clone();
  230. let latest_rev_id = self.rev_manager.rev_id();
  231. Ok(Some(ClientRevisionWSData::ping(&doc_id, latest_rev_id)))
  232. },
  233. }
  234. },
  235. }
  236. }
  237. async fn ack(&self, id: String, _ty: ServerRevisionWSDataType) -> FlowyResult<()> {
  238. // let _ = self.rev_manager.ack_revision(id).await?;
  239. let source_ty = self.source_ty.read().await.clone();
  240. match source_ty {
  241. SourceType::Shared => {
  242. let should_pop = match self.shared.read().await.front() {
  243. None => false,
  244. Some(val) => {
  245. let expected_id = val.id();
  246. if expected_id == id {
  247. true
  248. } else {
  249. tracing::error!("The front element's {} is not equal to the {}", expected_id, id);
  250. false
  251. }
  252. },
  253. };
  254. if should_pop {
  255. let _ = self.shared.write().await.pop_front();
  256. }
  257. },
  258. SourceType::Revision => {
  259. match id.parse::<i64>() {
  260. Ok(rev_id) => {
  261. let _ = self.rev_manager.ack_revision(rev_id).await?;
  262. },
  263. Err(e) => {
  264. tracing::error!("Parse rev_id from {} failed. {}", id, e);
  265. },
  266. };
  267. },
  268. }
  269. Ok(())
  270. }
  271. }
  272. // RevisionWebSocketManager registers itself as a DocumentWSReceiver for each
  273. // opened document.
  274. #[async_trait]
  275. impl DocumentWSReceiver for RevisionWebSocketManager {
  276. #[tracing::instrument(level = "debug", skip(self, data), err)]
  277. async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError> {
  278. let _ = self.ws_passthrough_tx.send(data).await.map_err(|e| {
  279. let err_msg = format!("{} passthrough error: {}", self.object_id, e);
  280. FlowyError::internal().context(err_msg)
  281. })?;
  282. Ok(())
  283. }
  284. fn connect_state_changed(&self, state: WSConnectState) {
  285. match self.state_passthrough_tx.send(state) {
  286. Ok(_) => {},
  287. Err(e) => tracing::error!("{}", e),
  288. }
  289. }
  290. }