dispatcher.rs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. use crate::runtime::AFPluginRuntime;
  2. use crate::{
  3. errors::{DispatchError, Error, InternalError},
  4. module::{as_plugin_map, AFPlugin, AFPluginMap, AFPluginRequest},
  5. response::AFPluginEventResponse,
  6. service::{AFPluginServiceFactory, Service},
  7. };
  8. use derivative::*;
  9. use futures_core::future::BoxFuture;
  10. use futures_util::task::Context;
  11. use pin_project::pin_project;
  12. use std::{future::Future, sync::Arc};
  13. use tokio::macros::support::{Pin, Poll};
  14. pub struct AFPluginDispatcher {
  15. plugins: AFPluginMap,
  16. runtime: AFPluginRuntime,
  17. }
  18. impl AFPluginDispatcher {
  19. pub fn construct<F>(runtime: AFPluginRuntime, module_factory: F) -> AFPluginDispatcher
  20. where
  21. F: FnOnce() -> Vec<AFPlugin>,
  22. {
  23. let plugins = module_factory();
  24. tracing::trace!("{}", plugin_info(&plugins));
  25. AFPluginDispatcher {
  26. plugins: as_plugin_map(plugins),
  27. runtime,
  28. }
  29. }
  30. pub fn async_send<Req>(
  31. dispatch: Arc<AFPluginDispatcher>,
  32. request: Req,
  33. ) -> DispatchFuture<AFPluginEventResponse>
  34. where
  35. Req: std::convert::Into<AFPluginRequest>,
  36. {
  37. AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {}))
  38. }
  39. pub fn async_send_with_callback<Req, Callback>(
  40. dispatch: Arc<AFPluginDispatcher>,
  41. request: Req,
  42. callback: Callback,
  43. ) -> DispatchFuture<AFPluginEventResponse>
  44. where
  45. Req: std::convert::Into<AFPluginRequest>,
  46. Callback: FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,
  47. {
  48. let request: AFPluginRequest = request.into();
  49. let plugins = dispatch.plugins.clone();
  50. let service = Box::new(DispatchService { plugins });
  51. tracing::trace!("Async event: {:?}", &request.event);
  52. let service_ctx = DispatchContext {
  53. request,
  54. callback: Some(Box::new(callback)),
  55. };
  56. let join_handle = dispatch.runtime.spawn(async move {
  57. service.call(service_ctx).await.unwrap_or_else(|e| {
  58. tracing::error!("Dispatch runtime error: {:?}", e);
  59. InternalError::Other(format!("{:?}", e)).as_response()
  60. })
  61. });
  62. DispatchFuture {
  63. fut: Box::pin(async move {
  64. join_handle.await.unwrap_or_else(|e| {
  65. let msg = format!("EVENT_DISPATCH join error: {:?}", e);
  66. tracing::error!("{}", msg);
  67. let error = InternalError::JoinError(msg);
  68. error.as_response()
  69. })
  70. }),
  71. }
  72. }
  73. pub fn sync_send(
  74. dispatch: Arc<AFPluginDispatcher>,
  75. request: AFPluginRequest,
  76. ) -> AFPluginEventResponse {
  77. futures::executor::block_on(async {
  78. AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await
  79. })
  80. }
  81. pub fn spawn<F>(&self, f: F)
  82. where
  83. F: Future<Output = ()> + Send + 'static,
  84. {
  85. self.runtime.spawn(f);
  86. }
  87. }
  88. #[pin_project]
  89. pub struct DispatchFuture<T: Send + Sync> {
  90. #[pin]
  91. pub fut: Pin<Box<dyn Future<Output = T> + Sync + Send>>,
  92. }
  93. impl<T> Future for DispatchFuture<T>
  94. where
  95. T: Send + Sync,
  96. {
  97. type Output = T;
  98. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  99. let this = self.as_mut().project();
  100. Poll::Ready(futures_core::ready!(this.fut.poll(cx)))
  101. }
  102. }
  103. pub type BoxFutureCallback =
  104. Box<dyn FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
  105. #[derive(Derivative)]
  106. #[derivative(Debug)]
  107. pub struct DispatchContext {
  108. pub request: AFPluginRequest,
  109. #[derivative(Debug = "ignore")]
  110. pub callback: Option<BoxFutureCallback>,
  111. }
  112. impl DispatchContext {
  113. pub(crate) fn into_parts(self) -> (AFPluginRequest, Option<BoxFutureCallback>) {
  114. let DispatchContext { request, callback } = self;
  115. (request, callback)
  116. }
  117. }
  118. pub(crate) struct DispatchService {
  119. pub(crate) plugins: AFPluginMap,
  120. }
  121. impl Service<DispatchContext> for DispatchService {
  122. type Response = AFPluginEventResponse;
  123. type Error = DispatchError;
  124. type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
  125. #[cfg_attr(
  126. feature = "use_tracing",
  127. tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))
  128. )]
  129. fn call(&self, ctx: DispatchContext) -> Self::Future {
  130. let module_map = self.plugins.clone();
  131. let (request, callback) = ctx.into_parts();
  132. Box::pin(async move {
  133. let result = {
  134. // print_module_map_info(&module_map);
  135. match module_map.get(&request.event) {
  136. Some(module) => {
  137. tracing::trace!("Handle event: {:?} by {:?}", &request.event, module.name);
  138. let fut = module.new_service(());
  139. let service_fut = fut.await?.call(request);
  140. service_fut.await
  141. },
  142. None => {
  143. let msg = format!("Can not find the event handler. {:?}", request);
  144. tracing::error!("{}", msg);
  145. Err(InternalError::HandleNotFound(msg).into())
  146. },
  147. }
  148. };
  149. let response = result.unwrap_or_else(|e| e.into());
  150. tracing::trace!("Dispatch result: {:?}", response);
  151. if let Some(callback) = callback {
  152. callback(response.clone()).await;
  153. }
  154. Ok(response)
  155. })
  156. }
  157. }
  158. #[allow(dead_code)]
  159. fn plugin_info(plugins: &[AFPlugin]) -> String {
  160. let mut info = format!("{} plugins loaded\n", plugins.len());
  161. for module in plugins {
  162. info.push_str(&format!("-> {} loaded \n", module.name));
  163. }
  164. info
  165. }
  166. #[allow(dead_code)]
  167. fn print_plugins(plugins: &AFPluginMap) {
  168. plugins.iter().for_each(|(k, v)| {
  169. tracing::info!("Event: {:?} plugin : {:?}", k, v.name);
  170. })
  171. }