|
@@ -1,7 +1,6 @@
|
|
use crate::{
|
|
use crate::{
|
|
error::{Error, InternalError, SystemError},
|
|
error::{Error, InternalError, SystemError},
|
|
- module::{as_module_map, Event, Module, ModuleMap, ModuleRequest},
|
|
|
|
- request::Payload,
|
|
|
|
|
|
+ module::{as_module_map, Module, ModuleMap, ModuleRequest},
|
|
response::EventResponse,
|
|
response::EventResponse,
|
|
service::{Service, ServiceFactory},
|
|
service::{Service, ServiceFactory},
|
|
util::tokio_default_runtime,
|
|
util::tokio_default_runtime,
|
|
@@ -11,13 +10,7 @@ use futures_core::future::BoxFuture;
|
|
use futures_util::task::Context;
|
|
use futures_util::task::Context;
|
|
use lazy_static::lazy_static;
|
|
use lazy_static::lazy_static;
|
|
use pin_project::pin_project;
|
|
use pin_project::pin_project;
|
|
-use std::{
|
|
|
|
- convert::TryInto,
|
|
|
|
- fmt::{Debug, Display},
|
|
|
|
- future::Future,
|
|
|
|
- hash::Hash,
|
|
|
|
- sync::RwLock,
|
|
|
|
-};
|
|
|
|
|
|
+use std::{future::Future, sync::RwLock};
|
|
use tokio::macros::support::{Pin, Poll};
|
|
use tokio::macros::support::{Pin, Poll};
|
|
|
|
|
|
lazy_static! {
|
|
lazy_static! {
|
|
@@ -48,21 +41,27 @@ impl EventDispatch {
|
|
|
|
|
|
pub fn async_send<Req, Callback>(request: Req, callback: Callback) -> DispatchFuture
|
|
pub fn async_send<Req, Callback>(request: Req, callback: Callback) -> DispatchFuture
|
|
where
|
|
where
|
|
- Req: std::convert::Into<DispatchRequest>,
|
|
|
|
|
|
+ Req: std::convert::Into<ModuleRequest>,
|
|
Callback: FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,
|
|
Callback: FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,
|
|
{
|
|
{
|
|
- let mut request = request.into();
|
|
|
|
- request.callback = Some(Box::new(callback));
|
|
|
|
-
|
|
|
|
|
|
+ let request: ModuleRequest = request.into();
|
|
match EVENT_DISPATCH.read() {
|
|
match EVENT_DISPATCH.read() {
|
|
Ok(dispatch) => {
|
|
Ok(dispatch) => {
|
|
let dispatch = dispatch.as_ref().unwrap();
|
|
let dispatch = dispatch.as_ref().unwrap();
|
|
let module_map = dispatch.module_map.clone();
|
|
let module_map = dispatch.module_map.clone();
|
|
let service = Box::new(DispatchService { module_map });
|
|
let service = Box::new(DispatchService { module_map });
|
|
- log::trace!("{}: dispatch {:?} to runtime", &request.id, &request.event);
|
|
|
|
|
|
+ log::trace!(
|
|
|
|
+ "{}: dispatch {:?} to runtime",
|
|
|
|
+ &request.id(),
|
|
|
|
+ &request.event()
|
|
|
|
+ );
|
|
|
|
+ let service_ctx = DispatchContext {
|
|
|
|
+ request,
|
|
|
|
+ callback: Some(Box::new(callback)),
|
|
|
|
+ };
|
|
let join_handle = dispatch.runtime.spawn(async move {
|
|
let join_handle = dispatch.runtime.spawn(async move {
|
|
service
|
|
service
|
|
- .call(request)
|
|
|
|
|
|
+ .call(service_ctx)
|
|
.await
|
|
.await
|
|
.unwrap_or_else(|e| InternalError::new(format!("{:?}", e)).as_response())
|
|
.unwrap_or_else(|e| InternalError::new(format!("{:?}", e)).as_response())
|
|
});
|
|
});
|
|
@@ -87,7 +86,7 @@ impl EventDispatch {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- pub fn sync_send(request: DispatchRequest) -> EventResponse {
|
|
|
|
|
|
+ pub fn sync_send(request: ModuleRequest) -> EventResponse {
|
|
futures::executor::block_on(async {
|
|
futures::executor::block_on(async {
|
|
EventDispatch::async_send(request, |response| {
|
|
EventDispatch::async_send(request, |response| {
|
|
dbg!(&response);
|
|
dbg!(&response);
|
|
@@ -120,49 +119,16 @@ pub type BoxFutureCallback =
|
|
|
|
|
|
#[derive(Derivative)]
|
|
#[derive(Derivative)]
|
|
#[derivative(Debug)]
|
|
#[derivative(Debug)]
|
|
-pub struct DispatchRequest {
|
|
|
|
- pub id: String,
|
|
|
|
- pub event: Event,
|
|
|
|
- pub payload: Payload,
|
|
|
|
|
|
+pub struct DispatchContext {
|
|
|
|
+ pub request: ModuleRequest,
|
|
#[derivative(Debug = "ignore")]
|
|
#[derivative(Debug = "ignore")]
|
|
pub callback: Option<BoxFutureCallback>,
|
|
pub callback: Option<BoxFutureCallback>,
|
|
}
|
|
}
|
|
|
|
|
|
-impl DispatchRequest {
|
|
|
|
- pub fn new<E>(event: E) -> Self
|
|
|
|
- where
|
|
|
|
- E: Eq + Hash + Debug + Clone + Display,
|
|
|
|
- {
|
|
|
|
- Self {
|
|
|
|
- payload: Payload::None,
|
|
|
|
- event: event.into(),
|
|
|
|
- id: uuid::Uuid::new_v4().to_string(),
|
|
|
|
- callback: None,
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- pub fn payload<P>(mut self, payload: P) -> Self
|
|
|
|
- where
|
|
|
|
- P: Into<Payload>,
|
|
|
|
- {
|
|
|
|
- self.payload = payload.into();
|
|
|
|
- self
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- pub fn callback(mut self, callback: BoxFutureCallback) -> Self {
|
|
|
|
- self.callback = Some(callback);
|
|
|
|
- self
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+impl DispatchContext {
|
|
pub(crate) fn into_parts(self) -> (ModuleRequest, Option<BoxFutureCallback>) {
|
|
pub(crate) fn into_parts(self) -> (ModuleRequest, Option<BoxFutureCallback>) {
|
|
- let DispatchRequest {
|
|
|
|
- event,
|
|
|
|
- payload,
|
|
|
|
- id,
|
|
|
|
- callback,
|
|
|
|
- } = self;
|
|
|
|
-
|
|
|
|
- (ModuleRequest::new(event.clone(), id, payload), callback)
|
|
|
|
|
|
+ let DispatchContext { request, callback } = self;
|
|
|
|
+ (request, callback)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -170,22 +136,19 @@ pub(crate) struct DispatchService {
|
|
pub(crate) module_map: ModuleMap,
|
|
pub(crate) module_map: ModuleMap,
|
|
}
|
|
}
|
|
|
|
|
|
-impl Service<DispatchRequest> for DispatchService {
|
|
|
|
|
|
+impl Service<DispatchContext> for DispatchService {
|
|
type Response = EventResponse;
|
|
type Response = EventResponse;
|
|
type Error = SystemError;
|
|
type Error = SystemError;
|
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
|
|
|
|
#[cfg_attr(
|
|
#[cfg_attr(
|
|
feature = "use_tracing",
|
|
feature = "use_tracing",
|
|
- tracing::instrument(
|
|
|
|
- name = "DispatchService",
|
|
|
|
- level = "debug",
|
|
|
|
- skip(self, dispatch_request)
|
|
|
|
- )
|
|
|
|
|
|
+ tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))
|
|
)]
|
|
)]
|
|
- fn call(&self, dispatch_request: DispatchRequest) -> Self::Future {
|
|
|
|
|
|
+ fn call(&self, ctx: DispatchContext) -> Self::Future {
|
|
let module_map = self.module_map.clone();
|
|
let module_map = self.module_map.clone();
|
|
- let (request, callback) = dispatch_request.into_parts();
|
|
|
|
|
|
+ let (request, callback) = ctx.into_parts();
|
|
|
|
+
|
|
Box::pin(async move {
|
|
Box::pin(async move {
|
|
let result = {
|
|
let result = {
|
|
match module_map.get(&request.event()) {
|
|
match module_map.get(&request.event()) {
|