Przeglądaj źródła

refactor test module

appflowy 3 lat temu
rodzic
commit
07b4113dc1

+ 1 - 0
backend/Cargo.toml

@@ -57,6 +57,7 @@ tokio = { version = "1", features = ["full"] }
 flowy-user = { path = "../rust-lib/flowy-user" }
 flowy-workspace = { path = "../rust-lib/flowy-workspace" }
 flowy-document = { path = "../rust-lib/flowy-document" }
+flowy-ws = { path = "../rust-lib/flowy-ws" }
 flowy-net = { path = "../rust-lib/flowy-net", features = ["http_server"] }
 
 ormx = { version = "0.7", features = ["postgres"]}

+ 1 - 3
backend/src/service/user_service/logged_user.rs

@@ -1,10 +1,8 @@
 use crate::entities::token::{Claim, Token};
-
+use actix_web::http::HeaderValue;
 use chrono::{DateTime, Utc};
 use dashmap::DashMap;
 use flowy_net::errors::ServerError;
-
-use actix_web::http::HeaderValue;
 use lazy_static::lazy_static;
 
 lazy_static! {

+ 11 - 13
backend/src/service/ws_service/entities/connect.rs

@@ -7,8 +7,17 @@ use std::fmt::Formatter;
 pub type Socket = Recipient<ClientMessage>;
 
 #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
-pub struct SessionId {
-    pub id: String,
+pub struct SessionId(pub String);
+
+impl<T: AsRef<str>> std::convert::From<T> for SessionId {
+    fn from(s: T) -> Self { SessionId(s.as_ref().to_owned()) }
+}
+
+impl std::fmt::Display for SessionId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let desc = format!("{}", &self.0);
+        f.write_str(&desc)
+    }
 }
 
 pub struct Session {
@@ -25,17 +34,6 @@ impl std::convert::From<Connect> for Session {
     }
 }
 
-impl SessionId {
-    pub fn new(id: String) -> Self { SessionId { id } }
-}
-
-impl std::fmt::Display for SessionId {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let desc = format!("{}", &self.id);
-        f.write_str(&desc)
-    }
-}
-
 #[derive(Debug, Message, Clone)]
 #[rtype(result = "Result<(), ServerError>")]
 pub struct Connect {

+ 12 - 9
backend/src/service/ws_service/entities/message.rs

@@ -5,33 +5,36 @@ use std::fmt::Formatter;
 
 #[derive(Debug, Clone)]
 pub enum MessageData {
-    Text(String),
     Binary(Bytes),
     Connect(SessionId),
-    Disconnect(String),
+    Disconnect(SessionId),
 }
 
 #[derive(Debug, Message, Clone)]
 #[rtype(result = "()")]
 pub struct ClientMessage {
-    pub sid: SessionId,
+    pub session_id: SessionId,
     pub data: MessageData,
 }
 
 impl ClientMessage {
-    pub fn new(sid: SessionId, data: MessageData) -> Self { ClientMessage { sid, data } }
+    pub fn new<T: Into<SessionId>>(session_id: T, data: MessageData) -> Self {
+        ClientMessage {
+            session_id: session_id.into(),
+            data,
+        }
+    }
 }
 
 impl std::fmt::Display for ClientMessage {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         let content = match &self.data {
-            MessageData::Text(t) => format!("[Text]: {}", t),
-            MessageData::Binary(_) => "[Binary message]".to_owned(),
-            MessageData::Connect(_) => "Connect".to_owned(),
-            MessageData::Disconnect(_) => "Disconnect".to_owned(),
+            MessageData::Binary(_) => "[Binary]".to_owned(),
+            MessageData::Connect(_) => "[Connect]".to_owned(),
+            MessageData::Disconnect(_) => "[Disconnect]".to_owned(),
         };
 
-        let desc = format!("{}:{}", &self.sid, content);
+        let desc = format!("{}:{}", &self.session_id, content);
         f.write_str(&desc)
     }
 }

+ 1 - 4
backend/src/service/ws_service/router.rs

@@ -20,10 +20,7 @@ pub async fn establish_ws_connection(
 ) -> Result<HttpResponse, Error> {
     match LoggedUser::from_token(token.clone()) {
         Ok(user) => {
-            let client = WSClient::new(
-                SessionId::new(user.user_id.clone()),
-                server.get_ref().clone(),
-            );
+            let client = WSClient::new(&user.user_id, server.get_ref().clone());
             let result = ws::start(client, &request, payload);
             match result {
                 Ok(response) => Ok(response.into()),

+ 51 - 66
backend/src/service/ws_service/ws_client.rs

@@ -1,20 +1,3 @@
-use std::time::Instant;
-
-use actix::{
-    fut,
-    Actor,
-    ActorContext,
-    ActorFutureExt,
-    Addr,
-    AsyncContext,
-    ContextFutureSpawner,
-    Handler,
-    Running,
-    StreamHandler,
-    WrapFuture,
-};
-use actix_web_actors::{ws, ws::Message::Text};
-
 use crate::{
     config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
     service::ws_service::{
@@ -24,37 +7,57 @@ use crate::{
         WSServer,
     },
 };
+use actix::*;
+use actix_web_actors::{ws, ws::Message::Text};
+use std::time::Instant;
 
+//    Frontend          │                       Backend
+//
+//                      │
+// ┌──────────┐   WsMessage   ┌───────────┐  ClientMessage    ┌──────────┐
+// │  user 1  │─────────┼────▶│ws_client_1│──────────────────▶│ws_server │
+// └──────────┘               └───────────┘                   └──────────┘
+//                      │                                           │
+//                WsMessage                                         ▼
+// ┌──────────┐         │     ┌───────────┐    ClientMessage     Group
+// │  user 2  │◀──────────────│ws_client_2│◀───────┐        ┌───────────────┐
+// └──────────┘         │     └───────────┘        │        │  ws_user_1    │
+//                                                 │        │               │
+//                      │                          └────────│  ws_user_2    │
+// ┌──────────┐               ┌───────────┐                 │               │
+// │  user 3  │─────────┼────▶│ws_client_3│                 └───────────────┘
+// └──────────┘               └───────────┘
+//                      │
 pub struct WSClient {
-    sid: SessionId,
+    session_id: SessionId,
     server: Addr<WSServer>,
     hb: Instant,
 }
 
 impl WSClient {
-    pub fn new(sid: SessionId, server: Addr<WSServer>) -> Self {
+    pub fn new<T: Into<SessionId>>(session_id: T, server: Addr<WSServer>) -> Self {
         Self {
-            sid,
+            session_id: session_id.into(),
             hb: Instant::now(),
             server,
         }
     }
 
     fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
-        ctx.run_interval(HEARTBEAT_INTERVAL, |ws_session, ctx| {
-            if Instant::now().duration_since(ws_session.hb) > PING_TIMEOUT {
-                ws_session.server.do_send(Disconnect {
-                    sid: ws_session.sid.clone(),
+        ctx.run_interval(HEARTBEAT_INTERVAL, |client, ctx| {
+            if Instant::now().duration_since(client.hb) > PING_TIMEOUT {
+                client.server.do_send(Disconnect {
+                    sid: client.session_id.clone(),
                 });
                 ctx.stop();
-                return;
+            } else {
+                ctx.ping(b"");
             }
-            ctx.ping(b"");
         });
     }
 
     fn send(&self, data: MessageData) {
-        let msg = ClientMessage::new(self.sid.clone(), data);
+        let msg = ClientMessage::new(self.session_id.clone(), data);
         self.server.do_send(msg);
     }
 }
@@ -67,18 +70,16 @@ impl Actor for WSClient {
         let socket = ctx.address().recipient();
         let connect = Connect {
             socket,
-            sid: self.sid.clone(),
+            sid: self.session_id.clone(),
         };
         self.server
             .send(connect)
             .into_actor(self)
-            .then(|res, _ws_session, _ctx| {
+            .then(|res, client, _ctx| {
                 match res {
-                    Ok(Ok(_)) => {},
-                    Ok(Err(_e)) => {
-                        unimplemented!()
-                    },
-                    Err(_e) => unimplemented!(),
+                    Ok(Ok(_)) => log::trace!("Send connect message to server success"),
+                    Ok(Err(e)) => log::error!("Send connect message to server failed: {:?}", e),
+                    Err(e) => log::error!("Send connect message to server failed: {:?}", e),
                 }
                 fut::ready(())
             })
@@ -87,7 +88,7 @@ impl Actor for WSClient {
 
     fn stopping(&mut self, _: &mut Self::Context) -> Running {
         self.server.do_send(Disconnect {
-            sid: self.sid.clone(),
+            sid: self.session_id.clone(),
         });
 
         Running::Stop
@@ -98,39 +99,33 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
     fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
         match msg {
             Ok(ws::Message::Ping(msg)) => {
-                log::debug!("Receive {} ping {:?}", &self.sid, &msg);
                 self.hb = Instant::now();
                 ctx.pong(&msg);
             },
             Ok(ws::Message::Pong(msg)) => {
-                log::debug!("Receive {} pong {:?}", &self.sid, &msg);
-                self.send(MessageData::Connect(self.sid.clone()));
+                log::debug!("Receive {} pong {:?}", &self.session_id, &msg);
                 self.hb = Instant::now();
             },
             Ok(ws::Message::Binary(bin)) => {
-                log::debug!(" Receive {} binary", &self.sid);
+                log::debug!(" Receive {} binary", &self.session_id);
                 self.send(MessageData::Binary(bin));
             },
+            Ok(Text(_)) => {
+                log::warn!("Receive unexpected text message");
+            },
             Ok(ws::Message::Close(reason)) => {
-                log::debug!("Receive {} close {:?}", &self.sid, &reason);
+                self.send(MessageData::Disconnect(self.session_id.clone()));
                 ctx.close(reason);
                 ctx.stop();
             },
-            Ok(ws::Message::Continuation(c)) => {
-                log::debug!("Receive {} continues message {:?}", &self.sid, &c);
-            },
-            Ok(ws::Message::Nop) => {
-                log::debug!("Receive Nop message");
-            },
-            Ok(Text(s)) => {
-                log::debug!("Receive {} text {:?}", &self.sid, &s);
-                self.send(MessageData::Text(s.to_string()));
-            },
-
+            Ok(ws::Message::Continuation(_)) => {},
+            Ok(ws::Message::Nop) => {},
             Err(e) => {
-                let msg = format!("{} error: {:?}", &self.sid, e);
-                log::error!("stream {}", msg);
-                ctx.text(msg);
+                log::error!(
+                    "[{}]: WebSocketStream protocol error {:?}",
+                    self.session_id,
+                    e
+                );
                 ctx.stop();
             },
         }
@@ -142,21 +137,11 @@ impl Handler<ClientMessage> for WSClient {
 
     fn handle(&mut self, msg: ClientMessage, ctx: &mut Self::Context) {
         match msg.data {
-            MessageData::Text(text) => {
-                ctx.text(text);
-            },
             MessageData::Binary(binary) => {
                 ctx.binary(binary);
             },
-            MessageData::Connect(sid) => {
-                let connect_msg = format!("{} connect", &sid);
-                ctx.text(connect_msg);
-            },
-            MessageData::Disconnect(text) => {
-                log::debug!("Session start disconnecting {}", self.sid);
-                ctx.text(text);
-                ctx.stop();
-            },
+            MessageData::Connect(_) => {},
+            MessageData::Disconnect(_) => {},
         }
     }
 }

+ 8 - 1
backend/src/service/ws_service/ws_server.rs

@@ -1,6 +1,7 @@
 use crate::service::ws_service::{
     entities::{Connect, Disconnect, Session, SessionId},
     ClientMessage,
+    MessageData,
 };
 use actix::{Actor, Context, Handler};
 use dashmap::DashMap;
@@ -46,7 +47,13 @@ impl Handler<Disconnect> for WSServer {
 impl Handler<ClientMessage> for WSServer {
     type Result = ();
 
-    fn handle(&mut self, _msg: ClientMessage, _ctx: &mut Context<Self>) -> Self::Result {}
+    fn handle(&mut self, msg: ClientMessage, _ctx: &mut Context<Self>) -> Self::Result {
+        match msg.data {
+            MessageData::Binary(_) => {},
+            MessageData::Connect(_) => {},
+            MessageData::Disconnect(_) => {},
+        }
+    }
 }
 
 impl actix::Supervised for WSServer {

+ 0 - 2
backend/tests/api/main.rs → backend/tests/api/mod.rs

@@ -1,5 +1,3 @@
 mod auth;
 mod doc;
-mod helper;
 mod workspace;
-mod ws;

+ 7 - 7
backend/tests/api/helper.rs → backend/tests/helper.rs

@@ -164,7 +164,7 @@ impl TestServer {
         doc
     }
 
-    pub(crate) async fn register_user(&self) -> SignUpResponse {
+    pub async fn register_user(&self) -> SignUpResponse {
         let params = SignUpParams {
             email: "[email protected]".to_string(),
             name: "annie".to_string(),
@@ -174,15 +174,15 @@ impl TestServer {
         self.register(params).await
     }
 
-    pub(crate) async fn register(&self, params: SignUpParams) -> SignUpResponse {
+    pub async fn register(&self, params: SignUpParams) -> SignUpResponse {
         let url = format!("{}/api/register", self.http_addr());
         let response = user_sign_up_request(params, &url).await.unwrap();
         response
     }
 
-    pub(crate) fn http_addr(&self) -> String { format!("http://{}", self.host) }
+    pub fn http_addr(&self) -> String { format!("http://{}", self.host) }
 
-    pub(crate) fn ws_addr(&self) -> String {
+    pub fn ws_addr(&self) -> String {
         format!(
             "ws://{}/ws/{}",
             self.host,
@@ -265,7 +265,7 @@ async fn drop_test_database(database_name: String) {
         .expect("Failed to drop database.");
 }
 
-pub(crate) async fn create_test_workspace(server: &TestServer) -> Workspace {
+pub async fn create_test_workspace(server: &TestServer) -> Workspace {
     let params = CreateWorkspaceParams {
         name: "My first workspace".to_string(),
         desc: "This is my first workspace".to_string(),
@@ -275,7 +275,7 @@ pub(crate) async fn create_test_workspace(server: &TestServer) -> Workspace {
     workspace
 }
 
-pub(crate) async fn create_test_app(server: &TestServer, workspace_id: &str) -> App {
+pub async fn create_test_app(server: &TestServer, workspace_id: &str) -> App {
     let params = CreateAppParams {
         workspace_id: workspace_id.to_owned(),
         name: "My first app".to_string(),
@@ -287,7 +287,7 @@ pub(crate) async fn create_test_app(server: &TestServer, workspace_id: &str) ->
     app
 }
 
-pub(crate) async fn create_test_view(application: &TestServer, app_id: &str) -> View {
+pub async fn create_test_view(application: &TestServer, app_id: &str) -> View {
     let name = "My first view".to_string();
     let desc = "This is my first view".to_string();
     let thumbnail = "http://1.png".to_string();

+ 3 - 0
backend/tests/main.rs

@@ -0,0 +1,3 @@
+mod api;
+pub mod helper;
+mod ws;

+ 0 - 12
backend/tests/api/ws.rs → backend/tests/ws/helper.rs

@@ -69,15 +69,3 @@ impl WsScriptRunner {
         }
     }
 }
-
-#[actix_rt::test]
-async fn ws_connect() {
-    let mut ws = WsTest::new(vec![
-        WsScript::SendText("abc"),
-        WsScript::SendText("abc"),
-        WsScript::SendText("abc"),
-        WsScript::Disconnect("abc"),
-    ])
-    .await;
-    ws.run_scripts().await
-}

+ 2 - 0
backend/tests/ws/mod.rs

@@ -0,0 +1,2 @@
+mod helper;
+mod ws;

+ 13 - 0
backend/tests/ws/ws.rs

@@ -0,0 +1,13 @@
+use crate::ws::helper::{WsScript, WsTest};
+
+#[actix_rt::test]
+async fn ws_connect() {
+    let mut ws = WsTest::new(vec![
+        WsScript::SendText("abc"),
+        WsScript::SendText("abc"),
+        WsScript::SendText("abc"),
+        WsScript::Disconnect("close by user"),
+    ])
+    .await;
+    ws.run_scripts().await
+}

+ 2 - 2
rust-lib/flowy-user/src/services/user/user_session.rs

@@ -18,10 +18,10 @@ use flowy_database::{
 };
 use flowy_infra::kv::KV;
 use flowy_sqlite::ConnectionPool;
-use flowy_ws::{connect::Retry, WsController, WsMessage, WsMessageHandler};
+use flowy_ws::{connect::Retry, WsController, WsMessageHandler};
 use parking_lot::RwLock;
 use serde::{Deserialize, Serialize};
-use std::{sync::Arc, time::Duration};
+use std::sync::Arc;
 
 pub struct UserSessionConfig {
     root_dir: String,

+ 3 - 5
rust-lib/flowy-ws/src/connect.rs

@@ -1,19 +1,17 @@
 use crate::{errors::WsError, MsgReceiver, MsgSender};
-use flowy_net::errors::ServerError;
 use futures_core::{future::BoxFuture, ready};
-use futures_util::{FutureExt, StreamExt, TryStreamExt};
+use futures_util::{FutureExt, StreamExt};
 use pin_project::pin_project;
 use std::{
     fmt,
     future::Future,
     pin::Pin,
-    sync::Arc,
     task::{Context, Poll},
 };
 use tokio::net::TcpStream;
 use tokio_tungstenite::{
     connect_async,
-    tungstenite::{handshake::client::Response, http::StatusCode, Error, Message},
+    tungstenite::{handshake::client::Response, Error, Message},
     MaybeTlsStream,
     WebSocketStream,
 };
@@ -160,7 +158,7 @@ where
 {
     type Output = ();
 
-    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
         (self.f)(&self.addr);
 
         Poll::Ready(())

+ 9 - 26
rust-lib/flowy-ws/src/ws.rs

@@ -1,28 +1,23 @@
-use crate::{connect::WsConnection, errors::WsError, WsMessage};
+use crate::{
+    connect::{Retry, WsConnection},
+    errors::WsError,
+    WsMessage,
+};
 use flowy_net::errors::ServerError;
 use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
-use futures_core::{ready, Stream};
-
-use crate::connect::Retry;
-use bytes::Buf;
-use futures_core::future::BoxFuture;
+use futures_core::{future::BoxFuture, ready, Stream};
 use pin_project::pin_project;
 use std::{
     collections::HashMap,
     future::Future,
-    marker::PhantomData,
     pin::Pin,
     sync::Arc,
     task::{Context, Poll},
 };
 use tokio::{sync::RwLock, task::JoinHandle};
-use tokio_tungstenite::{
-    tungstenite::{
-        protocol::{frame::coding::CloseCode, CloseFrame},
-        Message,
-    },
-    MaybeTlsStream,
-    WebSocketStream,
+use tokio_tungstenite::tungstenite::{
+    protocol::{frame::coding::CloseCode, CloseFrame},
+    Message,
 };
 
 pub type MsgReceiver = UnboundedReceiver<Message>;
@@ -188,18 +183,6 @@ impl Future for WsHandlers {
     }
 }
 
-// impl WsSender for WsController {
-//     fn send_msg(&self, msg: WsMessage) -> Result<(), WsError> {
-//         match self.ws_tx.as_ref() {
-//             None => Err(WsError::internal().context("Should call make_connect
-// first")),             Some(sender) => {
-//                 let _ = sender.unbounded_send(msg.into()).map_err(|e|
-// WsError::internal().context(e))?;                 Ok(())
-//             },
-//         }
-//     }
-// }
-
 #[derive(Debug, Clone)]
 pub struct WsSender {
     ws_tx: MsgSender,