123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- use crate::{
- context::FlowyPersistence,
- services::web_socket::{entities::Socket, WSClientData, WSUser, WebSocketMessage},
- util::serde_ext::{md5, parse_from_bytes},
- };
- use actix_rt::task::spawn_blocking;
- use async_stream::stream;
- use backend_service::errors::{internal_error, Result, ServerError};
- use flowy_collaboration::{
- protobuf::{DocumentClientWSData, DocumentClientWSDataType, Revision},
- sync::{RevisionUser, ServerDocumentManager, SyncResponse},
- };
- use futures::stream::StreamExt;
- use std::{convert::TryInto, sync::Arc};
- use tokio::sync::{mpsc, oneshot};
- pub enum WSActorMessage {
- ClientData {
- client_data: WSClientData,
- persistence: Arc<FlowyPersistence>,
- ret: oneshot::Sender<Result<()>>,
- },
- }
- pub struct DocumentWebSocketActor {
- receiver: Option<mpsc::Receiver<WSActorMessage>>,
- doc_manager: Arc<ServerDocumentManager>,
- }
- impl DocumentWebSocketActor {
- pub fn new(receiver: mpsc::Receiver<WSActorMessage>, manager: Arc<ServerDocumentManager>) -> Self {
- Self {
- receiver: Some(receiver),
- doc_manager: manager,
- }
- }
- pub async fn run(mut self) {
- let mut receiver = self
- .receiver
- .take()
- .expect("DocActor's receiver should only take one time");
- let stream = stream! {
- loop {
- match receiver.recv().await {
- Some(msg) => yield msg,
- None => break,
- }
- }
- };
- stream.for_each(|msg| self.handle_message(msg)).await;
- }
- async fn handle_message(&self, msg: WSActorMessage) {
- match msg {
- WSActorMessage::ClientData {
- client_data,
- persistence,
- ret,
- } => {
- let _ = ret.send(self.handle_client_data(client_data, persistence).await);
- },
- }
- }
- async fn handle_client_data(&self, client_data: WSClientData, persistence: Arc<FlowyPersistence>) -> Result<()> {
- let WSClientData { user, socket, data } = client_data;
- let document_client_data = spawn_blocking(move || parse_from_bytes::<DocumentClientWSData>(&data))
- .await
- .map_err(internal_error)??;
- tracing::debug!(
- "[DocumentWebSocketActor]: receive client data: {}:{}, {:?}",
- document_client_data.doc_id,
- document_client_data.id,
- document_client_data.ty
- );
- let user = Arc::new(ServerDocUser {
- user,
- socket,
- persistence,
- });
- match self.handle_revision(user, document_client_data).await {
- Ok(_) => {},
- Err(e) => {
- tracing::error!("[DocumentWebSocketActor]: process client data error {:?}", e);
- },
- }
- Ok(())
- }
- async fn handle_revision(&self, user: Arc<ServerDocUser>, client_data: DocumentClientWSData) -> Result<()> {
- match &client_data.ty {
- DocumentClientWSDataType::ClientPushRev => {
- let _ = self
- .doc_manager
- .apply_revisions(user, client_data)
- .await
- .map_err(internal_error)?;
- },
- }
- Ok(())
- }
- }
- #[allow(dead_code)]
- fn verify_md5(revision: &Revision) -> Result<()> {
- if md5(&revision.delta_data) != revision.md5 {
- return Err(ServerError::internal().context("Revision md5 not match"));
- }
- Ok(())
- }
- #[derive(Clone)]
- pub struct ServerDocUser {
- pub user: Arc<WSUser>,
- pub(crate) socket: Socket,
- pub persistence: Arc<FlowyPersistence>,
- }
- impl std::fmt::Debug for ServerDocUser {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- f.debug_struct("ServerDocUser")
- .field("user", &self.user)
- .field("socket", &self.socket)
- .finish()
- }
- }
- impl RevisionUser for ServerDocUser {
- fn user_id(&self) -> String { self.user.id().to_string() }
- fn receive(&self, resp: SyncResponse) {
- let result = match resp {
- SyncResponse::Pull(data) => {
- let msg: WebSocketMessage = data.into();
- self.socket.try_send(msg).map_err(internal_error)
- },
- SyncResponse::Push(data) => {
- let msg: WebSocketMessage = data.into();
- self.socket.try_send(msg).map_err(internal_error)
- },
- SyncResponse::Ack(data) => {
- let msg: WebSocketMessage = data.into();
- self.socket.try_send(msg).map_err(internal_error)
- },
- SyncResponse::NewRevision(revisions) => {
- let kv_store = self.persistence.kv_store();
- tokio::task::spawn(async move {
- let revisions = revisions
- .into_iter()
- .map(|revision| revision.try_into().unwrap())
- .collect::<Vec<_>>();
- match kv_store.batch_set_revision(revisions).await {
- Ok(_) => {},
- Err(e) => log::error!("{}", e),
- }
- });
- Ok(())
- },
- };
- match result {
- Ok(_) => {},
- Err(e) => log::error!("[ServerDocUser]: {}", e),
- }
- }
- }
|