|
@@ -8,11 +8,19 @@ use crate::{
|
|
};
|
|
};
|
|
use derivative::*;
|
|
use derivative::*;
|
|
use futures_core::future::BoxFuture;
|
|
use futures_core::future::BoxFuture;
|
|
|
|
+use futures_util::task::Context;
|
|
use lazy_static::lazy_static;
|
|
use lazy_static::lazy_static;
|
|
|
|
+use pin_project::pin_project;
|
|
use std::{
|
|
use std::{
|
|
fmt::{Debug, Display},
|
|
fmt::{Debug, Display},
|
|
|
|
+ future::Future,
|
|
hash::Hash,
|
|
hash::Hash,
|
|
sync::RwLock,
|
|
sync::RwLock,
|
|
|
|
+ thread::JoinHandle,
|
|
|
|
+};
|
|
|
|
+use tokio::{
|
|
|
|
+ macros::support::{Pin, Poll},
|
|
|
|
+ task::JoinError,
|
|
};
|
|
};
|
|
|
|
|
|
lazy_static! {
|
|
lazy_static! {
|
|
@@ -30,9 +38,9 @@ impl EventDispatch {
|
|
F: FnOnce() -> Vec<Module>,
|
|
F: FnOnce() -> Vec<Module>,
|
|
{
|
|
{
|
|
let modules = module_factory();
|
|
let modules = module_factory();
|
|
|
|
+ log::debug!("{}", module_info(&modules));
|
|
let module_map = as_module_map(modules);
|
|
let module_map = as_module_map(modules);
|
|
let runtime = tokio_default_runtime().unwrap();
|
|
let runtime = tokio_default_runtime().unwrap();
|
|
-
|
|
|
|
let dispatch = EventDispatch {
|
|
let dispatch = EventDispatch {
|
|
module_map,
|
|
module_map,
|
|
runtime,
|
|
runtime,
|
|
@@ -41,115 +49,133 @@ impl EventDispatch {
|
|
*(EVENT_DISPATCH.write().unwrap()) = Some(dispatch);
|
|
*(EVENT_DISPATCH.write().unwrap()) = Some(dispatch);
|
|
}
|
|
}
|
|
|
|
|
|
- pub async fn async_send<T>(request: DispatchRequest<T>) -> Result<EventResponse, SystemError>
|
|
|
|
- where
|
|
|
|
- T: 'static + Debug + Send + Sync,
|
|
|
|
- {
|
|
|
|
|
|
+ pub fn async_send(request: DispatchRequest) -> DispatchFuture {
|
|
match EVENT_DISPATCH.read() {
|
|
match EVENT_DISPATCH.read() {
|
|
Ok(dispatch) => {
|
|
Ok(dispatch) => {
|
|
let dispatch = dispatch.as_ref().unwrap();
|
|
let dispatch = dispatch.as_ref().unwrap();
|
|
let module_map = dispatch.module_map.clone();
|
|
let module_map = dispatch.module_map.clone();
|
|
let service = Box::new(DispatchService { module_map });
|
|
let service = Box::new(DispatchService { module_map });
|
|
- dispatch
|
|
|
|
- .runtime
|
|
|
|
- .spawn(async move { service.call(request).await })
|
|
|
|
- .await
|
|
|
|
- .unwrap_or_else(|e| {
|
|
|
|
- let msg = format!("{:?}", e);
|
|
|
|
- Ok(InternalError::new(msg).as_response())
|
|
|
|
- })
|
|
|
|
|
|
+ log::trace!("{}: dispatch {:?} to runtime", &request.id, &request.event);
|
|
|
|
+ let join_handle = dispatch.runtime.spawn(async move {
|
|
|
|
+ service
|
|
|
|
+ .call(request)
|
|
|
|
+ .await
|
|
|
|
+ .unwrap_or_else(|e| InternalError::new(format!("{:?}", e)).as_response())
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ DispatchFuture {
|
|
|
|
+ fut: Box::pin(async move {
|
|
|
|
+ join_handle.await.unwrap_or_else(|e| {
|
|
|
|
+ InternalError::new(format!("Dispatch join error: {:?}", e))
|
|
|
|
+ .as_response()
|
|
|
|
+ })
|
|
|
|
+ }),
|
|
|
|
+ }
|
|
},
|
|
},
|
|
|
|
|
|
Err(e) => {
|
|
Err(e) => {
|
|
- let msg = format!("{:?}", e);
|
|
|
|
- Err(InternalError::new(msg).into())
|
|
|
|
|
|
+ let msg = format!("Dispatch runtime error: {:?}", e);
|
|
|
|
+ log::trace!("{}", msg);
|
|
|
|
+ DispatchFuture {
|
|
|
|
+ fut: Box::pin(async { InternalError::new(msg).as_response() }),
|
|
|
|
+ }
|
|
},
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- pub fn sync_send<T>(request: DispatchRequest<T>) -> Result<EventResponse, SystemError>
|
|
|
|
- where
|
|
|
|
- T: 'static + Debug + Send + Sync,
|
|
|
|
- {
|
|
|
|
|
|
+ pub fn sync_send(request: DispatchRequest) -> EventResponse {
|
|
futures::executor::block_on(async { EventDispatch::async_send(request).await })
|
|
futures::executor::block_on(async { EventDispatch::async_send(request).await })
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-pub type BoxStreamCallback<T> = Box<dyn FnOnce(T, EventResponse) + 'static + Send + Sync>;
|
|
|
|
|
|
+#[pin_project]
|
|
|
|
+pub struct DispatchFuture {
|
|
|
|
+ #[pin]
|
|
|
|
+ fut: BoxFuture<'static, EventResponse>,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl Future for DispatchFuture {
|
|
|
|
+ type Output = EventResponse;
|
|
|
|
+
|
|
|
|
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
|
|
+ let this = self.as_mut().project();
|
|
|
|
+ loop {
|
|
|
|
+ return Poll::Ready(futures_core::ready!(this.fut.poll(cx)));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+pub type BoxFutureCallback =
|
|
|
|
+ Box<dyn FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
|
|
|
|
|
|
#[derive(Derivative)]
|
|
#[derive(Derivative)]
|
|
#[derivative(Debug)]
|
|
#[derivative(Debug)]
|
|
-pub struct DispatchRequest<T>
|
|
|
|
-where
|
|
|
|
- T: 'static + Debug,
|
|
|
|
-{
|
|
|
|
- pub config: T,
|
|
|
|
|
|
+pub struct DispatchRequest {
|
|
|
|
+ pub id: String,
|
|
pub event: Event,
|
|
pub event: Event,
|
|
- pub payload: Option<Payload>,
|
|
|
|
|
|
+ pub payload: Payload,
|
|
#[derivative(Debug = "ignore")]
|
|
#[derivative(Debug = "ignore")]
|
|
- pub callback: Option<BoxStreamCallback<T>>,
|
|
|
|
|
|
+ pub callback: Option<BoxFutureCallback>,
|
|
}
|
|
}
|
|
|
|
|
|
-impl<T> DispatchRequest<T>
|
|
|
|
-where
|
|
|
|
- T: 'static + Debug,
|
|
|
|
-{
|
|
|
|
- pub fn new<E>(config: T, event: E) -> Self
|
|
|
|
|
|
+impl DispatchRequest {
|
|
|
|
+ pub fn new<E>(event: E) -> Self
|
|
where
|
|
where
|
|
E: Eq + Hash + Debug + Clone + Display,
|
|
E: Eq + Hash + Debug + Clone + Display,
|
|
{
|
|
{
|
|
Self {
|
|
Self {
|
|
- config,
|
|
|
|
- payload: None,
|
|
|
|
|
|
+ payload: Payload::None,
|
|
event: event.into(),
|
|
event: event.into(),
|
|
|
|
+ id: uuid::Uuid::new_v4().to_string(),
|
|
callback: None,
|
|
callback: None,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
pub fn payload(mut self, payload: Payload) -> Self {
|
|
pub fn payload(mut self, payload: Payload) -> Self {
|
|
- self.payload = Some(payload);
|
|
|
|
|
|
+ self.payload = payload;
|
|
self
|
|
self
|
|
}
|
|
}
|
|
|
|
|
|
- pub fn callback<F>(mut self, callback: F) -> Self
|
|
|
|
- where
|
|
|
|
- F: FnOnce(T, EventResponse) + 'static + Send + Sync,
|
|
|
|
- {
|
|
|
|
- self.callback = Some(Box::new(callback));
|
|
|
|
|
|
+ pub fn callback(mut self, callback: BoxFutureCallback) -> Self {
|
|
|
|
+ self.callback = Some(callback);
|
|
self
|
|
self
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ pub(crate) fn into_parts(self) -> (ModuleRequest, Option<BoxFutureCallback>) {
|
|
|
|
+ let DispatchRequest {
|
|
|
|
+ event,
|
|
|
|
+ payload,
|
|
|
|
+ id,
|
|
|
|
+ callback,
|
|
|
|
+ } = self;
|
|
|
|
+
|
|
|
|
+ (ModuleRequest::new(event.clone(), id, payload), callback)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
pub(crate) struct DispatchService {
|
|
pub(crate) struct DispatchService {
|
|
pub(crate) module_map: ModuleMap,
|
|
pub(crate) module_map: ModuleMap,
|
|
}
|
|
}
|
|
|
|
|
|
-impl<T> Service<DispatchRequest<T>> for DispatchService
|
|
|
|
-where
|
|
|
|
- T: 'static + Debug + Send + Sync,
|
|
|
|
-{
|
|
|
|
|
|
+impl Service<DispatchRequest> for DispatchService {
|
|
type Response = EventResponse;
|
|
type Response = EventResponse;
|
|
type Error = SystemError;
|
|
type Error = SystemError;
|
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
|
|
|
|
- fn call(&self, dispatch_request: DispatchRequest<T>) -> Self::Future {
|
|
|
|
|
|
+ fn call(&self, dispatch_request: DispatchRequest) -> Self::Future {
|
|
let module_map = self.module_map.clone();
|
|
let module_map = self.module_map.clone();
|
|
- let DispatchRequest {
|
|
|
|
- config,
|
|
|
|
- event,
|
|
|
|
- payload,
|
|
|
|
- callback,
|
|
|
|
- } = dispatch_request;
|
|
|
|
-
|
|
|
|
- let mut request = ModuleRequest::new(event.clone());
|
|
|
|
- if let Some(payload) = payload {
|
|
|
|
- request = request.payload(payload);
|
|
|
|
- };
|
|
|
|
|
|
+ let (request, callback) = dispatch_request.into_parts();
|
|
Box::pin(async move {
|
|
Box::pin(async move {
|
|
let result = {
|
|
let result = {
|
|
- match module_map.get(&event) {
|
|
|
|
|
|
+ match module_map.get(&request.event()) {
|
|
Some(module) => {
|
|
Some(module) => {
|
|
let fut = module.new_service(());
|
|
let fut = module.new_service(());
|
|
|
|
+ log::trace!(
|
|
|
|
+ "{}: handle event: {:?} by {}",
|
|
|
|
+ request.id(),
|
|
|
|
+ request.event(),
|
|
|
|
+ module.name
|
|
|
|
+ );
|
|
let service_fut = fut.await?.call(request);
|
|
let service_fut = fut.await?.call(request);
|
|
service_fut.await
|
|
service_fut.await
|
|
},
|
|
},
|
|
@@ -158,17 +184,27 @@ where
|
|
"Can not find the module to handle the request:{:?}",
|
|
"Can not find the module to handle the request:{:?}",
|
|
request
|
|
request
|
|
);
|
|
);
|
|
|
|
+ log::trace!("{}", msg);
|
|
Err(InternalError::new(msg).into())
|
|
Err(InternalError::new(msg).into())
|
|
},
|
|
},
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
|
|
let response = result.unwrap_or_else(|e| e.into());
|
|
let response = result.unwrap_or_else(|e| e.into());
|
|
|
|
+ log::trace!("Dispatch result: {:?}", response);
|
|
if let Some(callback) = callback {
|
|
if let Some(callback) = callback {
|
|
- callback(config, response.clone());
|
|
|
|
|
|
+ callback(response.clone()).await;
|
|
}
|
|
}
|
|
|
|
|
|
Ok(response)
|
|
Ok(response)
|
|
})
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+fn module_info(modules: &Vec<Module>) -> String {
|
|
|
|
+ let mut info = format!("{} modules loaded\n", modules.len());
|
|
|
|
+ for module in modules {
|
|
|
|
+ info.push_str(&format!("-> {} loaded \n", module.name));
|
|
|
|
+ }
|
|
|
|
+ info
|
|
|
|
+}
|