|
@@ -8,44 +8,35 @@ use crate::{
|
|
};
|
|
};
|
|
|
|
|
|
use crate::{
|
|
use crate::{
|
|
- request::{payload::Payload, FlowyRequest},
|
|
|
|
- response::{FlowyResponse, FlowyResponseBuilder},
|
|
|
|
|
|
+ request::{payload::Payload, EventRequest},
|
|
|
|
+ response::EventResponse,
|
|
service::{factory, BoxServiceFactory, HandlerService},
|
|
service::{factory, BoxServiceFactory, HandlerService},
|
|
};
|
|
};
|
|
use futures_core::{future::LocalBoxFuture, ready};
|
|
use futures_core::{future::LocalBoxFuture, ready};
|
|
use pin_project::pin_project;
|
|
use pin_project::pin_project;
|
|
use std::{
|
|
use std::{
|
|
- cell::RefCell,
|
|
|
|
collections::HashMap,
|
|
collections::HashMap,
|
|
- fmt::Debug,
|
|
|
|
future::Future,
|
|
future::Future,
|
|
- hash::Hash,
|
|
|
|
- marker::PhantomData,
|
|
|
|
pin::Pin,
|
|
pin::Pin,
|
|
- rc::Rc,
|
|
|
|
- sync::Arc,
|
|
|
|
task::{Context, Poll},
|
|
task::{Context, Poll},
|
|
};
|
|
};
|
|
-use tokio::sync::{
|
|
|
|
- mpsc,
|
|
|
|
- mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
|
|
|
-};
|
|
|
|
|
|
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
|
|
|
|
|
-pub type Command = String;
|
|
|
|
-pub type CommandServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, SystemError>;
|
|
|
|
|
|
+pub type Event = String;
|
|
|
|
+pub type EventServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, SystemError>;
|
|
|
|
|
|
pub struct Module {
|
|
pub struct Module {
|
|
name: String,
|
|
name: String,
|
|
data: DataContainer,
|
|
data: DataContainer,
|
|
- service_map: HashMap<Command, CommandServiceFactory>,
|
|
|
|
- req_tx: UnboundedSender<FlowyRequest>,
|
|
|
|
- req_rx: UnboundedReceiver<FlowyRequest>,
|
|
|
|
- resp_tx: UnboundedSender<FlowyResponse>,
|
|
|
|
|
|
+ service_map: HashMap<Event, EventServiceFactory>,
|
|
|
|
+ req_tx: UnboundedSender<EventRequest>,
|
|
|
|
+ req_rx: UnboundedReceiver<EventRequest>,
|
|
|
|
+ resp_tx: UnboundedSender<EventResponse>,
|
|
}
|
|
}
|
|
|
|
|
|
impl Module {
|
|
impl Module {
|
|
- pub fn new(resp_tx: UnboundedSender<FlowyResponse>) -> Self {
|
|
|
|
- let (req_tx, req_rx) = unbounded_channel::<FlowyRequest>();
|
|
|
|
|
|
+ pub fn new(resp_tx: UnboundedSender<EventResponse>) -> Self {
|
|
|
|
+ let (req_tx, req_rx) = unbounded_channel::<EventRequest>();
|
|
Self {
|
|
Self {
|
|
name: "".to_owned(),
|
|
name: "".to_owned(),
|
|
data: DataContainer::new(),
|
|
data: DataContainer::new(),
|
|
@@ -62,36 +53,38 @@ impl Module {
|
|
}
|
|
}
|
|
|
|
|
|
pub fn data<D: 'static>(mut self, data: D) -> Self {
|
|
pub fn data<D: 'static>(mut self, data: D) -> Self {
|
|
- let module_data = ModuleData::new(data);
|
|
|
|
- self.data.insert(module_data);
|
|
|
|
|
|
+ self.data.insert(ModuleData::new(data));
|
|
self
|
|
self
|
|
}
|
|
}
|
|
|
|
|
|
- pub fn event<H, T, R>(mut self, command: Command, handler: H) -> Self
|
|
|
|
|
|
+ pub fn event<H, T, R>(mut self, event: Event, handler: H) -> Self
|
|
where
|
|
where
|
|
H: Handler<T, R>,
|
|
H: Handler<T, R>,
|
|
T: FromRequest + 'static,
|
|
T: FromRequest + 'static,
|
|
R: Future + 'static,
|
|
R: Future + 'static,
|
|
R::Output: Responder + 'static,
|
|
R::Output: Responder + 'static,
|
|
{
|
|
{
|
|
- self.service_map.insert(command, factory(HandlerService::new(handler)));
|
|
|
|
|
|
+ if self.service_map.contains_key(&event) {
|
|
|
|
+ log::error!("Duplicate Event: {}", &event);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ self.service_map.insert(event, factory(HandlerService::new(handler)));
|
|
self
|
|
self
|
|
}
|
|
}
|
|
|
|
|
|
- pub fn can_handle(&self, cmd: &Command) -> bool { self.service_map.contains_key(cmd) }
|
|
|
|
|
|
+ pub fn req_tx(&self) -> UnboundedSender<EventRequest> { self.req_tx.clone() }
|
|
|
|
|
|
- pub fn req_tx(&self) -> UnboundedSender<FlowyRequest> { self.req_tx.clone() }
|
|
|
|
-
|
|
|
|
- pub fn handle(&self, request: FlowyRequest) {
|
|
|
|
|
|
+ pub fn handle(&self, request: EventRequest) {
|
|
|
|
+ log::trace!("Module: {} receive request: {:?}", self.name, request);
|
|
match self.req_tx.send(request) {
|
|
match self.req_tx.send(request) {
|
|
Ok(_) => {},
|
|
Ok(_) => {},
|
|
Err(e) => {
|
|
Err(e) => {
|
|
- log::error!("{:?}", e);
|
|
|
|
|
|
+ log::error!("Module: {} with error: {:?}", self.name, e);
|
|
},
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- pub fn service_sender_map(&self) -> HashMap<Command, UnboundedSender<FlowyRequest>> {
|
|
|
|
|
|
+ pub fn forward_map(&self) -> HashMap<Event, UnboundedSender<EventRequest>> {
|
|
self.service_map
|
|
self.service_map
|
|
.keys()
|
|
.keys()
|
|
.map(|key| (key.clone(), self.req_tx()))
|
|
.map(|key| (key.clone(), self.req_tx()))
|
|
@@ -105,7 +98,7 @@ impl Future for Module {
|
|
loop {
|
|
loop {
|
|
match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) {
|
|
match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) {
|
|
None => return Poll::Ready(()),
|
|
None => return Poll::Ready(()),
|
|
- Some(request) => match self.service_map.get(request.get_cmd()) {
|
|
|
|
|
|
+ Some(request) => match self.service_map.get(request.get_event()) {
|
|
Some(factory) => {
|
|
Some(factory) => {
|
|
let fut = ModuleServiceFuture {
|
|
let fut = ModuleServiceFuture {
|
|
request,
|
|
request,
|
|
@@ -113,14 +106,14 @@ impl Future for Module {
|
|
};
|
|
};
|
|
let resp_tx = self.resp_tx.clone();
|
|
let resp_tx = self.resp_tx.clone();
|
|
tokio::task::spawn_local(async move {
|
|
tokio::task::spawn_local(async move {
|
|
- let resp = fut.await.unwrap_or_else(|e| panic!());
|
|
|
|
|
|
+ let resp = fut.await.unwrap_or_else(|_e| panic!());
|
|
if let Err(e) = resp_tx.send(resp) {
|
|
if let Err(e) = resp_tx.send(resp) {
|
|
log::error!("{:?}", e);
|
|
log::error!("{:?}", e);
|
|
}
|
|
}
|
|
});
|
|
});
|
|
},
|
|
},
|
|
None => {
|
|
None => {
|
|
- log::error!("Command: {} handler not found", request.get_cmd());
|
|
|
|
|
|
+ log::error!("Event: {} handler not found", request.get_event());
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
@@ -131,18 +124,19 @@ impl Future for Module {
|
|
type BoxModuleService = BoxService<ServiceRequest, ServiceResponse, SystemError>;
|
|
type BoxModuleService = BoxService<ServiceRequest, ServiceResponse, SystemError>;
|
|
#[pin_project]
|
|
#[pin_project]
|
|
pub struct ModuleServiceFuture {
|
|
pub struct ModuleServiceFuture {
|
|
- request: FlowyRequest,
|
|
|
|
|
|
+ request: EventRequest,
|
|
#[pin]
|
|
#[pin]
|
|
fut: LocalBoxFuture<'static, Result<BoxModuleService, SystemError>>,
|
|
fut: LocalBoxFuture<'static, Result<BoxModuleService, SystemError>>,
|
|
}
|
|
}
|
|
|
|
|
|
impl Future for ModuleServiceFuture {
|
|
impl Future for ModuleServiceFuture {
|
|
- type Output = Result<FlowyResponse, SystemError>;
|
|
|
|
|
|
+ type Output = Result<EventResponse, SystemError>;
|
|
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
loop {
|
|
loop {
|
|
let service = ready!(self.as_mut().project().fut.poll(cx))?;
|
|
let service = ready!(self.as_mut().project().fut.poll(cx))?;
|
|
let req = ServiceRequest::new(self.as_mut().request.clone(), Payload::None);
|
|
let req = ServiceRequest::new(self.as_mut().request.clone(), Payload::None);
|
|
|
|
+ log::trace!("Call service to handle request {:?}", self.request);
|
|
let (_, resp) = ready!(Pin::new(&mut service.call(req)).poll(cx))?.into_parts();
|
|
let (_, resp) = ready!(Pin::new(&mut service.call(req)).poll(cx))?.into_parts();
|
|
return Poll::Ready(Ok(resp));
|
|
return Poll::Ready(Ok(resp));
|
|
}
|
|
}
|
|
@@ -156,29 +150,23 @@ mod tests {
|
|
use futures_util::{future, pin_mut};
|
|
use futures_util::{future, pin_mut};
|
|
use tokio::sync::mpsc::unbounded_channel;
|
|
use tokio::sync::mpsc::unbounded_channel;
|
|
|
|
|
|
- pub async fn hello_service() -> String {
|
|
|
|
- println!("no params");
|
|
|
|
- "hello".to_string()
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // #[tokio::test]
|
|
|
|
|
|
+ pub async fn hello_service() -> String { "hello".to_string() }
|
|
|
|
|
|
#[test]
|
|
#[test]
|
|
fn test() {
|
|
fn test() {
|
|
let mut runtime = Runtime::new().unwrap();
|
|
let mut runtime = Runtime::new().unwrap();
|
|
runtime.block_on(async {
|
|
runtime.block_on(async {
|
|
- let (resp_tx, mut resp_rx) = unbounded_channel::<FlowyResponse>();
|
|
|
|
- let command = "hello".to_string();
|
|
|
|
- let mut module = Module::new(resp_tx).event(command.clone(), hello_service);
|
|
|
|
- assert_eq!(module.can_handle(&command), true);
|
|
|
|
|
|
+ let (resp_tx, mut resp_rx) = unbounded_channel::<EventResponse>();
|
|
|
|
+ let event = "hello".to_string();
|
|
|
|
+ let mut module = Module::new(resp_tx).event(event.clone(), hello_service);
|
|
let req_tx = module.req_tx();
|
|
let req_tx = module.req_tx();
|
|
let mut event = async move {
|
|
let mut event = async move {
|
|
- let request = FlowyRequest::new(command.clone());
|
|
|
|
|
|
+ let request = EventRequest::new(event.clone());
|
|
req_tx.send(request).unwrap();
|
|
req_tx.send(request).unwrap();
|
|
|
|
|
|
match resp_rx.recv().await {
|
|
match resp_rx.recv().await {
|
|
Some(resp) => {
|
|
Some(resp) => {
|
|
- println!("{}", resp);
|
|
|
|
|
|
+ log::info!("{}", resp);
|
|
},
|
|
},
|
|
None => panic!(""),
|
|
None => panic!(""),
|
|
}
|
|
}
|