Browse Source

replace error type string with dispatch error

appflowy 3 năm trước cách đây
mục cha
commit
fed6117d04

+ 10 - 6
backend/Cargo.toml

@@ -6,14 +6,17 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-actix = "0.10"
-actix-web = "3"
-actix-http = "2.2.1"
-actix-web-actors = "3"
+actix = "0.12"
+#actix-web = "3"
+#actix-http = "2.2.1"
+#actix-web-actors = "3"
 actix-codec = "0.3"
+actix-web = "4.0.0-beta.8"
+actix-http = "3.0.0-beta.8"
+actix-web-actors = { version = "4.0.0-beta.6" }
 
 futures = "0.3.15"
-bytes = "0.5"
+bytes = "1"
 toml = "0.5.8"
 dashmap = "4.0"
 log = "0.4.14"
@@ -22,12 +25,13 @@ serde = { version = "1.0", features = ["derive"] }
 serde_repr = "0.1"
 derive_more = {version = "0.99", features = ["display"]}
 protobuf = {version = "2.20.0"}
+
 flowy-log = { path = "../rust-lib/flowy-log" }
 flowy-user = { path = "../rust-lib/flowy-user" }
 flowy-net = { path = "../rust-lib/flowy-net", features = ["http"] }
 
 [dependencies.sqlx]
-version = "0.5.2"
+version = "0.5.6"
 default-features = false
 features = [
     "runtime-actix-rustls",

+ 2 - 2
backend/src/routers/ws.rs

@@ -14,10 +14,10 @@ use actix_web_actors::ws;
 pub async fn start_connection(
     request: HttpRequest,
     payload: Payload,
-    Path(token): Path<String>,
+    path: Path<String>,
     server: Data<Addr<WSServer>>,
 ) -> Result<HttpResponse, Error> {
-    let client = WSClient::new(SessionId::new(token), server.get_ref().clone());
+    let client = WSClient::new(SessionId::new(path.clone()), server.get_ref().clone());
     let result = ws::start(client, &request, payload);
 
     match result {

+ 3 - 2
backend/src/ws_service/ws_client.rs

@@ -12,6 +12,7 @@ use actix::{
     Actor,
     ActorContext,
     ActorFuture,
+    ActorFutureExt,
     Addr,
     AsyncContext,
     ContextFutureSpawner,
@@ -123,13 +124,13 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
             },
             Ok(Text(s)) => {
                 log::debug!("Receive {} text {:?}", &self.sid, &s);
-                self.send(MessageData::Text(s));
+                self.send(MessageData::Text(s.to_string()));
             },
 
             Err(e) => {
                 let msg = format!("{} error: {:?}", &self.sid, e);
-                ctx.text(&msg);
                 log::error!("stream {}", msg);
+                ctx.text(msg);
                 ctx.stop();
             },
         }

+ 4 - 9
rust-lib/flowy-derive/src/proto_buf/deserialize.rs

@@ -23,20 +23,15 @@ pub fn make_de_token_steam(ctxt: &Ctxt, ast: &ASTContainer) -> Option<TokenStrea
 
     let de_token_stream: TokenStream = quote! {
         impl std::convert::TryFrom<&bytes::Bytes> for #struct_ident {
-            type Error = String;
+            type Error = ::protobuf::ProtobufError;
             fn try_from(bytes: &bytes::Bytes) -> Result<Self, Self::Error> {
-                let result: ::protobuf::ProtobufResult<crate::protobuf::#pb_ty> = ::protobuf::Message::parse_from_bytes(&bytes);
-                match result {
-                    Ok(mut pb) => {
-                        #struct_ident::try_from(&mut pb)
-                    }
-                    Err(e) => Err(format!("{:?}", e)),
-                }
+                let mut pb: crate::protobuf::#pb_ty = ::protobuf::Message::parse_from_bytes(&bytes)?;
+                #struct_ident::try_from(&mut pb)
             }
         }
 
         impl std::convert::TryFrom<&mut crate::protobuf::#pb_ty> for #struct_ident {
-            type Error = String;
+            type Error = ::protobuf::ProtobufError;
             fn try_from(pb: &mut crate::protobuf::#pb_ty) -> Result<Self, Self::Error> {
                 let mut o = Self::default();
                 #(#build_take_fields)*

+ 4 - 7
rust-lib/flowy-derive/src/proto_buf/serialize.rs

@@ -18,20 +18,17 @@ pub fn make_se_token_stream(ctxt: &Ctxt, ast: &ASTContainer) -> Option<TokenStre
     let se_token_stream: TokenStream = quote! {
 
         impl std::convert::TryInto<bytes::Bytes> for #struct_ident {
-            type Error = String;
+            type Error = ::protobuf::ProtobufError;
             fn try_into(self) -> Result<bytes::Bytes, Self::Error> {
                 use protobuf::Message;
                 let pb: crate::protobuf::#pb_ty = self.try_into()?;
-                let result: ::protobuf::ProtobufResult<Vec<u8>> = pb.write_to_bytes();
-                match result {
-                    Ok(bytes) => { Ok(bytes::Bytes::from(bytes)) },
-                    Err(e) => { Err(format!("{:?}", e)) }
-                }
+                let bytes = pb.write_to_bytes()?;
+                Ok(bytes::Bytes::from(bytes))
             }
         }
 
         impl std::convert::TryInto<crate::protobuf::#pb_ty> for #struct_ident {
-            type Error = String;
+            type Error = ::protobuf::ProtobufError;
             fn try_into(self) -> Result<crate::protobuf::#pb_ty, Self::Error> {
                 let mut pb = crate::protobuf::#pb_ty::new();
                 #(#build_set_pb_fields)*

+ 24 - 8
rust-lib/flowy-dispatch/src/byte_trait.rs

@@ -1,16 +1,29 @@
+use crate::errors::{DispatchError, InternalError};
 use bytes::Bytes;
+use protobuf::ProtobufError;
 
 // To bytes
 pub trait ToBytes {
-    fn into_bytes(self) -> Result<Bytes, String>;
+    fn into_bytes(self) -> Result<Bytes, DispatchError>;
 }
 
 #[cfg(feature = "use_protobuf")]
 impl<T> ToBytes for T
 where
-    T: std::convert::TryInto<Bytes, Error = String>,
+    T: std::convert::TryInto<Bytes, Error = protobuf::ProtobufError>,
 {
-    fn into_bytes(self) -> Result<Bytes, String> { self.try_into() }
+    fn into_bytes(self) -> Result<Bytes, DispatchError> {
+        match self.try_into() {
+            Ok(data) => Ok(data),
+            Err(e) => {
+                // let system_err: DispatchError = InternalError::new(format!("{:?}",
+                // e)).into(); system_err.into()
+                // Err(format!("{:?}", e))
+
+                Err(InternalError::ProtobufError(format!("{:?}", e)).into())
+            },
+        }
+    }
 }
 
 #[cfg(feature = "use_serde")]
@@ -18,10 +31,10 @@ impl<T> ToBytes for T
 where
     T: serde::Serialize,
 {
-    fn into_bytes(self) -> Result<Bytes, String> {
+    fn into_bytes(self) -> Result<Bytes, DispatchError> {
         match serde_json::to_string(&self.0) {
             Ok(s) => Ok(Bytes::from(s)),
-            Err(e) => Err(format!("{:?}", e)),
+            Err(e) => Err(InternalError::SerializeToBytes(format!("{:?}", e)).into()),
         }
     }
 }
@@ -29,16 +42,19 @@ where
 // From bytes
 
 pub trait FromBytes: Sized {
-    fn parse_from_bytes(bytes: Bytes) -> Result<Self, String>;
+    fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError>;
 }
 
 #[cfg(feature = "use_protobuf")]
 impl<T> FromBytes for T
 where
     // https://stackoverflow.com/questions/62871045/tryfromu8-trait-bound-in-trait
-    T: for<'a> std::convert::TryFrom<&'a Bytes, Error = String>,
+    T: for<'a> std::convert::TryFrom<&'a Bytes, Error = protobuf::ProtobufError>,
 {
-    fn parse_from_bytes(bytes: Bytes) -> Result<Self, String> { T::try_from(&bytes) }
+    fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError> {
+        let data = T::try_from(&bytes)?;
+        Ok(data)
+    }
 }
 
 #[cfg(feature = "use_serde")]

+ 15 - 14
rust-lib/flowy-dispatch/src/data.rs

@@ -37,7 +37,9 @@ where
             Payload::None => ready(Err(unexpected_none_payload(req))),
             Payload::Bytes(bytes) => match T::parse_from_bytes(bytes.clone()) {
                 Ok(data) => ready(Ok(Data(data))),
-                Err(e) => ready(Err(InternalError::new(format!("{}", e)).into())),
+                Err(e) => ready(Err(
+                    InternalError::DeserializeFromBytes(format!("{}", e)).into()
+                )),
             },
         }
     }
@@ -50,10 +52,7 @@ where
     fn respond_to(self, _request: &EventRequest) -> EventResponse {
         match self.into_inner().into_bytes() {
             Ok(bytes) => ResponseBuilder::Ok().data(bytes).build(),
-            Err(e) => {
-                let system_err: DispatchError = InternalError::new(format!("{}", e)).into();
-                system_err.into()
-            },
+            Err(e) => e.into(),
         }
     }
 }
@@ -62,7 +61,7 @@ impl<T> std::convert::TryFrom<&Payload> for Data<T>
 where
     T: FromBytes,
 {
-    type Error = String;
+    type Error = DispatchError;
     fn try_from(payload: &Payload) -> Result<Data<T>, Self::Error> { parse_payload(payload) }
 }
 
@@ -70,19 +69,21 @@ impl<T> std::convert::TryFrom<Payload> for Data<T>
 where
     T: FromBytes,
 {
-    type Error = String;
+    type Error = DispatchError;
     fn try_from(payload: Payload) -> Result<Data<T>, Self::Error> { parse_payload(&payload) }
 }
 
-fn parse_payload<T>(payload: &Payload) -> Result<Data<T>, String>
+fn parse_payload<T>(payload: &Payload) -> Result<Data<T>, DispatchError>
 where
     T: FromBytes,
 {
     match payload {
-        Payload::None => Err(format!("Parse fail, expected payload")),
-        Payload::Bytes(bytes) => match T::parse_from_bytes(bytes.clone()) {
-            Ok(data) => Ok(Data(data)),
-            Err(e) => Err(e),
+        Payload::None => {
+            Err(InternalError::UnexpectedNone(format!("Parse fail, expected payload")).into())
+        },
+        Payload::Bytes(bytes) => {
+            let data = T::parse_from_bytes(bytes.clone())?;
+            Ok(Data(data))
         },
     }
 }
@@ -91,7 +92,7 @@ impl<T> std::convert::TryInto<Payload> for Data<T>
 where
     T: ToBytes,
 {
-    type Error = String;
+    type Error = DispatchError;
 
     fn try_into(self) -> Result<Payload, Self::Error> {
         let inner = self.into_inner();
@@ -101,5 +102,5 @@ where
 }
 
 impl ToBytes for Data<String> {
-    fn into_bytes(self) -> Result<Bytes, String> { Ok(Bytes::from(self.0)) }
+    fn into_bytes(self) -> Result<Bytes, DispatchError> { Ok(Bytes::from(self.0)) }
 }

+ 8 - 6
rust-lib/flowy-dispatch/src/dispatch.rs

@@ -68,14 +68,17 @@ impl EventDispatch {
                     service
                         .call(service_ctx)
                         .await
-                        .unwrap_or_else(|e| InternalError::new(format!("{:?}", e)).as_response())
+                        .unwrap_or_else(|e| InternalError::Other(format!("{:?}", e)).as_response())
                 });
 
                 DispatchFuture {
                     fut: Box::pin(async move {
                         join_handle.await.unwrap_or_else(|e| {
-                            InternalError::new(format!("EVENT_DISPATCH join error: {:?}", e))
-                                .as_response()
+                            let error = InternalError::JoinError(format!(
+                                "EVENT_DISPATCH join error: {:?}",
+                                e
+                            ));
+                            error.as_response()
                         })
                     }),
                 }
@@ -83,9 +86,8 @@ impl EventDispatch {
 
             Err(e) => {
                 let msg = format!("EVENT_DISPATCH read failed. {:?}", e);
-                log::error!("{}", msg);
                 DispatchFuture {
-                    fut: Box::pin(async { InternalError::new(msg).as_response() }),
+                    fut: Box::pin(async { InternalError::Lock(msg).as_response() }),
                 }
             },
         }
@@ -165,7 +167,7 @@ impl Service<DispatchContext> for DispatchService {
                     None => {
                         let msg = format!("Can not find the event handler. {:?}", request);
                         log::error!("{}", msg);
-                        Err(InternalError::new(msg).into())
+                        Err(InternalError::HandleNotFound(msg).into())
                     },
                 }
             };

+ 52 - 43
rust-lib/flowy-dispatch/src/errors/errors.rs

@@ -5,9 +5,10 @@ use crate::{
 };
 use bytes::Bytes;
 use dyn_clone::DynClone;
+use protobuf::ProtobufError;
 use serde::{Serialize, Serializer};
 use std::{fmt, option::NoneError};
-use tokio::sync::mpsc::error::SendError;
+use tokio::{sync::mpsc::error::SendError, task::JoinError};
 
 pub trait Error: fmt::Debug + DynClone + Send + Sync {
     fn as_response(&self) -> EventResponse;
@@ -48,30 +49,31 @@ impl std::error::Error for DispatchError {
 
 impl From<SendError<EventRequest>> for DispatchError {
     fn from(err: SendError<EventRequest>) -> Self {
-        InternalError {
-            inner: format!("{}", err),
-        }
-        .into()
+        InternalError::Other(format!("{}", err)).into()
     }
 }
 
 impl From<NoneError> for DispatchError {
     fn from(s: NoneError) -> Self {
-        InternalError {
-            inner: format!("Unexpected none: {:?}", s),
-        }
-        .into()
+        InternalError::UnexpectedNone(format!("Unexpected none: {:?}", s)).into()
     }
 }
 
 impl From<String> for DispatchError {
-    fn from(s: String) -> Self { InternalError { inner: s }.into() }
+    fn from(s: String) -> Self { InternalError::Other(s).into() }
+}
+
+#[cfg(feature = "use_protobuf")]
+impl From<protobuf::ProtobufError> for DispatchError {
+    fn from(e: protobuf::ProtobufError) -> Self {
+        InternalError::ProtobufError(format!("{:?}", e)).into()
+    }
 }
 
 impl FromBytes for DispatchError {
-    fn parse_from_bytes(bytes: Bytes) -> Result<Self, String> {
+    fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError> {
         let s = String::from_utf8(bytes.to_vec()).unwrap();
-        Ok(InternalError { inner: s }.into())
+        Ok(InternalError::DeserializeFromBytes(s).into())
     }
 }
 
@@ -79,44 +81,51 @@ impl From<DispatchError> for EventResponse {
     fn from(err: DispatchError) -> Self { err.inner_error().as_response() }
 }
 
-#[derive(Clone)]
-pub(crate) struct InternalError<T: Clone> {
-    inner: T,
-}
-
-impl<T: Clone> InternalError<T> {
-    pub fn new(inner: T) -> Self { InternalError { inner } }
-}
-
-impl<T> fmt::Debug for InternalError<T>
-where
-    T: fmt::Debug + 'static + Clone + Send + Sync,
-{
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&self.inner, f) }
+impl Serialize for DispatchError {
+    fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
+    where
+        S: Serializer,
+    {
+        serializer.serialize_str(&format!("{}", self))
+    }
 }
 
-impl<T> fmt::Display for InternalError<T>
-where
-    T: fmt::Debug + fmt::Display + 'static + Clone + Send + Sync,
-{
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Display::fmt(&self.inner, f) }
+#[derive(Clone, Debug)]
+pub(crate) enum InternalError {
+    ProtobufError(String),
+    UnexpectedNone(String),
+    DeserializeFromBytes(String),
+    SerializeToBytes(String),
+    JoinError(String),
+    Lock(String),
+    ServiceNotFound(String),
+    HandleNotFound(String),
+    Other(String),
+}
+
+impl fmt::Display for InternalError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            InternalError::ProtobufError(s) => fmt::Display::fmt(&s, f),
+            InternalError::UnexpectedNone(s) => fmt::Display::fmt(&s, f),
+            InternalError::DeserializeFromBytes(s) => fmt::Display::fmt(&s, f),
+            InternalError::SerializeToBytes(s) => fmt::Display::fmt(&s, f),
+            InternalError::JoinError(s) => fmt::Display::fmt(&s, f),
+            InternalError::Lock(s) => fmt::Display::fmt(&s, f),
+            InternalError::ServiceNotFound(s) => fmt::Display::fmt(&s, f),
+            InternalError::HandleNotFound(s) => fmt::Display::fmt(&s, f),
+            InternalError::Other(s) => fmt::Display::fmt(&s, f),
+        }
+    }
 }
 
-impl<T> Error for InternalError<T>
-where
-    T: fmt::Debug + fmt::Display + 'static + Clone + Send + Sync,
-{
+impl Error for InternalError {
     fn as_response(&self) -> EventResponse {
-        let error = format!("{}", self.inner).into_bytes();
+        let error = format!("{}", self).into_bytes();
         ResponseBuilder::Err().data(error).build()
     }
 }
 
-impl Serialize for DispatchError {
-    fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
-    where
-        S: Serializer,
-    {
-        serializer.serialize_str(&format!("{}", self))
-    }
+impl std::convert::From<JoinError> for InternalError {
+    fn from(e: JoinError) -> Self { InternalError::JoinError(format!("{}", e)) }
 }

+ 1 - 1
rust-lib/flowy-dispatch/src/module/data.rs

@@ -56,7 +56,7 @@ where
                 type_name::<T>()
             );
             log::error!("{}", msg,);
-            ready(Err(InternalError::new(msg).into()))
+            ready(Err(InternalError::Other(msg).into()))
         }
     }
 }

+ 7 - 1
rust-lib/flowy-dispatch/src/module/module.rs

@@ -192,7 +192,13 @@ impl Service<ModuleRequest> for ModuleService {
                 };
                 Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
             },
-            None => Box::pin(async { Err(InternalError::new("".to_string()).into()) }),
+            None => {
+                let msg = format!(
+                    "Can not find service factory for event: {:?}",
+                    request.event
+                );
+                Box::pin(async { Err(InternalError::ServiceNotFound(msg).into()) })
+            },
         }
     }
 }

+ 1 - 1
rust-lib/flowy-dispatch/src/request/request.rs

@@ -77,7 +77,7 @@ impl FromRequest for String {
 
 pub fn unexpected_none_payload(request: &EventRequest) -> DispatchError {
     log::warn!("{:?} expected payload", &request.event);
-    InternalError::new("Expected payload").into()
+    InternalError::UnexpectedNone("Expected payload".to_string()).into()
 }
 
 #[doc(hidden)]

+ 4 - 8
rust-lib/flowy-dispatch/src/response/response.rs

@@ -36,15 +36,11 @@ impl EventResponse {
         E: FromBytes,
     {
         if self.status_code == StatusCode::Err {
-            match <Data<E>>::try_from(self.payload) {
-                Ok(err) => Ok(Err(err.into_inner())),
-                Err(e) => Err(InternalError::new(e).into()),
-            }
+            let err = <Data<E>>::try_from(self.payload)?;
+            Ok(Err(err.into_inner()))
         } else {
-            match <Data<T>>::try_from(self.payload) {
-                Ok(a) => Ok(Ok(a.into_inner())),
-                Err(e) => Err(InternalError::new(e).into()),
-            }
+            let data = <Data<T>>::try_from(self.payload)?;
+            Ok(Ok(data.into_inner()))
         }
     }
 }

+ 3 - 1
rust-lib/flowy-net/Cargo.toml

@@ -11,12 +11,14 @@ protobuf = {version = "2.18.0"}
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 serde_repr = "0.1"
-actix-web = {version = "3", optional = true}
 pin-project = "1.0.0"
 futures-core = { version = "0.3", default-features = false }
 log = "0.4"
 bytes = "1.0"
 lazy_static = "1.4.0"
+tokio = { version = "1", features = ["full"] }
+
+actix-web = {version = "4.0.0-beta.8", optional = true}
 
 [features]
 http = ["actix-web"]

+ 1 - 1
rust-lib/flowy-net/src/config.rs

@@ -1,6 +1,6 @@
 use lazy_static::lazy_static;
 
-pub const HOST: &'static str = "0.0.0.0:3030";
+pub const HOST: &'static str = "http://0.0.0.0:3030";
 
 lazy_static! {
     pub static ref SIGN_UP_URL: String = format!("{}/user/register", HOST);

+ 4 - 1
rust-lib/flowy-net/src/errors.rs

@@ -1,6 +1,5 @@
 use crate::response::FlowyResponse;
 use protobuf::ProtobufError;
-
 use std::fmt::{Formatter, Write};
 
 #[derive(Debug)]
@@ -36,3 +35,7 @@ impl std::convert::From<reqwest::Error> for NetworkError {
         NetworkError::InternalError(msg)
     }
 }
+
+impl std::convert::From<String> for NetworkError {
+    fn from(error: String) -> Self { NetworkError::InternalError(error) }
+}

+ 37 - 38
rust-lib/flowy-net/src/request/request.rs

@@ -1,47 +1,46 @@
-use crate::errors::NetworkError;
+use crate::{errors::NetworkError, future::ResultFuture};
 use bytes::Bytes;
-use protobuf::Message;
-use reqwest::{Client, Response};
-use std::{convert::TryFrom, time::Duration};
+use protobuf::{Message, ProtobufError};
+use reqwest::{Client, Error, Response};
+use std::{
+    convert::{TryFrom, TryInto},
+    time::Duration,
+};
+use tokio::sync::{oneshot, oneshot::error::RecvError};
 
-pub struct FlowyRequest {
-    client: Client,
+pub async fn http_post<T1, T2>(url: &str, data: T1) -> ResultFuture<T2, NetworkError>
+where
+    T1: TryInto<Bytes, Error = ProtobufError> + Send + Sync + 'static,
+    T2: TryFrom<Bytes, Error = ProtobufError> + Send + Sync + 'static,
+{
+    let url = url.to_owned();
+    ResultFuture::new(async move { post(url, data).await })
 }
 
-impl FlowyRequest {
-    pub fn new() -> Self {
-        let client = default_client();
-        Self { client }
-    }
-
-    pub async fn get<T>(&self, url: &str) -> Result<T, NetworkError>
-    where
-        T: Message,
-    {
-        let url = url.to_owned();
-        let response = self.client.get(&url).send().await?;
-        parse_response(response).await
-    }
+pub async fn post<T1, T2>(url: String, data: T1) -> Result<T2, NetworkError>
+where
+    T1: TryInto<Bytes, Error = ProtobufError>,
+    T2: TryFrom<Bytes, Error = ProtobufError>,
+{
+    let request_bytes: Bytes = data.try_into()?;
+    let (tx, rx) = oneshot::channel::<Result<Response, _>>();
 
-    pub async fn post<T>(&self, url: &str, data: T) -> Result<T, NetworkError>
-    where
-        T: Message,
-    {
-        let url = url.to_owned();
-        let body = data.write_to_bytes()?;
-        let response = self.client.post(&url).body(body).send().await?;
-        parse_response(response).await
-    }
+    tokio::spawn(async move {
+        let client = default_client();
+        let response = client.post(&url).body(request_bytes).send().await;
+        tx.send(response);
+    });
 
-    pub async fn post_data<T>(&self, url: &str, bytes: Vec<u8>) -> Result<T, NetworkError>
-    where
-        T: for<'a> TryFrom<&'a Vec<u8>>,
-    {
-        let url = url.to_owned();
-        let response = self.client.post(&url).body(bytes).send().await?;
-        let bytes = response.bytes().await?.to_vec();
-        let data = T::try_from(&bytes).map_err(|_e| panic!("")).unwrap();
-        Ok(data)
+    match rx.await {
+        Ok(response) => {
+            let response = response?;
+            let response_bytes = response.bytes().await?;
+            let data = T2::try_from(response_bytes)?;
+            Ok(data)
+        },
+        Err(e) => {
+            unimplemented!()
+        },
     }
 }
 

+ 7 - 3
rust-lib/flowy-net/src/response/response_http.rs

@@ -1,9 +1,9 @@
 use crate::{errors::NetworkError, response::*};
-use actix_web::{body::Body, error::ResponseError, HttpResponse};
+use actix_web::{body::Body, error::ResponseError, BaseHttpResponse, HttpResponse};
 use serde::Serialize;
 
-impl ResponseError for NetworkError {
-    fn error_response(&self) -> HttpResponse {
+impl NetworkError {
+    fn http_response(&self) -> HttpResponse {
         match self {
             NetworkError::InternalError(msg) => {
                 let resp = FlowyResponse::from_msg(&msg, ServerCode::InternalError);
@@ -18,6 +18,10 @@ impl ResponseError for NetworkError {
     }
 }
 
+impl ResponseError for NetworkError {
+    fn error_response(&self) -> HttpResponse { self.http_response().into() }
+}
+
 impl<T: Serialize> std::convert::Into<HttpResponse> for FlowyResponse<T> {
     fn into(self) -> HttpResponse {
         match serde_json::to_string(&self) {

+ 9 - 3
rust-lib/flowy-test/src/tester.rs

@@ -64,9 +64,15 @@ pub trait TesterTrait {
     where
         P: ToBytes,
     {
-        let bytes = payload.into_bytes().unwrap();
-        let module_request = self.mut_context().request.take().unwrap();
-        self.mut_context().request = Some(module_request.payload(bytes));
+        match payload.into_bytes() {
+            Ok(bytes) => {
+                let module_request = self.mut_context().request.take().unwrap();
+                self.mut_context().request = Some(module_request.payload(bytes));
+            },
+            Err(e) => {
+                log::error!("Set payload failed: {:?}", e);
+            },
+        }
     }
 
     fn sync_send(&mut self) {

+ 5 - 19
rust-lib/flowy-user/src/services/user/user_server.rs

@@ -3,10 +3,11 @@ use crate::{
     errors::{ErrorBuilder, UserErrCode, UserError},
 };
 
-use flowy_net::{future::ResultFuture, request::FlowyRequest};
+use bytes::Bytes;
+use flowy_net::{config::SIGN_UP_URL, future::ResultFuture, request::http_post};
 use std::sync::Arc;
 
-pub(crate) trait UserServer {
+pub trait UserServer {
     fn sign_up(&self, params: SignUpParams) -> ResultFuture<SignUpResponse, UserError>;
     fn sign_in(&self, params: SignInParams) -> ResultFuture<SignInResponse, UserError>;
     fn sign_out(&self, user_id: &str) -> ResultFuture<(), UserError>;
@@ -25,23 +26,8 @@ pub struct UserServerImpl {}
 impl UserServerImpl {}
 
 impl UserServer for UserServerImpl {
-    fn sign_up(&self, _params: SignUpParams) -> ResultFuture<SignUpResponse, UserError> {
-        // let bytes: Vec<u8> = params.try_into().unwrap();
-        // ResultFuture::new(async move {
-        //     match FlowyRequest::new()
-        //         .post_data::<SignUpResponse>("SIGN_UP_URL.as_ref()", bytes)
-        //         .await
-        //     {
-        //         Ok(a) => {},
-        //         Err(err) => {},
-        //     }
-        //
-        //     Ok(SignUpResponse {
-        //         uid: "".to_string(),
-        //         name: "".to_string(),
-        //         email: "".to_string(),
-        //     })
-        // })
+    fn sign_up(&self, params: SignUpParams) -> ResultFuture<SignUpResponse, UserError> {
+        // http_post(SIGN_UP_URL.as_ref(), params)
         unimplemented!()
     }
 

+ 8 - 3
rust-lib/flowy-workspace/src/observable/observable.rs

@@ -1,6 +1,6 @@
 use bytes::Bytes;
 use flowy_derive::ProtoBuf_Enum;
-use flowy_dispatch::prelude::ToBytes;
+use flowy_dispatch::prelude::{DispatchError, ToBytes};
 use flowy_observable::{dart::RustStreamSender, entities::ObservableSubject};
 
 const OBSERVABLE_CATEGORY: &'static str = "Workspace";
@@ -47,8 +47,13 @@ impl ObservableSender {
     where
         T: ToBytes,
     {
-        let bytes = payload.into_bytes().unwrap();
-        self.payload = Some(bytes);
+        match payload.into_bytes() {
+            Ok(bytes) => self.payload = Some(bytes),
+            Err(e) => {
+                log::error!("Set observable payload failed: {:?}", e);
+            },
+        }
+
         self
     }