// ws_actor.rs
  1. use crate::{
  2. context::FlowyPersistence,
  3. services::web_socket::{entities::Socket, WSClientData, WSUser, WebSocketMessage},
  4. util::serde_ext::{md5, parse_from_bytes},
  5. };
  6. use actix_rt::task::spawn_blocking;
  7. use async_stream::stream;
  8. use backend_service::errors::{internal_error, Result, ServerError};
  9. use flowy_collaboration::{
  10. protobuf::{DocumentClientWSData, DocumentClientWSDataType, Revision},
  11. sync::{RevisionUser, ServerDocumentManager, SyncResponse},
  12. };
  13. use futures::stream::StreamExt;
  14. use std::{convert::TryInto, sync::Arc};
  15. use tokio::sync::{mpsc, oneshot};
  16. pub enum WSActorMessage {
  17. ClientData {
  18. client_data: WSClientData,
  19. persistence: Arc<FlowyPersistence>,
  20. ret: oneshot::Sender<Result<()>>,
  21. },
  22. }
  23. pub struct DocumentWebSocketActor {
  24. receiver: Option<mpsc::Receiver<WSActorMessage>>,
  25. doc_manager: Arc<ServerDocumentManager>,
  26. }
  27. impl DocumentWebSocketActor {
  28. pub fn new(receiver: mpsc::Receiver<WSActorMessage>, manager: Arc<ServerDocumentManager>) -> Self {
  29. Self {
  30. receiver: Some(receiver),
  31. doc_manager: manager,
  32. }
  33. }
  34. pub async fn run(mut self) {
  35. let mut receiver = self
  36. .receiver
  37. .take()
  38. .expect("DocActor's receiver should only take one time");
  39. let stream = stream! {
  40. loop {
  41. match receiver.recv().await {
  42. Some(msg) => yield msg,
  43. None => break,
  44. }
  45. }
  46. };
  47. stream.for_each(|msg| self.handle_message(msg)).await;
  48. }
  49. async fn handle_message(&self, msg: WSActorMessage) {
  50. match msg {
  51. WSActorMessage::ClientData {
  52. client_data,
  53. persistence,
  54. ret,
  55. } => {
  56. let _ = ret.send(self.handle_client_data(client_data, persistence).await);
  57. },
  58. }
  59. }
  60. async fn handle_client_data(&self, client_data: WSClientData, persistence: Arc<FlowyPersistence>) -> Result<()> {
  61. let WSClientData { user, socket, data } = client_data;
  62. let document_client_data = spawn_blocking(move || parse_from_bytes::<DocumentClientWSData>(&data))
  63. .await
  64. .map_err(internal_error)??;
  65. tracing::debug!(
  66. "[DocumentWebSocketActor]: receive client data: {}:{}, {:?}",
  67. document_client_data.doc_id,
  68. document_client_data.id,
  69. document_client_data.ty
  70. );
  71. let user = Arc::new(ServerDocUser {
  72. user,
  73. socket,
  74. persistence,
  75. });
  76. match self.handle_revision(user, document_client_data).await {
  77. Ok(_) => {},
  78. Err(e) => {
  79. tracing::error!("[DocumentWebSocketActor]: process client data error {:?}", e);
  80. },
  81. }
  82. Ok(())
  83. }
  84. async fn handle_revision(&self, user: Arc<ServerDocUser>, client_data: DocumentClientWSData) -> Result<()> {
  85. match &client_data.ty {
  86. DocumentClientWSDataType::ClientPushRev => {
  87. let _ = self
  88. .doc_manager
  89. .apply_revisions(user, client_data)
  90. .await
  91. .map_err(internal_error)?;
  92. },
  93. }
  94. Ok(())
  95. }
  96. }
  97. #[allow(dead_code)]
  98. fn verify_md5(revision: &Revision) -> Result<()> {
  99. if md5(&revision.delta_data) != revision.md5 {
  100. return Err(ServerError::internal().context("Revision md5 not match"));
  101. }
  102. Ok(())
  103. }
  104. #[derive(Clone)]
  105. pub struct ServerDocUser {
  106. pub user: Arc<WSUser>,
  107. pub(crate) socket: Socket,
  108. pub persistence: Arc<FlowyPersistence>,
  109. }
  110. impl std::fmt::Debug for ServerDocUser {
  111. fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
  112. f.debug_struct("ServerDocUser")
  113. .field("user", &self.user)
  114. .field("socket", &self.socket)
  115. .finish()
  116. }
  117. }
  118. impl RevisionUser for ServerDocUser {
  119. fn user_id(&self) -> String { self.user.id().to_string() }
  120. fn receive(&self, resp: SyncResponse) {
  121. let result = match resp {
  122. SyncResponse::Pull(data) => {
  123. let msg: WebSocketMessage = data.into();
  124. self.socket.try_send(msg).map_err(internal_error)
  125. },
  126. SyncResponse::Push(data) => {
  127. let msg: WebSocketMessage = data.into();
  128. self.socket.try_send(msg).map_err(internal_error)
  129. },
  130. SyncResponse::Ack(data) => {
  131. let msg: WebSocketMessage = data.into();
  132. self.socket.try_send(msg).map_err(internal_error)
  133. },
  134. SyncResponse::NewRevision(revisions) => {
  135. let kv_store = self.persistence.kv_store();
  136. tokio::task::spawn(async move {
  137. let revisions = revisions
  138. .into_iter()
  139. .map(|revision| revision.try_into().unwrap())
  140. .collect::<Vec<_>>();
  141. match kv_store.batch_set_revision(revisions).await {
  142. Ok(_) => {},
  143. Err(e) => log::error!("{}", e),
  144. }
  145. });
  146. Ok(())
  147. },
  148. };
  149. match result {
  150. Ok(_) => {},
  151. Err(e) => log::error!("[ServerDocUser]: {}", e),
  152. }
  153. }
  154. }