web_socket.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. use crate::services::doc::{
  2. web_socket::{
  3. local_ws_impl::LocalWebSocketManager,
  4. DocumentWSSinkDataProvider,
  5. DocumentWSSteamConsumer,
  6. HttpWebSocketManager,
  7. },
  8. DocumentMD5,
  9. DocumentWSReceiver,
  10. DocumentWebSocket,
  11. EditorCommand,
  12. RevisionManager,
  13. TransformDeltas,
  14. };
  15. use bytes::Bytes;
  16. use flowy_collaboration::{
  17. entities::{
  18. revision::{RepeatedRevision, RevType, Revision, RevisionRange},
  19. ws::{DocumentClientWSData, DocumentClientWSDataType, DocumentServerWSDataBuilder, NewDocumentUser},
  20. },
  21. errors::CollaborateResult,
  22. };
  23. use flowy_error::{internal_error, FlowyError, FlowyResult};
  24. use lib_infra::future::FutureResult;
  25. use flowy_collaboration::entities::ws::DocumentServerWSDataType;
  26. use lib_ws::WSConnectState;
  27. use std::{
  28. collections::VecDeque,
  29. convert::{TryFrom, TryInto},
  30. sync::Arc,
  31. };
  32. use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock};
  33. pub(crate) trait DocumentWebSocketManager: Send + Sync {
  34. fn stop(&self);
  35. fn receiver(&self) -> Arc<dyn DocumentWSReceiver>;
  36. }
  37. pub(crate) async fn make_document_ws_manager(
  38. doc_id: String,
  39. user_id: String,
  40. editor_edit_queue: UnboundedSender<EditorCommand>,
  41. rev_manager: Arc<RevisionManager>,
  42. ws: Arc<dyn DocumentWebSocket>,
  43. ) -> Arc<dyn DocumentWebSocketManager> {
  44. if cfg!(feature = "http_server") {
  45. let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
  46. let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
  47. doc_id: doc_id.clone(),
  48. user_id: user_id.clone(),
  49. editor_edit_queue: editor_edit_queue.clone(),
  50. rev_manager: rev_manager.clone(),
  51. shared_sink: shared_sink.clone(),
  52. });
  53. let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink.clone());
  54. let ws_manager = Arc::new(HttpWebSocketManager::new(
  55. &doc_id,
  56. ws.clone(),
  57. Arc::new(ws_stream_provider),
  58. ws_stream_consumer,
  59. ));
  60. notify_user_has_connected(&user_id, &doc_id, rev_manager.clone(), shared_sink).await;
  61. listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager.clone());
  62. Arc::new(ws_manager)
  63. } else {
  64. Arc::new(Arc::new(LocalWebSocketManager {}))
  65. }
  66. }
  67. async fn notify_user_has_connected(
  68. user_id: &str,
  69. doc_id: &str,
  70. rev_manager: Arc<RevisionManager>,
  71. shared_sink: Arc<SharedWSSinkDataProvider>,
  72. ) {
  73. // let need_notify = match shared_sink.front().await {
  74. // None => true,
  75. // Some(data) => data.ty != DocumentClientWSDataType::UserConnect,
  76. // };
  77. //
  78. // if need_notify {
  79. // let revision_data: Bytes =
  80. // rev_manager.latest_revision().await.try_into().unwrap();
  81. // let new_connect = NewDocumentUser {
  82. // user_id: user_id.to_owned(),
  83. // doc_id: doc_id.to_owned(),
  84. // revision_data: revision_data.to_vec(),
  85. // };
  86. //
  87. // let data =
  88. // DocumentWSDataBuilder::build_new_document_user_message(doc_id,
  89. // new_connect); shared_sink.push_front(data).await;
  90. // }
  91. }
  92. fn listen_document_ws_state(
  93. _user_id: &str,
  94. _doc_id: &str,
  95. mut subscriber: broadcast::Receiver<WSConnectState>,
  96. _rev_manager: Arc<RevisionManager>,
  97. ) {
  98. tokio::spawn(async move {
  99. while let Ok(state) = subscriber.recv().await {
  100. match state {
  101. WSConnectState::Init => {},
  102. WSConnectState::Connecting => {},
  103. WSConnectState::Connected => {},
  104. WSConnectState::Disconnected => {},
  105. }
  106. }
  107. });
  108. }
  109. pub(crate) struct DocumentWebSocketSteamConsumerAdapter {
  110. pub(crate) doc_id: String,
  111. pub(crate) user_id: String,
  112. pub(crate) editor_edit_queue: UnboundedSender<EditorCommand>,
  113. pub(crate) rev_manager: Arc<RevisionManager>,
  114. pub(crate) shared_sink: Arc<SharedWSSinkDataProvider>,
  115. }
  116. impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
  117. fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> {
  118. let user_id = self.user_id.clone();
  119. let rev_manager = self.rev_manager.clone();
  120. let edit_cmd_tx = self.editor_edit_queue.clone();
  121. let shared_sink = self.shared_sink.clone();
  122. let doc_id = self.doc_id.clone();
  123. FutureResult::new(async move {
  124. if let Some(server_composed_revision) =
  125. handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await?
  126. {
  127. shared_sink.push_back(server_composed_revision.into()).await;
  128. }
  129. Ok(())
  130. })
  131. }
  132. fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError> {
  133. let shared_sink = self.shared_sink.clone();
  134. FutureResult::new(async move { shared_sink.ack(id, ty).await })
  135. }
  136. fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> {
  137. // the _new_user will be used later
  138. FutureResult::new(async move { Ok(()) })
  139. }
  140. fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
  141. let rev_manager = self.rev_manager.clone();
  142. let shared_sink = self.shared_sink.clone();
  143. FutureResult::new(async move {
  144. let data = rev_manager
  145. .get_revisions_in_range(range)
  146. .await?
  147. .into_iter()
  148. .map(|revision| revision.into())
  149. .collect::<Vec<DocumentClientWSData>>();
  150. shared_sink.append(data).await;
  151. Ok(())
  152. })
  153. }
  154. }
  155. pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc<SharedWSSinkDataProvider>);
  156. impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
  157. fn next(&self) -> FutureResult<Option<DocumentClientWSData>, FlowyError> {
  158. let shared_sink = self.0.clone();
  159. FutureResult::new(async move { shared_sink.next().await })
  160. }
  161. }
  162. #[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
  163. pub(crate) async fn handle_push_rev(
  164. doc_id: &str,
  165. user_id: &str,
  166. edit_cmd_tx: UnboundedSender<EditorCommand>,
  167. rev_manager: Arc<RevisionManager>,
  168. bytes: Bytes,
  169. ) -> FlowyResult<Option<Revision>> {
  170. // Transform the revision
  171. let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
  172. let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner();
  173. if revisions.is_empty() {
  174. return Ok(None);
  175. }
  176. let first_revision = revisions.first().unwrap();
  177. if let Some(local_revision) = rev_manager.get_revision(first_revision.rev_id).await {
  178. if local_revision.md5 != first_revision.md5 {
  179. // The local revision is equal to the pushed revision. Just ignore it.
  180. return Ok(None);
  181. }
  182. }
  183. let revisions = revisions.split_off(1);
  184. if revisions.is_empty() {
  185. return Ok(None);
  186. }
  187. let _ = edit_cmd_tx.send(EditorCommand::ProcessRemoteRevision {
  188. revisions: revisions.clone(),
  189. ret,
  190. });
  191. let TransformDeltas {
  192. client_prime,
  193. server_prime,
  194. } = rx.await.map_err(internal_error)??;
  195. for revision in &revisions {
  196. let _ = rev_manager.add_remote_revision(revision).await?;
  197. }
  198. // compose delta
  199. let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
  200. let _ = edit_cmd_tx.send(EditorCommand::ComposeDelta {
  201. delta: client_prime.clone(),
  202. ret,
  203. });
  204. let md5 = rx.await.map_err(internal_error)??;
  205. let (local_base_rev_id, local_rev_id) = rev_manager.next_rev_id();
  206. // save the revision
  207. let revision = Revision::new(
  208. &doc_id,
  209. local_base_rev_id,
  210. local_rev_id,
  211. client_prime.to_bytes(),
  212. RevType::Remote,
  213. &user_id,
  214. md5.clone(),
  215. );
  216. let _ = rev_manager.add_remote_revision(&revision).await?;
  217. // send the server_prime delta
  218. Ok(Some(Revision::new(
  219. &doc_id,
  220. local_base_rev_id,
  221. local_rev_id,
  222. server_prime.to_bytes(),
  223. RevType::Local,
  224. &user_id,
  225. md5,
  226. )))
  227. }
  228. #[derive(Clone)]
  229. enum SourceType {
  230. Shared,
  231. Revision,
  232. }
  233. #[derive(Clone)]
  234. pub(crate) struct SharedWSSinkDataProvider {
  235. shared: Arc<RwLock<VecDeque<DocumentClientWSData>>>,
  236. rev_manager: Arc<RevisionManager>,
  237. source_ty: Arc<RwLock<SourceType>>,
  238. }
  239. impl SharedWSSinkDataProvider {
  240. pub(crate) fn new(rev_manager: Arc<RevisionManager>) -> Self {
  241. SharedWSSinkDataProvider {
  242. shared: Arc::new(RwLock::new(VecDeque::new())),
  243. rev_manager,
  244. source_ty: Arc::new(RwLock::new(SourceType::Shared)),
  245. }
  246. }
  247. // TODO: return Option<&DocumentWSData> would be better
  248. pub(crate) async fn front(&self) -> Option<DocumentClientWSData> { self.shared.read().await.front().cloned() }
  249. pub(crate) async fn push_front(&self, data: DocumentClientWSData) { self.shared.write().await.push_front(data); }
  250. async fn push_back(&self, data: DocumentClientWSData) { self.shared.write().await.push_back(data); }
  251. async fn append(&self, data: Vec<DocumentClientWSData>) {
  252. let mut buf: VecDeque<_> = data.into_iter().collect();
  253. self.shared.write().await.append(&mut buf);
  254. }
  255. async fn next(&self) -> FlowyResult<Option<DocumentClientWSData>> {
  256. let source_ty = self.source_ty.read().await.clone();
  257. match source_ty {
  258. SourceType::Shared => match self.shared.read().await.front() {
  259. None => {
  260. *self.source_ty.write().await = SourceType::Revision;
  261. Ok(None)
  262. },
  263. Some(data) => {
  264. tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty);
  265. Ok(Some(data.clone()))
  266. },
  267. },
  268. SourceType::Revision => {
  269. if !self.shared.read().await.is_empty() {
  270. *self.source_ty.write().await = SourceType::Shared;
  271. return Ok(None);
  272. }
  273. match self.rev_manager.next_sync_revision().await? {
  274. Some(rev) => {
  275. tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id);
  276. Ok(Some(rev.into()))
  277. },
  278. None => Ok(None),
  279. }
  280. },
  281. }
  282. }
  283. async fn ack(&self, id: String, _ty: DocumentServerWSDataType) -> FlowyResult<()> {
  284. // let _ = self.rev_manager.ack_revision(id).await?;
  285. let source_ty = self.source_ty.read().await.clone();
  286. match source_ty {
  287. SourceType::Shared => {
  288. let should_pop = match self.shared.read().await.front() {
  289. None => false,
  290. Some(val) => {
  291. if val.id == id {
  292. true
  293. } else {
  294. tracing::error!("The front element's {} is not equal to the {}", val.id, id);
  295. false
  296. }
  297. },
  298. };
  299. if should_pop {
  300. let _ = self.shared.write().await.pop_front();
  301. }
  302. },
  303. SourceType::Revision => {
  304. match id.parse::<i64>() {
  305. Ok(rev_id) => {
  306. let _ = self.rev_manager.ack_revision(rev_id).await?;
  307. },
  308. Err(e) => {
  309. tracing::error!("Parse rev_id from {} failed. {}", id, e);
  310. },
  311. };
  312. },
  313. }
  314. Ok(())
  315. }
  316. }