ws_actor.rs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. use crate::{
  2. context::FlowyPersistence,
  3. services::web_socket::{entities::Socket, WSClientData, WSUser, WebSocketMessage},
  4. util::serde_ext::{md5, parse_from_bytes},
  5. };
  6. use actix_rt::task::spawn_blocking;
  7. use async_stream::stream;
  8. use backend_service::errors::{internal_error, Result, ServerError};
  9. use flowy_collaboration::{
  10. protobuf::{DocumentClientWSData, DocumentClientWSDataType, Revision},
  11. sync::{RevisionUser, ServerDocumentManager, SyncResponse},
  12. };
  13. use futures::stream::StreamExt;
  14. use std::{convert::TryInto, sync::Arc};
  15. use tokio::sync::{mpsc, oneshot};
  16. pub enum WSActorMessage {
  17. ClientData {
  18. client_data: WSClientData,
  19. persistence: Arc<FlowyPersistence>,
  20. ret: oneshot::Sender<Result<()>>,
  21. },
  22. }
  23. pub struct DocumentWebSocketActor {
  24. receiver: Option<mpsc::Receiver<WSActorMessage>>,
  25. doc_manager: Arc<ServerDocumentManager>,
  26. }
  27. impl DocumentWebSocketActor {
  28. pub fn new(receiver: mpsc::Receiver<WSActorMessage>, manager: Arc<ServerDocumentManager>) -> Self {
  29. Self {
  30. receiver: Some(receiver),
  31. doc_manager: manager,
  32. }
  33. }
  34. pub async fn run(mut self) {
  35. let mut receiver = self
  36. .receiver
  37. .take()
  38. .expect("DocActor's receiver should only take one time");
  39. let stream = stream! {
  40. loop {
  41. match receiver.recv().await {
  42. Some(msg) => yield msg,
  43. None => break,
  44. }
  45. }
  46. };
  47. stream.for_each(|msg| self.handle_message(msg)).await;
  48. }
  49. async fn handle_message(&self, msg: WSActorMessage) {
  50. match msg {
  51. WSActorMessage::ClientData {
  52. client_data,
  53. persistence,
  54. ret,
  55. } => {
  56. let _ = ret.send(self.handle_client_data(client_data, persistence).await);
  57. },
  58. }
  59. }
  60. async fn handle_client_data(&self, client_data: WSClientData, persistence: Arc<FlowyPersistence>) -> Result<()> {
  61. let WSClientData { user, socket, data } = client_data;
  62. let document_client_data = spawn_blocking(move || parse_from_bytes::<DocumentClientWSData>(&data))
  63. .await
  64. .map_err(internal_error)??;
  65. tracing::debug!(
  66. "[DocumentWebSocketActor]: receive client data: {}:{}, {:?}",
  67. document_client_data.doc_id,
  68. document_client_data.id,
  69. document_client_data.ty
  70. );
  71. let user = Arc::new(ServerDocUser {
  72. user,
  73. socket,
  74. persistence,
  75. });
  76. match &document_client_data.ty {
  77. DocumentClientWSDataType::ClientPushRev => {
  78. let _ = self
  79. .doc_manager
  80. .handle_client_revisions(user, document_client_data)
  81. .await
  82. .map_err(internal_error)?;
  83. },
  84. DocumentClientWSDataType::ClientPing => {
  85. let _ = self
  86. .doc_manager
  87. .handle_client_ping(user, document_client_data)
  88. .await
  89. .map_err(internal_error)?;
  90. },
  91. }
  92. Ok(())
  93. }
  94. }
  95. #[allow(dead_code)]
  96. fn verify_md5(revision: &Revision) -> Result<()> {
  97. if md5(&revision.delta_data) != revision.md5 {
  98. return Err(ServerError::internal().context("Revision md5 not match"));
  99. }
  100. Ok(())
  101. }
  102. #[derive(Clone)]
  103. pub struct ServerDocUser {
  104. pub user: Arc<WSUser>,
  105. pub(crate) socket: Socket,
  106. pub persistence: Arc<FlowyPersistence>,
  107. }
  108. impl std::fmt::Debug for ServerDocUser {
  109. fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
  110. f.debug_struct("ServerDocUser")
  111. .field("user", &self.user)
  112. .field("socket", &self.socket)
  113. .finish()
  114. }
  115. }
  116. impl RevisionUser for ServerDocUser {
  117. fn user_id(&self) -> String { self.user.id().to_string() }
  118. fn receive(&self, resp: SyncResponse) {
  119. let result = match resp {
  120. SyncResponse::Pull(data) => {
  121. let msg: WebSocketMessage = data.into();
  122. self.socket.try_send(msg).map_err(internal_error)
  123. },
  124. SyncResponse::Push(data) => {
  125. let msg: WebSocketMessage = data.into();
  126. self.socket.try_send(msg).map_err(internal_error)
  127. },
  128. SyncResponse::Ack(data) => {
  129. let msg: WebSocketMessage = data.into();
  130. self.socket.try_send(msg).map_err(internal_error)
  131. },
  132. SyncResponse::NewRevision(revisions) => {
  133. let kv_store = self.persistence.kv_store();
  134. tokio::task::spawn(async move {
  135. let revisions = revisions
  136. .into_iter()
  137. .map(|revision| revision.try_into().unwrap())
  138. .collect::<Vec<_>>();
  139. match kv_store.batch_set_revision(revisions).await {
  140. Ok(_) => {},
  141. Err(e) => log::error!("{}", e),
  142. }
  143. });
  144. Ok(())
  145. },
  146. };
  147. match result {
  148. Ok(_) => {},
  149. Err(e) => log::error!("[ServerDocUser]: {}", e),
  150. }
  151. }
  152. }