|
@@ -1,14 +1,14 @@
|
|
|
use crate::{
|
|
|
error::{InternalError, SystemError},
|
|
|
- module::{Event, ModuleRequest},
|
|
|
+ module::ModuleRequest,
|
|
|
request::{EventRequest, Payload},
|
|
|
response::EventResponse,
|
|
|
- sender::{SenderData, SenderPayload},
|
|
|
+ sender::SenderRequest,
|
|
|
service::{BoxService, Service, ServiceFactory},
|
|
|
system::ModuleMap,
|
|
|
};
|
|
|
use futures_core::{future::LocalBoxFuture, ready, task::Context};
|
|
|
-use std::future::Future;
|
|
|
+use std::{fmt::Debug, future::Future};
|
|
|
use tokio::{
|
|
|
macros::support::{Pin, Poll},
|
|
|
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
|
@@ -17,13 +17,13 @@ use tokio::{
|
|
|
macro_rules! service_factor_impl {
|
|
|
($name:ident) => {
|
|
|
#[allow(non_snake_case, missing_docs)]
|
|
|
- impl<T> ServiceFactory<SenderData<T>> for $name<T>
|
|
|
+ impl<T> ServiceFactory<SenderRequest<T>> for $name<T>
|
|
|
where
|
|
|
- T: 'static,
|
|
|
+ T: 'static + Debug,
|
|
|
{
|
|
|
type Response = EventResponse;
|
|
|
type Error = SystemError;
|
|
|
- type Service = BoxService<SenderData<T>, Self::Response, Self::Error>;
|
|
|
+ type Service = BoxService<SenderRequest<T>, Self::Response, Self::Error>;
|
|
|
type Context = ();
|
|
|
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
|
|
|
|
|
@@ -40,24 +40,27 @@ struct SenderService {
|
|
|
module_map: ModuleMap,
|
|
|
}
|
|
|
|
|
|
-impl<T> Service<SenderData<T>> for SenderService
|
|
|
+impl<T> Service<SenderRequest<T>> for SenderService
|
|
|
where
|
|
|
- T: 'static,
|
|
|
+ T: 'static + Debug,
|
|
|
{
|
|
|
type Response = EventResponse;
|
|
|
type Error = SystemError;
|
|
|
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
|
|
|
|
- fn call(&self, data: SenderData<T>) -> Self::Future {
|
|
|
+ fn call(&self, data: SenderRequest<T>) -> Self::Future {
|
|
|
let module_map = self.module_map.clone();
|
|
|
- let SenderData {
|
|
|
+ let SenderRequest {
|
|
|
config,
|
|
|
+ event,
|
|
|
payload,
|
|
|
callback,
|
|
|
} = data;
|
|
|
|
|
|
- let event = payload.event.clone();
|
|
|
- let request = payload.into();
|
|
|
+ let mut request = ModuleRequest::new(event.clone());
|
|
|
+ if let Some(payload) = payload {
|
|
|
+ request = request.payload(payload);
|
|
|
+ }
|
|
|
|
|
|
let fut = async move {
|
|
|
let result = {
|
|
@@ -87,21 +90,21 @@ where
|
|
|
|
|
|
pub struct Sender<T>
|
|
|
where
|
|
|
- T: 'static,
|
|
|
+ T: 'static + Debug,
|
|
|
{
|
|
|
module_map: ModuleMap,
|
|
|
- data_tx: UnboundedSender<SenderData<T>>,
|
|
|
- data_rx: Option<UnboundedReceiver<SenderData<T>>>,
|
|
|
+ data_tx: UnboundedSender<SenderRequest<T>>,
|
|
|
+ data_rx: Option<UnboundedReceiver<SenderRequest<T>>>,
|
|
|
}
|
|
|
|
|
|
service_factor_impl!(Sender);
|
|
|
|
|
|
impl<T> Sender<T>
|
|
|
where
|
|
|
- T: 'static,
|
|
|
+ T: 'static + Debug,
|
|
|
{
|
|
|
pub fn new(module_map: ModuleMap) -> Self {
|
|
|
- let (data_tx, data_rx) = unbounded_channel::<SenderData<T>>();
|
|
|
+ let (data_tx, data_rx) = unbounded_channel::<SenderRequest<T>>();
|
|
|
Self {
|
|
|
module_map,
|
|
|
data_tx,
|
|
@@ -109,9 +112,9 @@ where
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub fn async_send(&self, data: SenderData<T>) { let _ = self.data_tx.send(data); }
|
|
|
+ pub fn async_send(&self, data: SenderRequest<T>) { let _ = self.data_tx.send(data); }
|
|
|
|
|
|
- pub fn sync_send(&self, data: SenderData<T>) -> EventResponse {
|
|
|
+ pub fn sync_send(&self, data: SenderRequest<T>) -> EventResponse {
|
|
|
let factory = self.new_service(());
|
|
|
|
|
|
futures::executor::block_on(async {
|
|
@@ -120,31 +123,31 @@ where
|
|
|
})
|
|
|
}
|
|
|
|
|
|
- pub fn take_rx(&mut self) -> UnboundedReceiver<SenderData<T>> { self.data_rx.take().unwrap() }
|
|
|
+ pub fn take_rx(&mut self) -> UnboundedReceiver<SenderRequest<T>> { self.data_rx.take().unwrap() }
|
|
|
}
|
|
|
|
|
|
pub struct SenderRunner<T>
|
|
|
where
|
|
|
- T: 'static,
|
|
|
+ T: 'static + Debug,
|
|
|
{
|
|
|
module_map: ModuleMap,
|
|
|
- data_rx: UnboundedReceiver<SenderData<T>>,
|
|
|
+ data_rx: UnboundedReceiver<SenderRequest<T>>,
|
|
|
}
|
|
|
|
|
|
service_factor_impl!(SenderRunner);
|
|
|
|
|
|
impl<T> SenderRunner<T>
|
|
|
where
|
|
|
- T: 'static,
|
|
|
+ T: 'static + Debug,
|
|
|
{
|
|
|
- pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver<SenderData<T>>) -> Self {
|
|
|
+ pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver<SenderRequest<T>>) -> Self {
|
|
|
Self { module_map, data_rx }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
impl<T> Future for SenderRunner<T>
|
|
|
where
|
|
|
- T: 'static,
|
|
|
+ T: 'static + Debug,
|
|
|
{
|
|
|
type Output = ();
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|