ws_actor.rs 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. use crate::{
  2. services::{
  3. document::persistence::update_doc,
  4. web_socket::{entities::Socket, WSClientData, WSMessageAdaptor, WSUser},
  5. },
  6. util::serde_ext::{md5, parse_from_bytes},
  7. };
  8. use actix_rt::task::spawn_blocking;
  9. use async_stream::stream;
  10. use backend_service::errors::{internal_error, Result, ServerError};
  11. use flowy_collaboration::{
  12. core::sync::{RevisionUser, ServerDocumentManager, SyncResponse},
  13. protobuf::{DocumentWSData, DocumentWSDataType, NewDocumentUser, Revision, UpdateDocParams},
  14. };
  15. use futures::stream::StreamExt;
  16. use sqlx::PgPool;
  17. use std::{convert::TryInto, sync::Arc};
  18. use tokio::sync::{mpsc, oneshot};
  19. pub enum WSActorMessage {
  20. ClientData {
  21. client_data: WSClientData,
  22. pool: PgPool,
  23. ret: oneshot::Sender<Result<()>>,
  24. },
  25. }
  26. pub struct DocumentWebSocketActor {
  27. receiver: Option<mpsc::Receiver<WSActorMessage>>,
  28. doc_manager: Arc<ServerDocumentManager>,
  29. }
  30. impl DocumentWebSocketActor {
  31. pub fn new(receiver: mpsc::Receiver<WSActorMessage>, manager: Arc<ServerDocumentManager>) -> Self {
  32. Self {
  33. receiver: Some(receiver),
  34. doc_manager: manager,
  35. }
  36. }
  37. pub async fn run(mut self) {
  38. let mut receiver = self
  39. .receiver
  40. .take()
  41. .expect("DocActor's receiver should only take one time");
  42. let stream = stream! {
  43. loop {
  44. match receiver.recv().await {
  45. Some(msg) => yield msg,
  46. None => break,
  47. }
  48. }
  49. };
  50. stream.for_each(|msg| self.handle_message(msg)).await;
  51. }
  52. async fn handle_message(&self, msg: WSActorMessage) {
  53. match msg {
  54. WSActorMessage::ClientData { client_data, pool, ret } => {
  55. let _ = ret.send(self.handle_client_data(client_data, pool).await);
  56. },
  57. }
  58. }
  59. async fn handle_client_data(&self, client_data: WSClientData, pg_pool: PgPool) -> Result<()> {
  60. let WSClientData { user, socket, data } = client_data;
  61. let document_data = spawn_blocking(move || {
  62. let document_data: DocumentWSData = parse_from_bytes(&data)?;
  63. Result::Ok(document_data)
  64. })
  65. .await
  66. .map_err(internal_error)??;
  67. tracing::debug!(
  68. "[HTTP_SERVER_WS]: receive client data: {}:{}, {:?}",
  69. document_data.doc_id,
  70. document_data.id,
  71. document_data.ty
  72. );
  73. let user = Arc::new(ServerDocUser { user, socket, pg_pool });
  74. let result = match &document_data.ty {
  75. DocumentWSDataType::Ack => Ok(()),
  76. DocumentWSDataType::PushRev => self.handle_pushed_rev(user, document_data.data).await,
  77. DocumentWSDataType::PullRev => Ok(()),
  78. DocumentWSDataType::UserConnect => self.handle_user_connect(user, document_data).await,
  79. };
  80. match result {
  81. Ok(_) => {},
  82. Err(e) => {
  83. tracing::error!("[HTTP_SERVER_WS]: process client data error {:?}", e);
  84. },
  85. }
  86. Ok(())
  87. }
  88. async fn handle_user_connect(&self, user: Arc<ServerDocUser>, document_data: DocumentWSData) -> Result<()> {
  89. let mut new_user = spawn_blocking(move || parse_from_bytes::<NewDocumentUser>(&document_data.data))
  90. .await
  91. .map_err(internal_error)??;
  92. let revision_pb = spawn_blocking(move || parse_from_bytes::<Revision>(&new_user.take_revision_data()))
  93. .await
  94. .map_err(internal_error)??;
  95. let _ = self.handle_revision(user, revision_pb).await?;
  96. Ok(())
  97. }
  98. async fn handle_pushed_rev(&self, user: Arc<ServerDocUser>, data: Vec<u8>) -> Result<()> {
  99. let revision_pb = spawn_blocking(move || {
  100. let revision: Revision = parse_from_bytes(&data)?;
  101. // let _ = verify_md5(&revision)?;
  102. Result::Ok(revision)
  103. })
  104. .await
  105. .map_err(internal_error)??;
  106. self.handle_revision(user, revision_pb).await
  107. }
  108. async fn handle_revision(&self, user: Arc<ServerDocUser>, mut revision: Revision) -> Result<()> {
  109. let revision: flowy_collaboration::entities::revision::Revision =
  110. (&mut revision).try_into().map_err(internal_error)?;
  111. // Create the document if it doesn't exist
  112. let handler = match self.doc_manager.get(&revision.doc_id).await {
  113. None => self
  114. .doc_manager
  115. .create_doc(revision.clone())
  116. .await
  117. .map_err(internal_error)?,
  118. Some(handler) => handler,
  119. };
  120. handler.apply_revision(user, revision).await.map_err(internal_error)?;
  121. Ok(())
  122. }
  123. }
  124. #[allow(dead_code)]
  125. fn verify_md5(revision: &Revision) -> Result<()> {
  126. if md5(&revision.delta_data) != revision.md5 {
  127. return Err(ServerError::internal().context("Revision md5 not match"));
  128. }
  129. Ok(())
  130. }
  131. #[derive(Clone, Debug)]
  132. pub struct ServerDocUser {
  133. pub user: Arc<WSUser>,
  134. pub(crate) socket: Socket,
  135. pub pg_pool: PgPool,
  136. }
  137. impl RevisionUser for ServerDocUser {
  138. fn user_id(&self) -> String { self.user.id().to_string() }
  139. fn receive(&self, resp: SyncResponse) {
  140. let result = match resp {
  141. SyncResponse::Pull(data) => {
  142. let msg: WSMessageAdaptor = data.into();
  143. self.socket.try_send(msg).map_err(internal_error)
  144. },
  145. SyncResponse::Push(data) => {
  146. let msg: WSMessageAdaptor = data.into();
  147. self.socket.try_send(msg).map_err(internal_error)
  148. },
  149. SyncResponse::Ack(data) => {
  150. let msg: WSMessageAdaptor = data.into();
  151. self.socket.try_send(msg).map_err(internal_error)
  152. },
  153. SyncResponse::NewRevision {
  154. rev_id,
  155. doc_id,
  156. doc_json,
  157. } => {
  158. let pg_pool = self.pg_pool.clone();
  159. tokio::task::spawn(async move {
  160. let mut params = UpdateDocParams::new();
  161. params.set_doc_id(doc_id);
  162. params.set_data(doc_json);
  163. params.set_rev_id(rev_id);
  164. match update_doc(&pg_pool, params).await {
  165. Ok(_) => {},
  166. Err(e) => log::error!("{}", e),
  167. }
  168. });
  169. Ok(())
  170. },
  171. };
  172. match result {
  173. Ok(_) => {},
  174. Err(e) => log::error!("[ServerDocUser]: {}", e),
  175. }
  176. }
  177. }