瀏覽代碼

setup actix-web server

appflowy 3 年之前
父節點
當前提交
7adb75e85d

+ 1 - 1
rust-lib/flowy-observable/src/dart/stream_sender.rs

@@ -39,7 +39,7 @@ impl RustStreamSender {
         }
     }
 
-    pub fn post(observable_subject: ObservableSubject) -> Result<(), String> {
+    pub fn post(_observable_subject: ObservableSubject) -> Result<(), String> {
         #[cfg(feature = "dart")]
         match R2F_STREAM_SENDER.read() {
             Ok(stream) => stream.inner_post(observable_subject),

+ 0 - 1
rust-lib/flowy-ot/src/client/document/data.rs

@@ -1,6 +1,5 @@
 use crate::{client::DocumentData, errors::OTError};
 use serde::{Deserialize, Serialize};
-use serde_json::Error;
 
 impl<T: AsRef<str>> DocumentData for T {
     fn into_string(self) -> Result<String, OTError> { Ok(self.as_ref().to_string()) }

+ 30 - 0
server/Cargo.toml

@@ -0,0 +1,30 @@
+[package]
+name = "server"
+version = "0.1.0"
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+actix = "0.10"
+actix-web = "3"
+actix-http = "2.2.1"
+actix-web-actors = "3"
+actix-codec = "0.3"
+
+
+futures = "0.3.15"
+bytes = "0.5"
+toml = "0.5.8"
+dashmap = "4.0"
+log = "0.4.14"
+serde_json = "1.0"
+serde = { version = "1.0", features = ["derive"] }
+serde_repr = "0.1"
+
+[lib]
+path = "src/lib.rs"
+
+[[bin]]
+name = "flowy_server"
+path = "src/main.rs"

+ 18 - 0
server/rustfmt.toml

@@ -0,0 +1,18 @@
+# https://rust-lang.github.io/rustfmt/?version=master&search=
+max_width = 100
+tab_spaces = 4
+fn_single_line = true
+match_block_trailing_comma = true
+normalize_comments = true
+wrap_comments = true
+use_field_init_shorthand = true
+use_try_shorthand = true
+normalize_doc_attributes = true
+report_todo = "Always"
+report_fixme = "Always"
+imports_layout = "HorizontalVertical"
+merge_imports = true
+reorder_modules = true
+reorder_imports = true
+enum_discrim_align_threshold = 20
+edition = "2018"

+ 41 - 0
server/src/config/config.rs

@@ -0,0 +1,41 @@
+use std::convert::TryFrom;
+
+pub struct Config {
+    pub http_port: u16,
+}
+
+impl Config {
+    pub fn new() -> Self { Config { http_port: 3030 } }
+
+    pub fn server_addr(&self) -> String { format!("0.0.0.0:{}", self.http_port) }
+}
+
+pub enum Environment {
+    Local,
+    Production,
+}
+
+impl Environment {
+    #[allow(dead_code)]
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Environment::Local => "local",
+            Environment::Production => "production",
+        }
+    }
+}
+
+impl TryFrom<String> for Environment {
+    type Error = String;
+
+    fn try_from(s: String) -> Result<Self, Self::Error> {
+        match s.to_lowercase().as_str() {
+            "local" => Ok(Self::Local),
+            "production" => Ok(Self::Production),
+            other => Err(format!(
+                "{} is not a supported environment. Use either `local` or `production`.",
+                other
+            )),
+        }
+    }
+}

+ 4 - 0
server/src/config/const_define.rs

@@ -0,0 +1,4 @@
+use std::time::Duration;
+
+pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(8);
+pub const PING_TIMEOUT: Duration = Duration::from_secs(60);

+ 5 - 0
server/src/config/mod.rs

@@ -0,0 +1,5 @@
+mod config;
+mod const_define;
+
+pub use config::*;
+pub use const_define::*;

+ 19 - 0
server/src/context.rs

@@ -0,0 +1,19 @@
+use crate::{config::Config, ws::WSServer};
+use actix::Addr;
+use std::sync::Arc;
+
+pub struct AppContext {
+    pub config: Arc<Config>,
+    pub server: Addr<WSServer>,
+}
+
+impl AppContext {
+    pub fn new(server: Addr<WSServer>) -> Self {
+        AppContext {
+            config: Arc::new(Config::new()),
+            server,
+        }
+    }
+
+    pub fn ws_server(&self) -> Addr<WSServer> { self.server.clone() }
+}

+ 3 - 0
server/src/errors.rs

@@ -0,0 +1,3 @@
+pub struct ServerError {}
+
+// pub enum ErrorCode {}

+ 6 - 0
server/src/lib.rs

@@ -0,0 +1,6 @@
+mod config;
+mod context;
+mod errors;
+mod routers;
+pub mod startup;
+mod ws;

+ 10 - 0
server/src/main.rs

@@ -0,0 +1,10 @@
+use server::startup::{init_app_context, run};
+use std::net::TcpListener;
+
+#[actix_web::main]
+async fn main() -> std::io::Result<()> {
+    let app_ctx = init_app_context().await;
+    let listener =
+        TcpListener::bind(app_ctx.config.server_addr()).expect("Failed to bind server address");
+    run(app_ctx, listener)?.await
+}

+ 3 - 0
server/src/routers/mod.rs

@@ -0,0 +1,3 @@
+pub(crate) mod ws;
+
+pub use ws::*;

+ 22 - 0
server/src/routers/ws.rs

@@ -0,0 +1,22 @@
+use crate::ws::{entities::SessionId, WSServer, WSSession};
+use actix::Addr;
+use actix_web::{
+    get,
+    web::{Data, Path, Payload},
+    Error,
+    HttpRequest,
+    HttpResponse,
+};
+use actix_web_actors::ws;
+
+#[get("/{token}")]
+pub async fn start_connection(
+    request: HttpRequest,
+    payload: Payload,
+    Path(token): Path<String>,
+    server: Data<Addr<WSServer>>,
+) -> Result<HttpResponse, Error> {
+    let ws = WSSession::new(SessionId::new(token), server.get_ref().clone());
+    let response = ws::start(ws, &request, payload)?;
+    Ok(response.into())
+}

+ 25 - 0
server/src/startup.rs

@@ -0,0 +1,25 @@
+use crate::{context::AppContext, routers::*, ws::WSServer};
+use actix::Actor;
+use actix_web::{dev::Server, middleware, web, App, HttpServer, Scope};
+use std::{net::TcpListener, sync::Arc};
+
+pub fn run(app_ctx: Arc<AppContext>, listener: TcpListener) -> Result<Server, std::io::Error> {
+    let server = HttpServer::new(move || {
+        App::new()
+            .wrap(middleware::Logger::default())
+            .data(web::JsonConfig::default().limit(4096))
+            .service(ws_scope())
+            .data(app_ctx.ws_server())
+    })
+    .listen(listener)?
+    .run();
+    Ok(server)
+}
+
+fn ws_scope() -> Scope { web::scope("/ws").service(ws::start_connection) }
+
+pub async fn init_app_context() -> Arc<AppContext> {
+    let ws_server = WSServer::new().start();
+    let ctx = AppContext::new(ws_server);
+    Arc::new(ctx)
+}

+ 33 - 0
server/src/ws/entities/connect.rs

@@ -0,0 +1,33 @@
+use crate::{errors::ServerError, ws::Packet};
+use actix::{Message, Recipient};
+use serde::{Deserialize, Serialize};
+use std::fmt::Formatter;
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct SessionId {
+    pub id: String,
+}
+
+impl SessionId {
+    pub fn new(id: String) -> Self { SessionId { id } }
+}
+
+impl std::fmt::Display for SessionId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let desc = format!("{}", &self.id);
+        f.write_str(&desc)
+    }
+}
+
+#[derive(Debug, Message, Clone)]
+#[rtype(result = "Result<(), ServerError>")]
+pub struct Connect {
+    pub socket: Recipient<Packet>,
+    pub sid: SessionId,
+}
+
+#[derive(Debug, Message, Clone)]
+#[rtype(result = "Result<(), ServerError>")]
+pub struct Disconnect {
+    pub sid: SessionId,
+}

+ 5 - 0
server/src/ws/entities/mod.rs

@@ -0,0 +1,5 @@
+pub use connect::*;
+pub use packet::*;
+
+mod connect;
+pub mod packet;

+ 37 - 0
server/src/ws/entities/packet.rs

@@ -0,0 +1,37 @@
+use crate::ws::entities::SessionId;
+use actix::Message;
+use bytes::Bytes;
+use std::fmt::Formatter;
+
+#[derive(Debug, Clone)]
+pub enum Frame {
+    Text(String),
+    Binary(Bytes),
+    Connect(SessionId),
+    Disconnect(String),
+}
+
+#[derive(Debug, Message, Clone)]
+#[rtype(result = "()")]
+pub struct Packet {
+    pub sid: SessionId,
+    pub frame: Frame,
+}
+
+impl Packet {
+    pub fn new(sid: SessionId, frame: Frame) -> Self { Packet { sid, frame } }
+}
+
+impl std::fmt::Display for Packet {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let content = match &self.frame {
+            Frame::Text(t) => format!("[Text]: {}", t),
+            Frame::Binary(_) => "[Binary message]".to_owned(),
+            Frame::Connect(_) => "Connect".to_owned(),
+            Frame::Disconnect(_) => "Disconnect".to_owned(),
+        };
+
+        let desc = format!("{}:{}", &self.sid, content);
+        f.write_str(&desc)
+    }
+}

+ 7 - 0
server/src/ws/mod.rs

@@ -0,0 +1,7 @@
+pub use entities::packet::*;
+pub use ws_server::*;
+pub use ws_session::*;
+
+pub(crate) mod entities;
+mod ws_server;
+mod ws_session;

+ 57 - 0
server/src/ws/ws_server.rs

@@ -0,0 +1,57 @@
+use crate::{
+    errors::ServerError,
+    ws::{
+        entities::{Connect, Disconnect, SessionId},
+        Packet,
+        WSSession,
+    },
+};
+use actix::{Actor, Context, Handler};
+use dashmap::DashMap;
+
+pub struct WSServer {
+    session_map: DashMap<SessionId, WSSession>,
+}
+
+impl WSServer {
+    pub fn new() -> Self {
+        Self {
+            session_map: DashMap::new(),
+        }
+    }
+
+    pub fn send(&self, _packet: Packet) { unimplemented!() }
+}
+
+impl Actor for WSServer {
+    type Context = Context<Self>;
+    fn started(&mut self, _ctx: &mut Self::Context) {}
+}
+
+impl Handler<Connect> for WSServer {
+    type Result = Result<(), ServerError>;
+    fn handle(&mut self, _msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
+        unimplemented!()
+    }
+}
+
+impl Handler<Disconnect> for WSServer {
+    type Result = Result<(), ServerError>;
+    fn handle(&mut self, _msg: Disconnect, _: &mut Context<Self>) -> Self::Result {
+        unimplemented!()
+    }
+}
+
+impl Handler<Packet> for WSServer {
+    type Result = ();
+
+    fn handle(&mut self, _packet: Packet, _ctx: &mut Context<Self>) -> Self::Result {
+        unimplemented!()
+    }
+}
+
+impl actix::Supervised for WSServer {
+    fn restarting(&mut self, _ctx: &mut Context<WSServer>) {
+        log::warn!("restarting");
+    }
+}

+ 163 - 0
server/src/ws/ws_session.rs

@@ -0,0 +1,163 @@
+use crate::{
+    config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
+    ws::{
+        entities::{Connect, Disconnect, SessionId},
+        Frame,
+        Packet,
+        WSServer,
+    },
+};
+use actix::{
+    fut,
+    Actor,
+    ActorContext,
+    ActorFuture,
+    Addr,
+    AsyncContext,
+    ContextFutureSpawner,
+    Handler,
+    Running,
+    StreamHandler,
+    WrapFuture,
+};
+
+use actix_web_actors::{ws, ws::Message::Text};
+use std::time::Instant;
+
+pub struct WSSession {
+    sid: SessionId,
+    server: Addr<WSServer>,
+    hb: Instant,
+}
+
+impl WSSession {
+    pub fn new(sid: SessionId, server: Addr<WSServer>) -> Self {
+        Self {
+            sid,
+            hb: Instant::now(),
+            server,
+        }
+    }
+
+    fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
+        ctx.run_interval(HEARTBEAT_INTERVAL, |ws_session, ctx| {
+            if Instant::now().duration_since(ws_session.hb) > PING_TIMEOUT {
+                ws_session.server.do_send(Disconnect {
+                    sid: ws_session.sid.clone(),
+                });
+                ctx.stop();
+                return;
+            }
+            ctx.ping(b"");
+        });
+    }
+
+    fn connect(&self, ctx: &mut ws::WebsocketContext<Self>) {
+        self.hb(ctx);
+        let socket = ctx.address().recipient();
+        let connect = Connect {
+            socket,
+            sid: self.sid.clone(),
+        };
+        self.server
+            .send(connect)
+            .into_actor(self)
+            .then(|res, _ws_session, _ctx| {
+                match res {
+                    Ok(Ok(_)) => {},
+                    Ok(Err(_e)) => {
+                        unimplemented!()
+                    },
+                    Err(_e) => unimplemented!(),
+                }
+                fut::ready(())
+            })
+            .wait(ctx);
+    }
+
+    fn send(&self, frame: Frame) {
+        let msg = Packet::new(self.sid.clone(), frame);
+        self.server.do_send(msg);
+    }
+}
+
+impl Actor for WSSession {
+    type Context = ws::WebsocketContext<Self>;
+
+    fn started(&mut self, ctx: &mut Self::Context) { self.connect(ctx); }
+
+    fn stopping(&mut self, _: &mut Self::Context) -> Running {
+        self.server.do_send(Disconnect {
+            sid: self.sid.clone(),
+        });
+
+        Running::Stop
+    }
+}
+
+impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
+    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
+        match msg {
+            Ok(ws::Message::Ping(msg)) => {
+                log::debug!("Receive {} ping {:?}", &self.sid, &msg);
+                self.hb = Instant::now();
+                ctx.pong(&msg);
+            },
+            Ok(ws::Message::Pong(msg)) => {
+                log::debug!("Receive {} pong {:?}", &self.sid, &msg);
+                self.send(Frame::Connect(self.sid.clone()));
+                self.hb = Instant::now();
+            },
+            Ok(ws::Message::Binary(bin)) => {
+                log::debug!(" Receive {} binary", &self.sid);
+                self.send(Frame::Binary(bin));
+            },
+            Ok(ws::Message::Close(reason)) => {
+                log::debug!("Receive {} close {:?}", &self.sid, &reason);
+                ctx.close(reason);
+                ctx.stop();
+            },
+            Ok(ws::Message::Continuation(c)) => {
+                log::debug!("Receive {} continues message {:?}", &self.sid, &c);
+            },
+            Ok(ws::Message::Nop) => {
+                log::debug!("Receive Nop message");
+            },
+            Ok(Text(s)) => {
+                log::debug!("Receive {} text {:?}", &self.sid, &s);
+                self.send(Frame::Text(s));
+            },
+
+            Err(e) => {
+                let msg = format!("{} error: {:?}", &self.sid, e);
+                ctx.text(&msg);
+                log::error!("stream {}", msg);
+                ctx.stop();
+            },
+        }
+    }
+}
+
+impl Handler<Packet> for WSSession {
+    type Result = ();
+
+    fn handle(&mut self, msg: Packet, ctx: &mut Self::Context) {
+        match msg.frame {
+            Frame::Text(text) => {
+                ctx.text(text);
+            },
+            Frame::Binary(binary) => {
+                ctx.binary(binary);
+            },
+            Frame::Connect(sid) => {
+                let connect_msg = format!("{} connect", &sid);
+                ctx.text(connect_msg);
+            },
+            Frame::Disconnect(text) => {
+                log::debug!("Session start disconnecting {}", self.sid);
+                ctx.text(text);
+                ctx.stop();
+            },
+        }
+    }
+}