dispatch.rs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. use crate::{
  2. errors::{DispatchError, Error, InternalError},
  3. module::{as_module_map, Module, ModuleMap, ModuleRequest},
  4. response::{EventResponse, Responder},
  5. service::{Service, ServiceFactory},
  6. util::tokio_default_runtime,
  7. };
  8. use derivative::*;
  9. use futures_core::future::BoxFuture;
  10. use futures_util::task::Context;
  11. use lazy_static::lazy_static;
  12. use pin_project::pin_project;
  13. use std::{future::Future, sync::RwLock};
  14. use tokio::macros::support::{Pin, Poll};
  15. lazy_static! {
  16. pub static ref EVENT_DISPATCH: RwLock<Option<EventDispatch>> = RwLock::new(None);
  17. }
  18. pub struct EventDispatch {
  19. module_map: ModuleMap,
  20. runtime: tokio::runtime::Runtime,
  21. }
  22. impl EventDispatch {
  23. pub fn construct<F>(module_factory: F)
  24. where
  25. F: FnOnce() -> Vec<Module>,
  26. {
  27. let modules = module_factory();
  28. log::trace!("{}", module_info(&modules));
  29. let module_map = as_module_map(modules);
  30. let runtime = tokio_default_runtime().unwrap();
  31. let dispatch = EventDispatch {
  32. module_map,
  33. runtime,
  34. };
  35. *(EVENT_DISPATCH.write().unwrap()) = Some(dispatch);
  36. }
  37. pub fn async_send<Req>(request: Req) -> DispatchFuture<EventResponse>
  38. where
  39. Req: std::convert::Into<ModuleRequest>,
  40. {
  41. EventDispatch::async_send_with_callback(request, |_| Box::pin(async {}))
  42. }
  43. pub fn async_send_with_callback<Req, Callback>(
  44. request: Req,
  45. callback: Callback,
  46. ) -> DispatchFuture<EventResponse>
  47. where
  48. Req: std::convert::Into<ModuleRequest>,
  49. Callback: FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,
  50. {
  51. let request: ModuleRequest = request.into();
  52. match EVENT_DISPATCH.read() {
  53. Ok(dispatch) => {
  54. let dispatch = dispatch.as_ref().unwrap();
  55. let module_map = dispatch.module_map.clone();
  56. let service = Box::new(DispatchService { module_map });
  57. log::trace!("Async event: {:?}", &request.event);
  58. let service_ctx = DispatchContext {
  59. request,
  60. callback: Some(Box::new(callback)),
  61. };
  62. let join_handle = dispatch.runtime.spawn(async move {
  63. service
  64. .call(service_ctx)
  65. .await
  66. .unwrap_or_else(|e| InternalError::new(format!("{:?}", e)).as_response())
  67. });
  68. DispatchFuture {
  69. fut: Box::pin(async move {
  70. join_handle.await.unwrap_or_else(|e| {
  71. InternalError::new(format!("EVENT_DISPATCH join error: {:?}", e))
  72. .as_response()
  73. })
  74. }),
  75. }
  76. },
  77. Err(e) => {
  78. let msg = format!("EVENT_DISPATCH read failed. {:?}", e);
  79. log::error!("{}", msg);
  80. DispatchFuture {
  81. fut: Box::pin(async { InternalError::new(msg).as_response() }),
  82. }
  83. },
  84. }
  85. }
  86. pub fn sync_send(request: ModuleRequest) -> EventResponse {
  87. futures::executor::block_on(async {
  88. EventDispatch::async_send_with_callback(request, |_| Box::pin(async {})).await
  89. })
  90. }
  91. }
  92. #[pin_project]
  93. pub struct DispatchFuture<T: Responder + Send + Sync> {
  94. #[pin]
  95. pub fut: Pin<Box<dyn Future<Output = T> + Sync + Send>>,
  96. }
  97. impl<T> Future for DispatchFuture<T>
  98. where
  99. T: Responder + Send + Sync,
  100. {
  101. type Output = T;
  102. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  103. let this = self.as_mut().project();
  104. loop {
  105. return Poll::Ready(futures_core::ready!(this.fut.poll(cx)));
  106. }
  107. }
  108. }
  109. pub type BoxFutureCallback =
  110. Box<dyn FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
  111. #[derive(Derivative)]
  112. #[derivative(Debug)]
  113. pub struct DispatchContext {
  114. pub request: ModuleRequest,
  115. #[derivative(Debug = "ignore")]
  116. pub callback: Option<BoxFutureCallback>,
  117. }
  118. impl DispatchContext {
  119. pub(crate) fn into_parts(self) -> (ModuleRequest, Option<BoxFutureCallback>) {
  120. let DispatchContext { request, callback } = self;
  121. (request, callback)
  122. }
  123. }
  124. pub(crate) struct DispatchService {
  125. pub(crate) module_map: ModuleMap,
  126. }
  127. impl Service<DispatchContext> for DispatchService {
  128. type Response = EventResponse;
  129. type Error = DispatchError;
  130. type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
  131. #[cfg_attr(
  132. feature = "use_tracing",
  133. tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))
  134. )]
  135. fn call(&self, ctx: DispatchContext) -> Self::Future {
  136. let module_map = self.module_map.clone();
  137. let (request, callback) = ctx.into_parts();
  138. Box::pin(async move {
  139. let result = {
  140. match module_map.get(&request.event) {
  141. Some(module) => {
  142. let fut = module.new_service(());
  143. let service_fut = fut.await?.call(request);
  144. service_fut.await
  145. },
  146. None => {
  147. let msg = format!("Can not find the event handler. {:?}", request);
  148. log::trace!("{}", msg);
  149. Err(InternalError::new(msg).into())
  150. },
  151. }
  152. };
  153. let response = result.unwrap_or_else(|e| e.into());
  154. log::trace!("Dispatch result: {:?}", response);
  155. if let Some(callback) = callback {
  156. callback(response.clone()).await;
  157. }
  158. Ok(response)
  159. })
  160. }
  161. }
  162. fn module_info(modules: &Vec<Module>) -> String {
  163. let mut info = format!("{} modules loaded\n", modules.len());
  164. for module in modules {
  165. info.push_str(&format!("-> {} loaded \n", module.name));
  166. }
  167. info
  168. }