ws_actor.rs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. use crate::{
  2. context::FlowyPersistence,
  3. services::web_socket::{entities::Socket, WSClientData, WSUser, WebSocketMessage},
  4. util::serde_ext::{md5, parse_from_bytes},
  5. };
  6. use actix_rt::task::spawn_blocking;
  7. use async_stream::stream;
  8. use backend_service::errors::{internal_error, Result, ServerError};
  9. use crate::services::web_socket::revision_data_to_ws_message;
  10. use flowy_collaboration::{
  11. protobuf::{
  12. ClientRevisionWSData as ClientRevisionWSDataPB,
  13. ClientRevisionWSDataType as ClientRevisionWSDataTypePB,
  14. Revision as RevisionPB,
  15. },
  16. server_document::ServerDocumentManager,
  17. synchronizer::{RevisionSyncResponse, RevisionUser},
  18. };
  19. use futures::stream::StreamExt;
  20. use lib_ws::WSChannel;
  21. use std::sync::Arc;
  22. use tokio::sync::{mpsc, oneshot};
  23. pub enum DocumentWSActorMessage {
  24. ClientData {
  25. client_data: WSClientData,
  26. persistence: Arc<FlowyPersistence>,
  27. ret: oneshot::Sender<Result<()>>,
  28. },
  29. }
  30. pub struct DocumentWebSocketActor {
  31. receiver: Option<mpsc::Receiver<DocumentWSActorMessage>>,
  32. doc_manager: Arc<ServerDocumentManager>,
  33. }
  34. impl DocumentWebSocketActor {
  35. pub fn new(receiver: mpsc::Receiver<DocumentWSActorMessage>, manager: Arc<ServerDocumentManager>) -> Self {
  36. Self {
  37. receiver: Some(receiver),
  38. doc_manager: manager,
  39. }
  40. }
  41. pub async fn run(mut self) {
  42. let mut receiver = self
  43. .receiver
  44. .take()
  45. .expect("DocumentWebSocketActor's receiver should only take one time");
  46. let stream = stream! {
  47. loop {
  48. match receiver.recv().await {
  49. Some(msg) => yield msg,
  50. None => break,
  51. }
  52. }
  53. };
  54. stream.for_each(|msg| self.handle_message(msg)).await;
  55. }
  56. async fn handle_message(&self, msg: DocumentWSActorMessage) {
  57. match msg {
  58. DocumentWSActorMessage::ClientData {
  59. client_data,
  60. persistence: _,
  61. ret,
  62. } => {
  63. let _ = ret.send(self.handle_document_data(client_data).await);
  64. },
  65. }
  66. }
  67. async fn handle_document_data(&self, client_data: WSClientData) -> Result<()> {
  68. let WSClientData { user, socket, data } = client_data;
  69. let document_client_data = spawn_blocking(move || parse_from_bytes::<ClientRevisionWSDataPB>(&data))
  70. .await
  71. .map_err(internal_error)??;
  72. tracing::debug!(
  73. "[DocumentWebSocketActor]: receive: {}:{}, {:?}",
  74. document_client_data.object_id,
  75. document_client_data.data_id,
  76. document_client_data.ty
  77. );
  78. let user = Arc::new(DocumentRevisionUser { user, socket });
  79. match &document_client_data.ty {
  80. ClientRevisionWSDataTypePB::ClientPushRev => {
  81. let _ = self
  82. .doc_manager
  83. .handle_client_revisions(user, document_client_data)
  84. .await
  85. .map_err(internal_error)?;
  86. },
  87. ClientRevisionWSDataTypePB::ClientPing => {
  88. let _ = self
  89. .doc_manager
  90. .handle_client_ping(user, document_client_data)
  91. .await
  92. .map_err(internal_error)?;
  93. },
  94. }
  95. Ok(())
  96. }
  97. }
  98. #[allow(dead_code)]
  99. fn verify_md5(revision: &RevisionPB) -> Result<()> {
  100. if md5(&revision.delta_data) != revision.md5 {
  101. return Err(ServerError::internal().context("RevisionPB md5 not match"));
  102. }
  103. Ok(())
  104. }
  105. #[derive(Clone)]
  106. pub struct DocumentRevisionUser {
  107. pub user: Arc<WSUser>,
  108. pub(crate) socket: Socket,
  109. }
  110. impl std::fmt::Debug for DocumentRevisionUser {
  111. fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
  112. f.debug_struct("DocumentRevisionUser")
  113. .field("user", &self.user)
  114. .field("socket", &self.socket)
  115. .finish()
  116. }
  117. }
  118. impl RevisionUser for DocumentRevisionUser {
  119. fn user_id(&self) -> String { self.user.id().to_string() }
  120. fn receive(&self, resp: RevisionSyncResponse) {
  121. let result = match resp {
  122. RevisionSyncResponse::Pull(data) => {
  123. let msg: WebSocketMessage = revision_data_to_ws_message(data, WSChannel::Document);
  124. self.socket.try_send(msg).map_err(internal_error)
  125. },
  126. RevisionSyncResponse::Push(data) => {
  127. let msg: WebSocketMessage = revision_data_to_ws_message(data, WSChannel::Document);
  128. self.socket.try_send(msg).map_err(internal_error)
  129. },
  130. RevisionSyncResponse::Ack(data) => {
  131. let msg: WebSocketMessage = revision_data_to_ws_message(data, WSChannel::Document);
  132. self.socket.try_send(msg).map_err(internal_error)
  133. },
  134. };
  135. match result {
  136. Ok(_) => {},
  137. Err(e) => log::error!("[DocumentRevisionUser]: {}", e),
  138. }
  139. }
  140. }