|
@@ -15,27 +15,66 @@ use tokio::{
|
|
|
macro_rules! service_factor_impl {
|
|
|
($name:ident) => {
|
|
|
#[allow(non_snake_case, missing_docs)]
|
|
|
- impl<T> ServiceFactory<StreamData<T>> for $name<T>
|
|
|
+ impl<T> ServiceFactory<CommandData<T>> for $name<T>
|
|
|
where
|
|
|
T: 'static,
|
|
|
{
|
|
|
type Response = EventResponse;
|
|
|
type Error = SystemError;
|
|
|
- type Service = BoxService<StreamData<T>, Self::Response, Self::Error>;
|
|
|
+ type Service = BoxService<CommandData<T>, Self::Response, Self::Error>;
|
|
|
type Config = ();
|
|
|
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
|
|
|
|
|
|
fn new_service(&self, _cfg: Self::Config) -> Self::Future {
|
|
|
let module_map = self.module_map.clone();
|
|
|
- let service = Box::new(CommandStreamService { module_map });
|
|
|
+ let service = Box::new(CommandSenderService { module_map });
|
|
|
Box::pin(async move { Ok(service as Self::Service) })
|
|
|
}
|
|
|
}
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+struct CommandSenderService {
|
|
|
+ module_map: ModuleMap,
|
|
|
+}
|
|
|
+
|
|
|
+impl<T: 'static> Service<CommandData<T>> for CommandSenderService {
|
|
|
+ type Response = EventResponse;
|
|
|
+ type Error = SystemError;
|
|
|
+ type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
|
+
|
|
|
+ fn call(&self, mut data: CommandData<T>) -> Self::Future {
|
|
|
+ let module_map = self.module_map.clone();
|
|
|
+ let request = data.request.take().unwrap();
|
|
|
+ let fut = async move {
|
|
|
+ let result = {
|
|
|
+ match module_map.get(request.get_event()) {
|
|
|
+ Some(module) => {
|
|
|
+ let config = request.get_id().to_owned();
|
|
|
+ let fut = module.new_service(config);
|
|
|
+ let service_fut = fut.await?.call(request);
|
|
|
+ service_fut.await
|
|
|
+ },
|
|
|
+ None => {
|
|
|
+ let msg = format!("Can not find the module to handle the request:{:?}", request);
|
|
|
+ Err(InternalError::new(msg).into())
|
|
|
+ },
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ let response = result.unwrap_or_else(|e| e.into());
|
|
|
+ if let Some(callback) = data.callback {
|
|
|
+ callback(data.config, response.clone());
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(response)
|
|
|
+ };
|
|
|
+ Box::pin(fut)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
pub type BoxStreamCallback<T> = Box<dyn FnOnce(T, EventResponse) + 'static + Send + Sync>;
|
|
|
-pub struct StreamData<T>
|
|
|
+pub struct CommandData<T>
|
|
|
where
|
|
|
T: 'static,
|
|
|
{
|
|
@@ -44,7 +83,7 @@ where
|
|
|
callback: Option<BoxStreamCallback<T>>,
|
|
|
}
|
|
|
|
|
|
-impl<T> StreamData<T> {
|
|
|
+impl<T> CommandData<T> {
|
|
|
pub fn new(config: T, request: Option<EventRequest>) -> Self {
|
|
|
Self {
|
|
|
config,
|
|
@@ -59,20 +98,20 @@ impl<T> StreamData<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-pub struct CommandStream<T>
|
|
|
+pub struct CommandSender<T>
|
|
|
where
|
|
|
T: 'static,
|
|
|
{
|
|
|
module_map: ModuleMap,
|
|
|
- data_tx: UnboundedSender<StreamData<T>>,
|
|
|
- data_rx: Option<UnboundedReceiver<StreamData<T>>>,
|
|
|
+ data_tx: UnboundedSender<CommandData<T>>,
|
|
|
+ data_rx: Option<UnboundedReceiver<CommandData<T>>>,
|
|
|
}
|
|
|
|
|
|
-service_factor_impl!(CommandStream);
|
|
|
+service_factor_impl!(CommandSender);
|
|
|
|
|
|
-impl<T> CommandStream<T> {
|
|
|
+impl<T> CommandSender<T> {
|
|
|
pub fn new(module_map: ModuleMap) -> Self {
|
|
|
- let (data_tx, data_rx) = unbounded_channel::<StreamData<T>>();
|
|
|
+ let (data_tx, data_rx) = unbounded_channel::<CommandData<T>>();
|
|
|
Self {
|
|
|
module_map,
|
|
|
data_tx,
|
|
@@ -80,9 +119,9 @@ impl<T> CommandStream<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub fn async_send(&self, data: StreamData<T>) { let _ = self.data_tx.send(data); }
|
|
|
+ pub fn async_send(&self, data: CommandData<T>) { let _ = self.data_tx.send(data); }
|
|
|
|
|
|
- pub fn sync_send(&self, data: StreamData<T>) -> EventResponse {
|
|
|
+ pub fn sync_send(&self, data: CommandData<T>) -> EventResponse {
|
|
|
let factory = self.new_service(());
|
|
|
|
|
|
futures::executor::block_on(async {
|
|
@@ -91,25 +130,25 @@ impl<T> CommandStream<T> {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
- pub fn tx(&self) -> UnboundedSender<StreamData<T>> { self.data_tx.clone() }
|
|
|
+ pub fn tx(&self) -> UnboundedSender<CommandData<T>> { self.data_tx.clone() }
|
|
|
|
|
|
- pub fn take_data_rx(&mut self) -> UnboundedReceiver<StreamData<T>> { self.data_rx.take().unwrap() }
|
|
|
+ pub fn take_data_rx(&mut self) -> UnboundedReceiver<CommandData<T>> { self.data_rx.take().unwrap() }
|
|
|
}
|
|
|
|
|
|
-pub struct CommandStreamFuture<T: 'static> {
|
|
|
+pub struct CommandSenderRunner<T: 'static> {
|
|
|
module_map: ModuleMap,
|
|
|
- data_rx: UnboundedReceiver<StreamData<T>>,
|
|
|
+ data_rx: UnboundedReceiver<CommandData<T>>,
|
|
|
}
|
|
|
|
|
|
-service_factor_impl!(CommandStreamFuture);
|
|
|
+service_factor_impl!(CommandSenderRunner);
|
|
|
|
|
|
-impl<T: 'static> CommandStreamFuture<T> {
|
|
|
- pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver<StreamData<T>>) -> Self {
|
|
|
+impl<T: 'static> CommandSenderRunner<T> {
|
|
|
+ pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver<CommandData<T>>) -> Self {
|
|
|
Self { module_map, data_rx }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl<T> Future for CommandStreamFuture<T>
|
|
|
+impl<T> Future for CommandSenderRunner<T>
|
|
|
where
|
|
|
T: 'static,
|
|
|
{
|
|
@@ -129,42 +168,3 @@ where
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-pub struct CommandStreamService {
|
|
|
- module_map: ModuleMap,
|
|
|
-}
|
|
|
-
|
|
|
-impl<T: 'static> Service<StreamData<T>> for CommandStreamService {
|
|
|
- type Response = EventResponse;
|
|
|
- type Error = SystemError;
|
|
|
- type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
|
-
|
|
|
- fn call(&self, mut data: StreamData<T>) -> Self::Future {
|
|
|
- let module_map = self.module_map.clone();
|
|
|
- let request = data.request.take().unwrap();
|
|
|
- let fut = async move {
|
|
|
- let result = {
|
|
|
- match module_map.get(request.get_event()) {
|
|
|
- Some(module) => {
|
|
|
- let config = request.get_id().to_owned();
|
|
|
- let fut = module.new_service(config);
|
|
|
- let service_fut = fut.await?.call(request);
|
|
|
- service_fut.await
|
|
|
- },
|
|
|
- None => {
|
|
|
- let msg = format!("Can not find the module to handle the request:{:?}", request);
|
|
|
- Err(InternalError::new(msg).into())
|
|
|
- },
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- let response = result.unwrap_or_else(|e| e.into());
|
|
|
- if let Some(callback) = data.callback {
|
|
|
- callback(data.config, response.clone());
|
|
|
- }
|
|
|
-
|
|
|
- Ok(response)
|
|
|
- };
|
|
|
- Box::pin(fut)
|
|
|
- }
|
|
|
-}
|