Jelajahi Sumber

send acked to client using ws

appflowy 3 tahun lalu
induk
melakukan
a26f588409

+ 1 - 1
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pb.dart

@@ -16,7 +16,7 @@ export 'ws.pbenum.dart';
 class WsDocumentData extends $pb.GeneratedMessage {
   static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsDocumentData', createEmptyInstance: create)
     ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id')
-    ..e<WsDataType>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: WsDataType.Command, valueOf: WsDataType.valueOf, enumValues: WsDataType.values)
+    ..e<WsDataType>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: WsDataType.Acked, valueOf: WsDataType.valueOf, enumValues: WsDataType.values)
     ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
     ..hasRequiredFields = false
   ;

+ 2 - 2
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart

@@ -10,11 +10,11 @@ import 'dart:core' as $core;
 import 'package:protobuf/protobuf.dart' as $pb;
 
 class WsDataType extends $pb.ProtobufEnum {
-  static const WsDataType Command = WsDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Command');
+  static const WsDataType Acked = WsDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked');
   static const WsDataType Delta = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Delta');
 
   static const $core.List<WsDataType> values = <WsDataType> [
-    Command,
+    Acked,
     Delta,
   ];
 

+ 2 - 2
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart

@@ -12,13 +12,13 @@ import 'dart:typed_data' as $typed_data;
 const WsDataType$json = const {
   '1': 'WsDataType',
   '2': const [
-    const {'1': 'Command', '2': 0},
+    const {'1': 'Acked', '2': 0},
     const {'1': 'Delta', '2': 1},
   ],
 };
 
 /// Descriptor for `WsDataType`. Decode as a `google.protobuf.EnumDescriptorProto`.
-final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgsKB0NvbW1hbmQQABIJCgVEZWx0YRAB');
+final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASCQoFRGVsdGEQAQ==');
 @$core.Deprecated('Use wsDocumentDataDescriptor instead')
 const WsDocumentData$json = const {
   '1': 'WsDocumentData',

+ 1 - 0
backend/Cargo.toml

@@ -57,6 +57,7 @@ parking_lot = "0.11"
 md5 = "0.7.0"
 futures-core = { version = "0.3", default-features = false }
 pin-project = "1.0.0"
+byteorder = {version = "1.3.4"}
 
 flowy-user = { path = "../rust-lib/flowy-user" }
 flowy-workspace = { path = "../rust-lib/flowy-workspace" }

+ 2 - 2
backend/src/application.rs

@@ -21,7 +21,7 @@ use crate::{
         view::router as view,
         workspace::router as workspace,
         ws,
-        ws::WSServer,
+        ws::WsServer,
     },
 };
 
@@ -142,7 +142,7 @@ async fn init_app_context(configuration: &Settings) -> AppContext {
             configuration.database
         ));
 
-    let ws_server = WSServer::new().start();
+    let ws_server = WsServer::new().start();
 
     AppContext::new(ws_server, pg_pool)
 }

+ 3 - 3
backend/src/context.rs

@@ -1,15 +1,15 @@
-use crate::service::ws::WSServer;
+use crate::service::ws::WsServer;
 use actix::Addr;
 
 use sqlx::PgPool;
 
 pub struct AppContext {
-    pub ws_server: Addr<WSServer>,
+    pub ws_server: Addr<WsServer>,
     pub pg_pool: PgPool,
 }
 
 impl AppContext {
-    pub fn new(ws_server: Addr<WSServer>, db_pool: PgPool) -> Self {
+    pub fn new(ws_server: Addr<WsServer>, db_pool: PgPool) -> Self {
         AppContext {
             ws_server,
             pg_pool: db_pool,

+ 30 - 6
backend/src/service/doc/edit_doc.rs

@@ -1,14 +1,22 @@
-use crate::service::doc::update_doc;
+use crate::service::{
+    doc::update_doc,
+    ws::{entities::Socket, WsClientData, WsMessageAdaptor},
+};
 use actix_web::web::Data;
+use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
+use bytes::Bytes;
 use flowy_document::{
+    entities::ws::{WsDataType, WsDocumentData},
     protobuf::{Doc, Revision, UpdateDocParams},
     services::doc::Document,
 };
 use flowy_net::errors::{internal_error, ServerError};
 use flowy_ot::core::Delta;
+use flowy_ws::{protobuf::WsModule, WsMessage};
 use parking_lot::RwLock;
+use protobuf::Message;
 use sqlx::PgPool;
-use std::{sync::Arc, time::Duration};
+use std::{convert::TryInto, sync::Arc, time::Duration};
 
 pub(crate) struct EditDoc {
     doc_id: String,
@@ -27,15 +35,31 @@ impl EditDoc {
         })
     }
 
-    #[tracing::instrument(level = "debug", skip(self, revision))]
-    pub(crate) async fn apply_revision(&self, revision: Revision) -> Result<(), ServerError> {
+    #[tracing::instrument(level = "debug", skip(self, socket, revision))]
+    pub(crate) async fn apply_revision(
+        &self,
+        socket: Socket,
+        revision: Revision,
+    ) -> Result<(), ServerError> {
         let delta = Delta::from_bytes(revision.delta).map_err(internal_error)?;
         match self.document.try_write_for(Duration::from_millis(300)) {
             None => {
                 log::error!("Failed to acquire write lock of document");
             },
-            Some(mut w) => {
-                let _ = w.apply_delta(delta).map_err(internal_error)?;
+            Some(mut write_guard) => {
+                let _ = write_guard.apply_delta(delta).map_err(internal_error)?;
+                let mut wtr = vec![];
+                let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
+
+                let data = WsDocumentData {
+                    id: self.doc_id.clone(),
+                    ty: WsDataType::Acked,
+                    data: wtr,
+                };
+
+                let msg: WsMessage = data.into();
+                let bytes: Bytes = msg.try_into().unwrap();
+                socket.do_send(WsMessageAdaptor(bytes));
             },
         }
 

+ 22 - 12
backend/src/service/doc/ws_handler.rs

@@ -1,5 +1,9 @@
 use super::edit_doc::EditDoc;
-use crate::service::{doc::read_doc, util::parse_from_bytes, ws::WsBizHandler};
+use crate::service::{
+    doc::read_doc,
+    util::parse_from_bytes,
+    ws::{WsBizHandler, WsClientData},
+};
 use actix_web::web::Data;
 use bytes::Bytes;
 use flowy_document::{
@@ -13,22 +17,22 @@ use sqlx::PgPool;
 use std::{collections::HashMap, sync::Arc};
 
 pub struct DocWsBizHandler {
-    inner: Arc<Inner>,
+    doc_manager: Arc<EditDocManager>,
 }
 
 impl DocWsBizHandler {
     pub fn new(pg_pool: Data<PgPool>) -> Self {
         Self {
-            inner: Arc::new(Inner::new(pg_pool)),
+            doc_manager: Arc::new(EditDocManager::new(pg_pool)),
         }
     }
 }
 
 impl WsBizHandler for DocWsBizHandler {
-    fn receive_data(&self, data: Bytes) {
-        let inner = self.inner.clone();
+    fn receive_data(&self, client_data: WsClientData) {
+        let doc_manager = self.doc_manager.clone();
         actix_rt::spawn(async move {
-            let result = inner.handle(data).await;
+            let result = doc_manager.handle(client_data).await;
             match result {
                 Ok(_) => {},
                 Err(e) => log::error!("WsBizHandler handle data error: {:?}", e),
@@ -37,12 +41,12 @@ impl WsBizHandler for DocWsBizHandler {
     }
 }
 
-struct Inner {
+struct EditDocManager {
     pg_pool: Data<PgPool>,
     edit_docs: RwLock<HashMap<String, Arc<EditDoc>>>,
 }
 
-impl Inner {
+impl EditDocManager {
     fn new(pg_pool: Data<PgPool>) -> Self {
         Self {
             pg_pool,
@@ -50,16 +54,22 @@ impl Inner {
         }
     }
 
-    async fn handle(&self, data: Bytes) -> Result<(), ServerError> {
-        let document_data: WsDocumentData = parse_from_bytes(&data)?;
+    async fn handle(&self, client_data: WsClientData) -> Result<(), ServerError> {
+        let document_data: WsDocumentData = parse_from_bytes(&client_data.data)?;
 
         match document_data.ty {
-            WsDataType::Command => {},
+            WsDataType::Acked => {},
             WsDataType::Delta => {
                 let revision: Revision = parse_from_bytes(&document_data.data)?;
                 let edited_doc = self.get_edit_doc(&revision.doc_id).await?;
                 tokio::spawn(async move {
-                    edited_doc.apply_revision(revision).await.unwrap();
+                    match edited_doc
+                        .apply_revision(client_data.socket, revision)
+                        .await
+                    {
+                        Ok(_) => {},
+                        Err(e) => log::error!("Doc apply revision failed: {:?}", e),
+                    }
                 });
             },
         }

+ 2 - 1
backend/src/service/ws/biz_handler.rs

@@ -1,9 +1,10 @@
+use crate::service::ws::WsClientData;
 use bytes::Bytes;
 use flowy_ws::WsModule;
 use std::{collections::HashMap, sync::Arc};
 
 pub trait WsBizHandler: Send + Sync {
-    fn receive_data(&self, data: Bytes);
+    fn receive_data(&self, client_data: WsClientData);
 }
 
 pub type BizHandler = Arc<dyn WsBizHandler>;

+ 2 - 2
backend/src/service/ws/entities/connect.rs

@@ -1,10 +1,10 @@
-use crate::service::ws::ClientMessage;
+use crate::service::ws::WsMessageAdaptor;
 use actix::{Message, Recipient};
 use flowy_net::errors::ServerError;
 use serde::{Deserialize, Serialize};
 use std::fmt::Formatter;
 
-pub type Socket = Recipient<ClientMessage>;
+pub type Socket = Recipient<WsMessageAdaptor>;
 
 #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
 pub struct SessionId(pub String);

+ 4 - 30
backend/src/service/ws/entities/message.rs

@@ -3,38 +3,12 @@ use actix::Message;
 use bytes::Bytes;
 use std::fmt::Formatter;
 
-#[derive(Debug, Clone)]
-pub enum MessageData {
-    Binary(Bytes),
-    Connect(SessionId),
-    Disconnect(SessionId),
-}
-
 #[derive(Debug, Message, Clone)]
 #[rtype(result = "()")]
-pub struct ClientMessage {
-    pub session_id: SessionId,
-    pub data: MessageData,
-}
-
-impl ClientMessage {
-    pub fn new<T: Into<SessionId>>(session_id: T, data: MessageData) -> Self {
-        ClientMessage {
-            session_id: session_id.into(),
-            data,
-        }
-    }
-}
+pub struct WsMessageAdaptor(pub Bytes);
 
-impl std::fmt::Display for ClientMessage {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let content = match &self.data {
-            MessageData::Binary(_) => "[Binary]".to_owned(),
-            MessageData::Connect(_) => "[Connect]".to_owned(),
-            MessageData::Disconnect(_) => "[Disconnect]".to_owned(),
-        };
+impl std::ops::Deref for WsMessageAdaptor {
+    type Target = Bytes;
 
-        let desc = format!("{}:{}", &self.session_id, content);
-        f.write_str(&desc)
-    }
+    fn deref(&self) -> &Self::Target { &self.0 }
 }

+ 3 - 3
backend/src/service/ws/router.rs

@@ -1,4 +1,4 @@
-use crate::service::ws::{WSClient, WSServer, WsBizHandlers};
+use crate::service::ws::{WsBizHandlers, WsClient, WsServer};
 use actix::Addr;
 
 use crate::service::user::LoggedUser;
@@ -16,12 +16,12 @@ pub async fn establish_ws_connection(
     request: HttpRequest,
     payload: Payload,
     token: Path<String>,
-    server: Data<Addr<WSServer>>,
+    server: Data<Addr<WsServer>>,
     biz_handlers: Data<WsBizHandlers>,
 ) -> Result<HttpResponse, Error> {
     match LoggedUser::from_token(token.clone()) {
         Ok(user) => {
-            let client = WSClient::new(&user.user_id, server.get_ref().clone(), biz_handlers);
+            let client = WsClient::new(&user.user_id, server.get_ref().clone(), biz_handlers);
             let result = ws::start(client, &request, payload);
             match result {
                 Ok(response) => Ok(response.into()),

+ 26 - 29
backend/src/service/ws/ws_client.rs

@@ -1,12 +1,11 @@
 use crate::{
     config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
     service::ws::{
-        entities::{Connect, Disconnect, SessionId},
-        ClientMessage,
-        MessageData,
-        WSServer,
+        entities::{Connect, Disconnect, SessionId, Socket},
         WsBizHandler,
         WsBizHandlers,
+        WsMessageAdaptor,
+        WsServer,
     },
 };
 use actix::*;
@@ -16,17 +15,22 @@ use bytes::Bytes;
 use flowy_ws::WsMessage;
 use std::{convert::TryFrom, time::Instant};
 
-pub struct WSClient {
+pub struct WsClientData {
+    pub(crate) socket: Socket,
+    pub(crate) data: Bytes,
+}
+
+pub struct WsClient {
     session_id: SessionId,
-    server: Addr<WSServer>,
+    server: Addr<WsServer>,
     biz_handlers: Data<WsBizHandlers>,
     hb: Instant,
 }
 
-impl WSClient {
+impl WsClient {
     pub fn new<T: Into<SessionId>>(
         session_id: T,
-        server: Addr<WSServer>,
+        server: Addr<WsServer>,
         biz_handlers: Data<WsBizHandlers>,
     ) -> Self {
         Self {
@@ -50,24 +54,25 @@ impl WSClient {
         });
     }
 
-    fn send(&self, data: MessageData) {
-        let msg = ClientMessage::new(self.session_id.clone(), data);
-        self.server.do_send(msg);
-    }
-
-    fn handle_binary_message(&self, bytes: Bytes) {
+    fn handle_binary_message(&self, bytes: Bytes, socket: Socket) {
         // TODO: ok to unwrap?
         let message: WsMessage = WsMessage::try_from(bytes).unwrap();
         match self.biz_handlers.get(&message.module) {
             None => {
                 log::error!("Can't find the handler for {:?}", message.module);
             },
-            Some(handler) => handler.receive_data(Bytes::from(message.data)),
+            Some(handler) => {
+                let client_data = WsClientData {
+                    socket,
+                    data: Bytes::from(message.data),
+                };
+                handler.receive_data(client_data)
+            },
         }
     }
 }
 
-impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
+impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsClient {
     fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
         match msg {
             Ok(ws::Message::Ping(msg)) => {
@@ -80,13 +85,13 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
             },
             Ok(ws::Message::Binary(bytes)) => {
                 log::debug!(" Receive {} binary", &self.session_id);
-                self.handle_binary_message(bytes);
+                let socket = ctx.address().recipient();
+                self.handle_binary_message(bytes, socket);
             },
             Ok(Text(_)) => {
                 log::warn!("Receive unexpected text message");
             },
             Ok(ws::Message::Close(reason)) => {
-                self.send(MessageData::Disconnect(self.session_id.clone()));
                 ctx.close(reason);
                 ctx.stop();
             },
@@ -104,21 +109,13 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
     }
 }
 
-impl Handler<ClientMessage> for WSClient {
+impl Handler<WsMessageAdaptor> for WsClient {
     type Result = ();
 
-    fn handle(&mut self, msg: ClientMessage, ctx: &mut Self::Context) {
-        match msg.data {
-            MessageData::Binary(binary) => {
-                ctx.binary(binary);
-            },
-            MessageData::Connect(_) => {},
-            MessageData::Disconnect(_) => {},
-        }
-    }
+    fn handle(&mut self, msg: WsMessageAdaptor, ctx: &mut Self::Context) { ctx.binary(msg.0); }
 }
 
-impl Actor for WSClient {
+impl Actor for WsClient {
     type Context = ws::WebsocketContext<Self>;
 
     fn started(&mut self, ctx: &mut Self::Context) {

+ 12 - 17
backend/src/service/ws/ws_server.rs

@@ -1,32 +1,31 @@
 use crate::service::ws::{
     entities::{Connect, Disconnect, Session, SessionId},
-    ClientMessage,
-    MessageData,
+    WsMessageAdaptor,
 };
 use actix::{Actor, Context, Handler};
 use dashmap::DashMap;
 use flowy_net::errors::ServerError;
 
-pub struct WSServer {
+pub struct WsServer {
     sessions: DashMap<SessionId, Session>,
 }
 
-impl WSServer {
+impl WsServer {
     pub fn new() -> Self {
         Self {
             sessions: DashMap::new(),
         }
     }
 
-    pub fn send(&self, _msg: ClientMessage) { unimplemented!() }
+    pub fn send(&self, _msg: WsMessageAdaptor) { unimplemented!() }
 }
 
-impl Actor for WSServer {
+impl Actor for WsServer {
     type Context = Context<Self>;
     fn started(&mut self, _ctx: &mut Self::Context) {}
 }
 
-impl Handler<Connect> for WSServer {
+impl Handler<Connect> for WsServer {
     type Result = Result<(), ServerError>;
     fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
         let session: Session = msg.into();
@@ -36,7 +35,7 @@ impl Handler<Connect> for WSServer {
     }
 }
 
-impl Handler<Disconnect> for WSServer {
+impl Handler<Disconnect> for WsServer {
     type Result = Result<(), ServerError>;
     fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) -> Self::Result {
         self.sessions.remove(&msg.sid);
@@ -44,20 +43,16 @@ impl Handler<Disconnect> for WSServer {
     }
 }
 
-impl Handler<ClientMessage> for WSServer {
+impl Handler<WsMessageAdaptor> for WsServer {
     type Result = ();
 
-    fn handle(&mut self, msg: ClientMessage, _ctx: &mut Context<Self>) -> Self::Result {
-        match msg.data {
-            MessageData::Binary(_) => {},
-            MessageData::Connect(_) => {},
-            MessageData::Disconnect(_) => {},
-        }
+    fn handle(&mut self, _msg: WsMessageAdaptor, _ctx: &mut Context<Self>) -> Self::Result {
+        unimplemented!()
     }
 }
 
-impl actix::Supervised for WSServer {
-    fn restarting(&mut self, _ctx: &mut Context<WSServer>) {
+impl actix::Supervised for WsServer {
+    fn restarting(&mut self, _ctx: &mut Context<WsServer>) {
         log::warn!("restarting");
     }
 }

+ 2 - 0
rust-lib/flowy-document/Cargo.toml

@@ -14,6 +14,7 @@ flowy-database = { path = "../flowy-database" }
 flowy-infra = { path = "../flowy-infra" }
 flowy-observable = { path = "../flowy-observable" }
 flowy-ot = { path = "../flowy-ot" }
+flowy-ws = { path = "../flowy-ws" }
 flowy-net = { path = "../flowy-net", features = ["flowy_request"] }
 
 
@@ -37,6 +38,7 @@ serde_json = {version = "1.0"}
 chrono = "0.4.19"
 futures-core = { version = "0.3", default-features = false }
 md5 = "0.7.0"
+byteorder = {version = "1.3.4"}
 
 [dev-dependencies]
 flowy-test = { path = "../flowy-test" }

+ 15 - 3
rust-lib/flowy-document/src/entities/ws/ws.rs

@@ -1,16 +1,17 @@
 use crate::entities::doc::Revision;
 use bytes::Bytes;
 use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
+use flowy_ws::{WsMessage, WsModule};
 use std::convert::TryInto;
 
 #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)]
 pub enum WsDataType {
-    Command = 0,
-    Delta   = 1,
+    Acked = 0,
+    Delta = 1,
 }
 
 impl std::default::Default for WsDataType {
-    fn default() -> Self { WsDataType::Command }
+    fn default() -> Self { WsDataType::Acked }
 }
 
 #[derive(ProtoBuf, Default, Debug, Clone)]
@@ -37,3 +38,14 @@ impl std::convert::From<Revision> for WsDocumentData {
         }
     }
 }
+
+impl std::convert::Into<WsMessage> for WsDocumentData {
+    fn into(self) -> WsMessage {
+        let bytes: Bytes = self.try_into().unwrap();
+        let msg = WsMessage {
+            module: WsModule::Doc,
+            data: bytes.to_vec(),
+        };
+        msg
+    }
+}

+ 14 - 14
rust-lib/flowy-document/src/protobuf/model/ws.rs

@@ -78,7 +78,7 @@ impl WsDocumentData {
         self.ty
     }
     pub fn clear_ty(&mut self) {
-        self.ty = WsDataType::Command;
+        self.ty = WsDataType::Acked;
     }
 
     // Param is passed by value, moved
@@ -146,7 +146,7 @@ impl ::protobuf::Message for WsDocumentData {
         if !self.id.is_empty() {
             my_size += ::protobuf::rt::string_size(1, &self.id);
         }
-        if self.ty != WsDataType::Command {
+        if self.ty != WsDataType::Acked {
             my_size += ::protobuf::rt::enum_size(2, self.ty);
         }
         if !self.data.is_empty() {
@@ -161,7 +161,7 @@ impl ::protobuf::Message for WsDocumentData {
         if !self.id.is_empty() {
             os.write_string(1, &self.id)?;
         }
-        if self.ty != WsDataType::Command {
+        if self.ty != WsDataType::Acked {
             os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.ty))?;
         }
         if !self.data.is_empty() {
@@ -237,7 +237,7 @@ impl ::protobuf::Message for WsDocumentData {
 impl ::protobuf::Clear for WsDocumentData {
     fn clear(&mut self) {
         self.id.clear();
-        self.ty = WsDataType::Command;
+        self.ty = WsDataType::Acked;
         self.data.clear();
         self.unknown_fields.clear();
     }
@@ -257,7 +257,7 @@ impl ::protobuf::reflect::ProtobufValue for WsDocumentData {
 
 #[derive(Clone,PartialEq,Eq,Debug,Hash)]
 pub enum WsDataType {
-    Command = 0,
+    Acked = 0,
     Delta = 1,
 }
 
@@ -268,7 +268,7 @@ impl ::protobuf::ProtobufEnum for WsDataType {
 
     fn from_i32(value: i32) -> ::std::option::Option<WsDataType> {
         match value {
-            0 => ::std::option::Option::Some(WsDataType::Command),
+            0 => ::std::option::Option::Some(WsDataType::Acked),
             1 => ::std::option::Option::Some(WsDataType::Delta),
             _ => ::std::option::Option::None
         }
@@ -276,7 +276,7 @@ impl ::protobuf::ProtobufEnum for WsDataType {
 
     fn values() -> &'static [Self] {
         static values: &'static [WsDataType] = &[
-            WsDataType::Command,
+            WsDataType::Acked,
             WsDataType::Delta,
         ];
         values
@@ -295,7 +295,7 @@ impl ::std::marker::Copy for WsDataType {
 
 impl ::std::default::Default for WsDataType {
     fn default() -> Self {
-        WsDataType::Command
+        WsDataType::Acked
     }
 }
 
@@ -308,8 +308,8 @@ impl ::protobuf::reflect::ProtobufValue for WsDataType {
 static file_descriptor_proto_data: &'static [u8] = b"\
     \n\x08ws.proto\"Q\n\x0eWsDocumentData\x12\x0e\n\x02id\x18\x01\x20\x01(\t\
     R\x02id\x12\x1b\n\x02ty\x18\x02\x20\x01(\x0e2\x0b.WsDataTypeR\x02ty\x12\
-    \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*$\n\nWsDataType\x12\x0b\n\
-    \x07Command\x10\0\x12\t\n\x05Delta\x10\x01J\xb9\x02\n\x06\x12\x04\0\0\n\
+    \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*\"\n\nWsDataType\x12\t\n\
+    \x05Acked\x10\0\x12\t\n\x05Delta\x10\x01J\xb9\x02\n\x06\x12\x04\0\0\n\
     \x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\
     \x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\
     \x03\x03\x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\
@@ -321,10 +321,10 @@ static file_descriptor_proto_data: &'static [u8] = b"\
     \x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\
     \x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\
     \x07\0\n\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\x0f\n\x0b\n\x04\x05\0\
-    \x02\0\x12\x03\x08\x04\x10\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\
-    \x0b\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0e\x0f\n\x0b\n\x04\x05\0\
-    \x02\x01\x12\x03\t\x04\x0e\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\t\
-    \n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\t\x0c\rb\x06proto3\
+    \x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\
+    \n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\x0b\n\x04\x05\0\x02\x01\
+    \x12\x03\t\x04\x0e\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\t\n\x0c\n\
+    \x05\x05\0\x02\x01\x02\x12\x03\t\x0c\rb\x06proto3\
 ";
 
 static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

+ 1 - 1
rust-lib/flowy-document/src/protobuf/proto/ws.proto

@@ -6,6 +6,6 @@ message WsDocumentData {
     bytes data = 3;
 }
 enum WsDataType {
-    Command = 0;
+    Acked = 0;
     Delta = 1;
 }

+ 14 - 9
rust-lib/flowy-document/src/services/doc/edit_context.rs

@@ -67,7 +67,7 @@ impl EditDocContext {
         // Opti: it is necessary to save the rev if send success?
         let md5 = format!("{:x}", md5::compute(json));
         let revision = Revision::new(base_rev_id, rev_id, data.to_vec(), md5, self.id.clone().into());
-        self.save_revision(revision.clone(), pool.clone());
+        let _ = self.save_revision(revision.clone(), pool.clone())?;
         match self.ws.send(revision.into()) {
             Ok(_) => {
                 // TODO: remove the rev if send success
@@ -84,11 +84,11 @@ impl EditDocContext {
 impl EditDocContext {
     fn save_revision(&self, revision: Revision, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
         let conn = &*pool.get().map_err(internal_error)?;
-        conn.immediate_transaction::<_, DocError, _>(|| {
-            let op_table: OpTable = revision.into();
-            let _ = self.op_sql.create_op_table(op_table, conn)?;
-            Ok(())
-        })?;
+        // conn.immediate_transaction::<_, DocError, _>(|| {
+        //     let op_table: OpTable = revision.into();
+        //     let _ = self.op_sql.create_op_table(op_table, conn)?;
+        //     Ok(())
+        // })?;
 
         Ok(())
     }
@@ -103,11 +103,16 @@ impl EditDocContext {
     }
 }
 
+use byteorder::{BigEndian, ReadBytesExt};
+use std::io::Cursor;
 impl WsDocumentHandler for EditDocContext {
-    fn receive(&self, data: WsDocumentData) {
-        match data.ty {
+    fn receive(&self, doc_data: WsDocumentData) {
+        match doc_data.ty {
             WsDataType::Delta => {},
-            WsDataType::Command => {},
+            WsDataType::Acked => {
+                let mut rdr = Cursor::new(doc_data.data);
+                let rev = rdr.read_i64::<BigEndian>().unwrap();
+            },
         }
     }
 }

+ 1 - 1
rust-lib/flowy-infra/src/macros.rs

@@ -1,5 +1,5 @@
 #[macro_export]
-macro_rules! dispatch_future {
+macro_rules! wrap_future_fn {
     ($fut:expr) => {
         ClosureFuture {
             fut: Box::pin(async move { $fut.await }),

+ 1 - 5
rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs

@@ -73,11 +73,7 @@ struct WsSenderImpl {
 
 impl WsDocumentSender for WsSenderImpl {
     fn send(&self, data: WsDocumentData) -> Result<(), DocError> {
-        let bytes: Bytes = data.try_into().unwrap();
-        let msg = WsMessage {
-            module: WsModule::Doc,
-            data: bytes.to_vec(),
-        };
+        let msg: WsMessage = data.into();
         let _ = self.user.send_ws_msg(msg).map_err(|e| DocError::internal().context(e))?;
         Ok(())
     }

+ 3 - 1
rust-lib/flowy-ws/src/connect.rs

@@ -130,7 +130,9 @@ fn post_message(tx: MsgSender, message: Result<Message, Error>) {
             Err(e) => log::error!("tx send error: {:?}", e),
         },
         Ok(_) => {},
-        Err(e) => log::error!("ws read error: {:?}", e),
+        Err(e) => {
+            log::error!("ws read error: {:?}", e)
+        },
     }
 }
 

+ 2 - 13
rust-lib/flowy-ws/src/msg.rs

@@ -1,7 +1,8 @@
+use crate::errors::WsError;
 use bytes::Bytes;
 use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
 use std::convert::{TryFrom, TryInto};
-use tokio_tungstenite::tungstenite::Message as TokioMessage;
+use tokio_tungstenite::tungstenite::{Message as TokioMessage, Message};
 
 // Opti: using four bytes of the data to represent the source
 #[derive(ProtoBuf, Debug, Clone, Default)]
@@ -42,15 +43,3 @@ impl std::convert::Into<TokioMessage> for WsMessage {
         }
     }
 }
-
-impl std::convert::From<TokioMessage> for WsMessage {
-    fn from(value: TokioMessage) -> Self {
-        match value {
-            TokioMessage::Binary(bytes) => WsMessage::try_from(Bytes::from(bytes)).unwrap(),
-            _ => {
-                log::error!("WsMessage deserialize failed. Unsupported message");
-                WsMessage::default()
-            },
-        }
-    }
-}

+ 23 - 7
rust-lib/flowy-ws/src/ws.rs

@@ -4,12 +4,14 @@ use crate::{
     WsMessage,
     WsModule,
 };
+use bytes::Bytes;
 use flowy_net::errors::ServerError;
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use futures_core::{future::BoxFuture, ready, Stream};
 use pin_project::pin_project;
 use std::{
     collections::HashMap,
+    convert::{Infallible, TryFrom},
     future::Future,
     pin::Pin,
     sync::Arc,
@@ -169,6 +171,26 @@ pub struct WsHandlerFuture {
 
 impl WsHandlerFuture {
     fn new(handlers: HashMap<WsModule, Arc<dyn WsMessageHandler>>, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } }
+
+    fn handler_ws_message(&self, message: Message) {
+        match message {
+            Message::Binary(bytes) => self.handle_binary_message(bytes),
+            _ => {},
+        }
+    }
+
+    fn handle_binary_message(&self, bytes: Vec<u8>) {
+        let bytes = Bytes::from(bytes);
+        match WsMessage::try_from(bytes) {
+            Ok(message) => match self.handlers.get(&message.module) {
+                None => log::error!("Can't find any handler for message: {:?}", message),
+                Some(handler) => handler.receive_message(message.clone()),
+            },
+            Err(e) => {
+                log::error!("Deserialize binary ws message failed: {:?}", e);
+            },
+        }
+    }
 }
 
 impl Future for WsHandlerFuture {
@@ -179,13 +201,7 @@ impl Future for WsHandlerFuture {
                 None => {
                     return Poll::Ready(());
                 },
-                Some(message) => {
-                    let message = WsMessage::from(message);
-                    match self.handlers.get(&message.module) {
-                        None => log::error!("Can't find any handler for message: {:?}", message),
-                        Some(handler) => handler.receive_message(message.clone()),
-                    }
-                },
+                Some(message) => self.handler_ws_message(message),
             }
         }
     }