dispatcher.rs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. use crate::runtime::FlowyRuntime;
  2. use crate::{
  3. errors::{DispatchError, Error, InternalError},
  4. module::{as_module_map, Module, ModuleMap, ModuleRequest},
  5. response::EventResponse,
  6. service::{Service, ServiceFactory},
  7. };
  8. use derivative::*;
  9. use futures_core::future::BoxFuture;
  10. use futures_util::task::Context;
  11. use pin_project::pin_project;
  12. use std::{future::Future, sync::Arc};
  13. use tokio::macros::support::{Pin, Poll};
  14. pub struct EventDispatcher {
  15. module_map: ModuleMap,
  16. runtime: FlowyRuntime,
  17. }
  18. impl EventDispatcher {
  19. pub fn construct<F>(runtime: FlowyRuntime, module_factory: F) -> EventDispatcher
  20. where
  21. F: FnOnce() -> Vec<Module>,
  22. {
  23. let modules = module_factory();
  24. tracing::trace!("{}", module_info(&modules));
  25. let module_map = as_module_map(modules);
  26. EventDispatcher { module_map, runtime }
  27. }
  28. pub fn async_send<Req>(dispatch: Arc<EventDispatcher>, request: Req) -> DispatchFuture<EventResponse>
  29. where
  30. Req: std::convert::Into<ModuleRequest>,
  31. {
  32. EventDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {}))
  33. }
  34. pub fn async_send_with_callback<Req, Callback>(
  35. dispatch: Arc<EventDispatcher>,
  36. request: Req,
  37. callback: Callback,
  38. ) -> DispatchFuture<EventResponse>
  39. where
  40. Req: std::convert::Into<ModuleRequest>,
  41. Callback: FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,
  42. {
  43. let request: ModuleRequest = request.into();
  44. let module_map = dispatch.module_map.clone();
  45. let service = Box::new(DispatchService { module_map });
  46. tracing::trace!("Async event: {:?}", &request.event);
  47. let service_ctx = DispatchContext {
  48. request,
  49. callback: Some(Box::new(callback)),
  50. };
  51. let join_handle = dispatch.runtime.spawn(async move {
  52. service.call(service_ctx).await.unwrap_or_else(|e| {
  53. tracing::error!("Dispatch runtime error: {:?}", e);
  54. InternalError::Other(format!("{:?}", e)).as_response()
  55. })
  56. });
  57. DispatchFuture {
  58. fut: Box::pin(async move {
  59. join_handle.await.unwrap_or_else(|e| {
  60. let msg = format!("EVENT_DISPATCH join error: {:?}", e);
  61. tracing::error!("{}", msg);
  62. let error = InternalError::JoinError(msg);
  63. error.as_response()
  64. })
  65. }),
  66. }
  67. }
  68. pub fn sync_send(dispatch: Arc<EventDispatcher>, request: ModuleRequest) -> EventResponse {
  69. futures::executor::block_on(async {
  70. EventDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await
  71. })
  72. }
  73. pub fn spawn<F>(&self, f: F)
  74. where
  75. F: Future<Output = ()> + Send + 'static,
  76. {
  77. self.runtime.spawn(f);
  78. }
  79. }
  80. #[pin_project]
  81. pub struct DispatchFuture<T: Send + Sync> {
  82. #[pin]
  83. pub fut: Pin<Box<dyn Future<Output = T> + Sync + Send>>,
  84. }
  85. impl<T> Future for DispatchFuture<T>
  86. where
  87. T: Send + Sync,
  88. {
  89. type Output = T;
  90. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  91. let this = self.as_mut().project();
  92. Poll::Ready(futures_core::ready!(this.fut.poll(cx)))
  93. }
  94. }
  95. pub type BoxFutureCallback = Box<dyn FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
  96. #[derive(Derivative)]
  97. #[derivative(Debug)]
  98. pub struct DispatchContext {
  99. pub request: ModuleRequest,
  100. #[derivative(Debug = "ignore")]
  101. pub callback: Option<BoxFutureCallback>,
  102. }
  103. impl DispatchContext {
  104. pub(crate) fn into_parts(self) -> (ModuleRequest, Option<BoxFutureCallback>) {
  105. let DispatchContext { request, callback } = self;
  106. (request, callback)
  107. }
  108. }
  109. pub(crate) struct DispatchService {
  110. pub(crate) module_map: ModuleMap,
  111. }
  112. impl Service<DispatchContext> for DispatchService {
  113. type Response = EventResponse;
  114. type Error = DispatchError;
  115. type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
  116. #[cfg_attr(
  117. feature = "use_tracing",
  118. tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))
  119. )]
  120. fn call(&self, ctx: DispatchContext) -> Self::Future {
  121. let module_map = self.module_map.clone();
  122. let (request, callback) = ctx.into_parts();
  123. Box::pin(async move {
  124. let result = {
  125. // print_module_map_info(&module_map);
  126. match module_map.get(&request.event) {
  127. Some(module) => {
  128. tracing::trace!("Handle event: {:?} by {:?}", &request.event, module.name);
  129. let fut = module.new_service(());
  130. let service_fut = fut.await?.call(request);
  131. service_fut.await
  132. }
  133. None => {
  134. let msg = format!("Can not find the event handler. {:?}", request);
  135. tracing::error!("{}", msg);
  136. Err(InternalError::HandleNotFound(msg).into())
  137. }
  138. }
  139. };
  140. let response = result.unwrap_or_else(|e| e.into());
  141. tracing::trace!("Dispatch result: {:?}", response);
  142. if let Some(callback) = callback {
  143. callback(response.clone()).await;
  144. }
  145. Ok(response)
  146. })
  147. }
  148. }
  149. #[allow(dead_code)]
  150. fn module_info(modules: &[Module]) -> String {
  151. let mut info = format!("{} modules loaded\n", modules.len());
  152. for module in modules {
  153. info.push_str(&format!("-> {} loaded \n", module.name));
  154. }
  155. info
  156. }
  157. #[allow(dead_code)]
  158. fn print_module_map_info(module_map: &ModuleMap) {
  159. module_map.iter().for_each(|(k, v)| {
  160. tracing::info!("Event: {:?} module: {:?}", k, v.name);
  161. })
  162. }