|
@@ -1,9 +1,9 @@
|
|
use crate::{
|
|
use crate::{
|
|
services::{
|
|
services::{
|
|
- doc::editor::ServerDocUser,
|
|
|
|
- util::{md5, parse_from_bytes},
|
|
|
|
|
|
+ document::update_doc,
|
|
|
|
+ web_socket::{entities::Socket, WSClientData, WSMessageAdaptor, WSUser},
|
|
},
|
|
},
|
|
- web_socket::WsClientData,
|
|
|
|
|
|
+ util::serde_ext::{md5, parse_from_bytes},
|
|
};
|
|
};
|
|
use actix_rt::task::spawn_blocking;
|
|
use actix_rt::task::spawn_blocking;
|
|
use actix_web::web::Data;
|
|
use actix_web::web::Data;
|
|
@@ -11,32 +11,29 @@ use async_stream::stream;
|
|
use backend_service::errors::{internal_error, Result, ServerError};
|
|
use backend_service::errors::{internal_error, Result, ServerError};
|
|
use flowy_collaboration::{
|
|
use flowy_collaboration::{
|
|
core::sync::{RevisionUser, ServerDocManager, SyncResponse},
|
|
core::sync::{RevisionUser, ServerDocManager, SyncResponse},
|
|
- entities::ws::DocumentWSDataBuilder,
|
|
|
|
- protobuf::{DocumentWSData, DocumentWSDataType},
|
|
|
|
|
|
+ protobuf::{DocumentWSData, DocumentWSDataType, NewDocumentUser, UpdateDocParams},
|
|
};
|
|
};
|
|
-
|
|
|
|
-use flowy_collaboration::protobuf::NewDocumentUser;
|
|
|
|
use futures::stream::StreamExt;
|
|
use futures::stream::StreamExt;
|
|
use lib_ot::protobuf::Revision;
|
|
use lib_ot::protobuf::Revision;
|
|
use sqlx::PgPool;
|
|
use sqlx::PgPool;
|
|
use std::{convert::TryInto, sync::Arc};
|
|
use std::{convert::TryInto, sync::Arc};
|
|
use tokio::sync::{mpsc, oneshot};
|
|
use tokio::sync::{mpsc, oneshot};
|
|
|
|
|
|
-pub enum DocWsMsg {
|
|
|
|
|
|
+pub enum WSActorMessage {
|
|
ClientData {
|
|
ClientData {
|
|
- client_data: WsClientData,
|
|
|
|
|
|
+ client_data: WSClientData,
|
|
pool: Data<PgPool>,
|
|
pool: Data<PgPool>,
|
|
ret: oneshot::Sender<Result<()>>,
|
|
ret: oneshot::Sender<Result<()>>,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
|
|
-pub struct DocWsActor {
|
|
|
|
- receiver: Option<mpsc::Receiver<DocWsMsg>>,
|
|
|
|
|
|
+pub struct DocumentWebSocketActor {
|
|
|
|
+ receiver: Option<mpsc::Receiver<WSActorMessage>>,
|
|
doc_manager: Arc<ServerDocManager>,
|
|
doc_manager: Arc<ServerDocManager>,
|
|
}
|
|
}
|
|
|
|
|
|
-impl DocWsActor {
|
|
|
|
- pub fn new(receiver: mpsc::Receiver<DocWsMsg>, manager: Arc<ServerDocManager>) -> Self {
|
|
|
|
|
|
+impl DocumentWebSocketActor {
|
|
|
|
+ pub fn new(receiver: mpsc::Receiver<WSActorMessage>, manager: Arc<ServerDocManager>) -> Self {
|
|
Self {
|
|
Self {
|
|
receiver: Some(receiver),
|
|
receiver: Some(receiver),
|
|
doc_manager: manager,
|
|
doc_manager: manager,
|
|
@@ -61,16 +58,16 @@ impl DocWsActor {
|
|
stream.for_each(|msg| self.handle_message(msg)).await;
|
|
stream.for_each(|msg| self.handle_message(msg)).await;
|
|
}
|
|
}
|
|
|
|
|
|
- async fn handle_message(&self, msg: DocWsMsg) {
|
|
|
|
|
|
+ async fn handle_message(&self, msg: WSActorMessage) {
|
|
match msg {
|
|
match msg {
|
|
- DocWsMsg::ClientData { client_data, pool, ret } => {
|
|
|
|
|
|
+ WSActorMessage::ClientData { client_data, pool, ret } => {
|
|
let _ = ret.send(self.handle_client_data(client_data, pool).await);
|
|
let _ = ret.send(self.handle_client_data(client_data, pool).await);
|
|
},
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- async fn handle_client_data(&self, client_data: WsClientData, pg_pool: Data<PgPool>) -> Result<()> {
|
|
|
|
- let WsClientData { user, socket, data } = client_data;
|
|
|
|
|
|
+ async fn handle_client_data(&self, client_data: WSClientData, pg_pool: Data<PgPool>) -> Result<()> {
|
|
|
|
+ let WSClientData { user, socket, data } = client_data;
|
|
let document_data = spawn_blocking(move || {
|
|
let document_data = spawn_blocking(move || {
|
|
let document_data: DocumentWSData = parse_from_bytes(&data)?;
|
|
let document_data: DocumentWSData = parse_from_bytes(&data)?;
|
|
Result::Ok(document_data)
|
|
Result::Ok(document_data)
|
|
@@ -127,7 +124,7 @@ impl DocWsActor {
|
|
|
|
|
|
async fn handle_revision(&self, user: Arc<ServerDocUser>, mut revision: Revision) -> Result<()> {
|
|
async fn handle_revision(&self, user: Arc<ServerDocUser>, mut revision: Revision) -> Result<()> {
|
|
let revision: lib_ot::revision::Revision = (&mut revision).try_into().map_err(internal_error)?;
|
|
let revision: lib_ot::revision::Revision = (&mut revision).try_into().map_err(internal_error)?;
|
|
- // Create the doc if it doesn't exist
|
|
|
|
|
|
+ // Create the document if it doesn't exist
|
|
let handler = match self.doc_manager.get(&revision.doc_id).await {
|
|
let handler = match self.doc_manager.get(&revision.doc_id).await {
|
|
None => self
|
|
None => self
|
|
.doc_manager
|
|
.doc_manager
|
|
@@ -149,3 +146,54 @@ fn verify_md5(revision: &Revision) -> Result<()> {
|
|
}
|
|
}
|
|
Ok(())
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+#[derive(Clone, Debug)]
|
|
|
|
+pub struct ServerDocUser {
|
|
|
|
+ pub user: Arc<WSUser>,
|
|
|
|
+ pub(crate) socket: Socket,
|
|
|
|
+ pub pg_pool: Data<PgPool>,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl RevisionUser for ServerDocUser {
|
|
|
|
+ fn user_id(&self) -> String { self.user.id().to_string() }
|
|
|
|
+
|
|
|
|
+ fn receive(&self, resp: SyncResponse) {
|
|
|
|
+ let result = match resp {
|
|
|
|
+ SyncResponse::Pull(data) => {
|
|
|
|
+ let msg: WSMessageAdaptor = data.into();
|
|
|
|
+ self.socket.try_send(msg).map_err(internal_error)
|
|
|
|
+ },
|
|
|
|
+ SyncResponse::Push(data) => {
|
|
|
|
+ let msg: WSMessageAdaptor = data.into();
|
|
|
|
+ self.socket.try_send(msg).map_err(internal_error)
|
|
|
|
+ },
|
|
|
|
+ SyncResponse::Ack(data) => {
|
|
|
|
+ let msg: WSMessageAdaptor = data.into();
|
|
|
|
+ self.socket.try_send(msg).map_err(internal_error)
|
|
|
|
+ },
|
|
|
|
+ SyncResponse::NewRevision {
|
|
|
|
+ rev_id,
|
|
|
|
+ doc_id,
|
|
|
|
+ doc_json,
|
|
|
|
+ } => {
|
|
|
|
+ let pg_pool = self.pg_pool.clone();
|
|
|
|
+ tokio::task::spawn(async move {
|
|
|
|
+ let mut params = UpdateDocParams::new();
|
|
|
|
+ params.set_doc_id(doc_id);
|
|
|
|
+ params.set_data(doc_json);
|
|
|
|
+ params.set_rev_id(rev_id);
|
|
|
|
+ match update_doc(pg_pool.get_ref(), params).await {
|
|
|
|
+ Ok(_) => {},
|
|
|
|
+ Err(e) => log::error!("{}", e),
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ Ok(())
|
|
|
|
+ },
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ match result {
|
|
|
|
+ Ok(_) => {},
|
|
|
|
+ Err(e) => log::error!("[ServerDocUser]: {}", e),
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|