Browse Source

add system

appflowy 3 years ago
parent
commit
85e8e4b8af

+ 8 - 1
rust-lib/flowy-sys/Cargo.toml

@@ -14,6 +14,13 @@ futures = "0.3.15"
 futures-util = "0.3.15"
 bytes = "0.5"
 tokio = { version = "1", features = ["sync"] }
+uuid = { version = "0.8", features = ["serde", "v4"] }
+log = "0.4.14"
+env_logger = "0.8"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+serde_with = "1.9.4"
 
 [dev-dependencies]
-tokio = { version = "1", features = ["full"] }
+tokio = { version = "1", features = ["full"] }
+futures-util = "0.3.15"

+ 1 - 0
rust-lib/flowy-sys/src/lib.rs

@@ -3,5 +3,6 @@ mod error;
 mod module;
 mod request;
 mod response;
+mod rt;
 mod service;
 mod util;

+ 103 - 34
rust-lib/flowy-sys/src/module/module.rs

@@ -7,43 +7,52 @@ use crate::{
     service::{BoxService, Handler, Service, ServiceFactory, ServiceRequest, ServiceResponse},
 };
 
+use crate::{
+    request::{payload::Payload, FlowyRequest},
+    response::{FlowyResponse, FlowyResponseBuilder},
+    service::{factory, BoxServiceFactory, HandlerService},
+};
 use futures_core::{future::LocalBoxFuture, ready};
+use pin_project::pin_project;
 use std::{
+    cell::RefCell,
     collections::HashMap,
+    fmt::Debug,
     future::Future,
     hash::Hash,
     marker::PhantomData,
     pin::Pin,
     rc::Rc,
+    sync::Arc,
     task::{Context, Poll},
 };
-use tokio::sync::{mpsc, mpsc::UnboundedReceiver};
-
-use crate::{
-    request::{payload::Payload, FlowyRequest},
-    service::{factory, BoxServiceFactory, HandlerService},
+use tokio::sync::{
+    mpsc,
+    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
 };
-use pin_project::pin_project;
-use std::fmt::Debug;
 
 pub type Command = String;
-pub type ModuleServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, SystemError>;
+pub type CommandServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, SystemError>;
 
-#[pin_project::pin_project]
 pub struct Module {
     name: String,
     data: DataContainer,
-    fact_map: HashMap<Command, ModuleServiceFactory>,
-    cmd_rx: UnboundedReceiver<FlowyRequest>,
+    factory_map: HashMap<Command, CommandServiceFactory>,
+    req_tx: UnboundedSender<FlowyRequest>,
+    req_rx: UnboundedReceiver<FlowyRequest>,
+    resp_tx: UnboundedSender<FlowyResponse>,
 }
 
 impl Module {
-    pub fn new(cmd_rx: UnboundedReceiver<FlowyRequest>) -> Self {
+    pub fn new(resp_tx: UnboundedSender<FlowyResponse>) -> Self {
+        let (req_tx, req_rx) = unbounded_channel::<FlowyRequest>();
         Self {
             name: "".to_owned(),
             data: DataContainer::new(),
-            fact_map: HashMap::new(),
-            cmd_rx,
+            factory_map: HashMap::new(),
+            req_tx,
+            req_rx,
+            resp_tx,
         }
     }
 
@@ -65,51 +74,111 @@ impl Module {
         R: Future + 'static,
         R::Output: Responder + 'static,
     {
-        self.fact_map.insert(command, factory(HandlerService::new(handler)));
+        self.factory_map.insert(command, factory(HandlerService::new(handler)));
         self
     }
+
+    pub fn can_handle(&self, cmd: &Command) -> bool { self.factory_map.contains_key(cmd) }
+
+    pub fn req_tx(&self) -> UnboundedSender<FlowyRequest> { self.req_tx.clone() }
+
+    pub fn handle(&self, request: FlowyRequest) {
+        match self.req_tx.send(request) {
+            Ok(_) => {},
+            Err(e) => {
+                log::error!("{:?}", e);
+            },
+        }
+    }
 }
 
 impl Future for Module {
     type Output = ();
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         loop {
-            match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
+            match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) {
                 None => return Poll::Ready(()),
-                Some(request) => match self.fact_map.get(request.get_id()) {
+                Some(request) => match self.factory_map.get(request.get_cmd()) {
                     Some(factory) => {
-                        let service_future = factory.new_service(());
-                        tokio::task::spawn_local(ModuleServiceFuture {
+                        let fut = ModuleServiceFuture {
                             request,
-                            service_future,
+                            fut: factory.new_service(()),
+                        };
+                        let resp_tx = self.resp_tx.clone();
+                        tokio::task::spawn_local(async move {
+                            let resp = fut.await.unwrap_or_else(|e| panic!());
+                            if let Err(e) = resp_tx.send(resp) {
+                                log::error!("{:?}", e);
+                            }
                         });
                     },
-                    None => {},
+                    None => {
+                        log::error!("Command: {} handler not found", request.get_cmd());
+                    },
                 },
             }
         }
     }
 }
 
-#[pin_project(project = HandlerServiceProj)]
-pub struct ModuleServiceFuture<Service, Error> {
+type BoxModuleService = BoxService<ServiceRequest, ServiceResponse, SystemError>;
+#[pin_project]
+pub struct ModuleServiceFuture {
     request: FlowyRequest,
     #[pin]
-    service_future: LocalBoxFuture<'static, Result<Service, Error>>,
+    fut: LocalBoxFuture<'static, Result<BoxModuleService, SystemError>>,
 }
 
-impl<Service, Error> Future for ModuleServiceFuture<Service, Error> {
-    type Output = ();
+impl Future for ModuleServiceFuture {
+    type Output = Result<FlowyResponse, SystemError>;
 
-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { unimplemented!() }
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        loop {
+            let service = ready!(self.as_mut().project().fut.poll(cx))?;
+            let req = ServiceRequest::new(self.as_mut().request.clone(), Payload::None);
+            let (_, resp) = ready!(Pin::new(&mut service.call(req)).poll(cx))?.into_parts();
+            return Poll::Ready(Ok(resp));
+        }
+    }
 }
 
-impl ServiceFactory<ServiceRequest> for Module {
-    type Response = ServiceResponse;
-    type Error = SystemError;
-    type Service = BoxService<ServiceRequest, ServiceResponse, SystemError>;
-    type Config = ();
-    type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::rt::Runtime;
+    use futures_util::{future, pin_mut};
+    use tokio::sync::mpsc::unbounded_channel;
 
-    fn new_service(&self, cfg: Self::Config) -> Self::Future { unimplemented!() }
+    pub async fn hello_service() -> String {
+        println!("no params");
+        "hello".to_string()
+    }
+
+    // #[tokio::test]
+
+    #[test]
+    fn test() {
+        let mut runtime = Runtime::new().unwrap();
+        runtime.block_on(async {
+            let (resp_tx, mut resp_rx) = unbounded_channel::<FlowyResponse>();
+            let command = "hello".to_string();
+            let mut module = Module::new(resp_tx).event(command.clone(), hello_service);
+            assert_eq!(module.can_handle(&command), true);
+            let req_tx = module.req_tx();
+            let mut event = async move {
+                let request = FlowyRequest::new(command.clone());
+                req_tx.send(request).unwrap();
+
+                match resp_rx.recv().await {
+                    Some(resp) => {
+                        println!("{}", resp);
+                    },
+                    None => panic!(""),
+                }
+            };
+
+            pin_mut!(module, event);
+            future::select(module, event).await;
+        });
+    }
 }

+ 10 - 3
rust-lib/flowy-sys/src/request/request.rs

@@ -7,16 +7,23 @@ use crate::{
 };
 use std::hash::Hash;
 
+#[derive(Clone, Debug)]
 pub struct FlowyRequest {
     id: String,
+    cmd: String,
 }
 
 impl FlowyRequest {
-    pub fn get_id(&self) -> &str { &self.id }
+    pub fn new(cmd: String) -> FlowyRequest {
+        Self {
+            id: uuid::Uuid::new_v4().to_string(),
+            cmd,
+        }
+    }
 }
 
-impl std::default::Default for FlowyRequest {
-    fn default() -> Self { Self { id: "".to_string() } }
+impl FlowyRequest {
+    pub fn get_cmd(&self) -> &str { &self.cmd }
 }
 
 pub trait FromRequest: Sized {

+ 13 - 0
rust-lib/flowy-sys/src/response/data.rs

@@ -1,8 +1,21 @@
+use serde::{Deserialize, Serialize};
+use std::{fmt, fmt::Formatter};
+
+#[derive(Debug, Serialize, Deserialize)]
 pub enum ResponseData {
     Bytes(Vec<u8>),
     None,
 }
 
+impl std::fmt::Display for ResponseData {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        match self {
+            ResponseData::Bytes(bytes) => f.write_fmt(format_args!("{} bytes", bytes.len())),
+            ResponseData::None => f.write_str("Empty"),
+        }
+    }
+}
+
 impl std::convert::Into<ResponseData> for String {
     fn into(self) -> ResponseData { ResponseData::Bytes(self.into_bytes()) }
 }

+ 17 - 1
rust-lib/flowy-sys/src/response/response.rs

@@ -3,16 +3,22 @@ use crate::{
     request::FlowyRequest,
     response::{data::ResponseData, Responder},
 };
+use serde::{Deserialize, Serialize};
+use serde_with::skip_serializing_none;
+use std::{fmt, fmt::Formatter};
 
-#[derive(Clone, Copy)]
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
 pub enum StatusCode {
     Success,
     Error,
 }
 
+#[skip_serializing_none]
+#[derive(Serialize, Deserialize, Debug)]
 pub struct FlowyResponse<T = ResponseData> {
     pub data: T,
     pub status: StatusCode,
+    #[serde(skip)]
     pub error: Option<SystemError>,
 }
 
@@ -26,6 +32,16 @@ impl FlowyResponse {
     }
 }
 
+impl std::fmt::Display for FlowyResponse {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        match serde_json::to_string(self) {
+            Ok(json) => f.write_fmt(format_args!("{:?}", json))?,
+            Err(e) => f.write_fmt(format_args!("{:?}", e))?,
+        }
+        Ok(())
+    }
+}
+
 impl Responder for FlowyResponse {
     #[inline]
     fn respond_to(self, _: &FlowyRequest) -> FlowyResponse { self }

+ 4 - 0
rust-lib/flowy-sys/src/rt/mod.rs

@@ -0,0 +1,4 @@
+mod runtime;
+mod system;
+
+pub use runtime::*;

+ 34 - 0
rust-lib/flowy-sys/src/rt/runtime.rs

@@ -0,0 +1,34 @@
+use std::{future::Future, io};
+use tokio::{runtime, task::LocalSet};
+
+#[derive(Debug)]
+pub struct Runtime {
+    local: LocalSet,
+    rt: runtime::Runtime,
+}
+
+impl Runtime {
+    pub fn new() -> io::Result<Runtime> {
+        let rt = runtime::Builder::new_multi_thread().enable_io().enable_time().build()?;
+
+        Ok(Runtime {
+            rt,
+            local: LocalSet::new(),
+        })
+    }
+
+    pub fn spawn<F>(&self, future: F) -> &Self
+    where
+        F: Future<Output = ()> + 'static,
+    {
+        self.local.spawn_local(future);
+        self
+    }
+
+    pub fn block_on<F>(&self, f: F) -> F::Output
+    where
+        F: Future + 'static,
+    {
+        self.local.block_on(&self.rt, f)
+    }
+}

+ 157 - 0
rust-lib/flowy-sys/src/rt/system.rs

@@ -0,0 +1,157 @@
+use crate::{
+    module::{Command, Module},
+    request::FlowyRequest,
+    response::FlowyResponse,
+    rt::Runtime,
+};
+use futures_core::{future::LocalBoxFuture, ready, task::Context};
+use futures_util::{future, pin_mut};
+use std::{cell::RefCell, future::Future, io, sync::Arc};
+use tokio::{
+    macros::support::{Pin, Poll},
+    sync::{
+        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
+        oneshot,
+    },
+};
+
+thread_local!(
+    static CURRENT: RefCell<Option<Arc<FlowySystem>>> = RefCell::new(None);
+);
+
+pub struct FlowySystem {
+    resp_tx: UnboundedSender<FlowyResponse>,
+    modules: Vec<Module>,
+}
+
+impl FlowySystem {
+    pub fn construct<F>(module_factory: F) -> SystemRunner
+    where
+        F: FnOnce(UnboundedSender<FlowyResponse>) -> Vec<Module>,
+    {
+        let runtime = Runtime::new().unwrap();
+        let (resp_tx, mut resp_rx) = unbounded_channel::<FlowyResponse>();
+        let (stop_tx, stop_rx) = oneshot::channel();
+        let controller = SystemController { resp_rx, stop_tx };
+        runtime.spawn(controller);
+
+        let mut system = Self {
+            resp_tx: resp_tx.clone(),
+            modules: vec![],
+        };
+
+        let factory = module_factory(resp_tx.clone());
+        factory.into_iter().for_each(|m| {
+            runtime.spawn(m);
+            // system.add_module(m);
+        });
+
+        FlowySystem::set_current(system);
+
+        let runner = SystemRunner { rt: runtime, stop_rx };
+        runner
+    }
+
+    pub fn handle_command(&self, cmd: Command, request: FlowyRequest) {
+        self.modules.iter().for_each(|m| {
+            if m.can_handle(&cmd) {
+                m.handle(request.clone());
+            }
+        })
+    }
+
+    pub fn add_module(&mut self, module: Module) { self.modules.push(module); }
+
+    #[doc(hidden)]
+    pub fn set_current(sys: FlowySystem) {
+        CURRENT.with(|cell| {
+            *cell.borrow_mut() = Some(Arc::new(sys));
+        })
+    }
+
+    pub fn current() -> Arc<FlowySystem> {
+        CURRENT.with(|cell| match *cell.borrow() {
+            Some(ref sys) => sys.clone(),
+            None => panic!("System is not running"),
+        })
+    }
+
+    pub(crate) fn resp_tx(&self) -> UnboundedSender<FlowyResponse> { self.resp_tx.clone() }
+}
+
+struct SystemController {
+    resp_rx: UnboundedReceiver<FlowyResponse>,
+    stop_tx: oneshot::Sender<i32>,
+}
+
+impl Future for SystemController {
+    type Output = ();
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        loop {
+            match ready!(Pin::new(&mut self.resp_rx).poll_recv(cx)) {
+                None => return Poll::Ready(()),
+                Some(resp) => {
+                    // FFI
+                    println!("Receive response: {:?}", resp);
+                },
+            }
+        }
+    }
+}
+
+pub struct SystemRunner {
+    rt: Runtime,
+    stop_rx: oneshot::Receiver<i32>,
+}
+
+impl SystemRunner {
+    pub fn run(self) -> io::Result<()> {
+        let SystemRunner { rt, stop_rx } = self;
+        match rt.block_on(stop_rx) {
+            Ok(code) => {
+                if code != 0 {
+                    Err(io::Error::new(
+                        io::ErrorKind::Other,
+                        format!("Non-zero exit code: {}", code),
+                    ))
+                } else {
+                    Ok(())
+                }
+            },
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+        }
+    }
+
+    pub fn spawn<F>(self, future: F) -> Self
+    where
+        F: Future<Output = ()> + 'static,
+    {
+        self.rt.spawn(future);
+        self
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    pub async fn hello_service() -> String { "hello".to_string() }
+
+    #[test]
+    fn test() {
+        let command = "Hello".to_string();
+
+        FlowySystem::construct(|tx| {
+            vec![
+                Module::new(tx.clone()).event(command.clone(), hello_service),
+                // Module::new(tx.clone()).event(command.clone(), hello_service),
+            ]
+        })
+        .spawn(async {
+            let request = FlowyRequest::new(command.clone());
+            FlowySystem::current().handle_command(command, request);
+        })
+        .run()
+        .unwrap();
+    }
+}

+ 2 - 2
rust-lib/flowy-sys/src/service/handler.rs

@@ -125,8 +125,8 @@ where
             match self.as_mut().project() {
                 HandlerServiceProj::Extract(fut, req, handle) => {
                     match ready!(fut.poll(cx)) {
-                        Ok(item) => {
-                            let fut = handle.call(item);
+                        Ok(params) => {
+                            let fut = handle.call(params);
                             let state = HandlerServiceFuture::Handle(fut, req.take());
                             self.as_mut().set(state);
                         },

+ 2 - 0
rust-lib/flowy-sys/src/service/service.rs

@@ -42,4 +42,6 @@ pub struct ServiceResponse<T = ResponseData> {
 
 impl<T> ServiceResponse<T> {
     pub fn new(request: FlowyRequest, response: FlowyResponse<T>) -> Self { ServiceResponse { request, response } }
+
+    pub fn into_parts(self) -> (FlowyRequest, FlowyResponse<T>) { (self.request, self.response) }
 }