module.rs 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. use std::sync::Arc;
  2. use std::{
  3. collections::HashMap,
  4. fmt,
  5. fmt::{Debug, Display},
  6. future::Future,
  7. hash::Hash,
  8. pin::Pin,
  9. task::{Context, Poll},
  10. };
  11. use futures_core::future::BoxFuture;
  12. use futures_core::ready;
  13. use nanoid::nanoid;
  14. use pin_project::pin_project;
  15. use crate::service::AFPluginHandler;
  16. use crate::{
  17. errors::{DispatchError, InternalError},
  18. module::{container::AFPluginStateMap, AFPluginState},
  19. request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
  20. response::{AFPluginEventResponse, AFPluginResponder},
  21. service::{
  22. factory, AFPluginHandlerService, AFPluginServiceFactory, BoxService, BoxServiceFactory,
  23. Service, ServiceRequest, ServiceResponse,
  24. },
  25. };
  26. pub type AFPluginMap = Arc<HashMap<AFPluginEvent, Arc<AFPlugin>>>;
  27. pub(crate) fn as_plugin_map(plugins: Vec<AFPlugin>) -> AFPluginMap {
  28. let mut plugin_map = HashMap::new();
  29. plugins.into_iter().for_each(|m| {
  30. let events = m.events();
  31. let plugins = Arc::new(m);
  32. events.into_iter().for_each(|e| {
  33. plugin_map.insert(e, plugins.clone());
  34. });
  35. });
  36. Arc::new(plugin_map)
  37. }
  /// Identifier of an event that a plugin can handle.
  ///
  /// Wraps the textual form of the event; equality and hashing are derived from that string.
  #[derive(PartialEq, Eq, Hash, Debug, Clone)]
  pub struct AFPluginEvent(pub String);

  /// Converts any displayable value into an [`AFPluginEvent`] holding its `Display` output.
  impl<T: Display + Eq + Hash + Debug + Clone> std::convert::From<T> for AFPluginEvent {
    fn from(t: T) -> Self {
      // `to_string` renders through `Display`, producing the same text as `format!("{}", t)`.
      AFPluginEvent(t.to_string())
    }
  }
  45. /// A plugin is used to handle the events that the plugin can handle.
  46. ///
  47. /// When an event is a dispatched by the `AFPluginDispatcher`, the dispatcher will
  48. /// find the corresponding plugin to handle the event. The name of the event must be unique,
  49. /// which means only one handler will get called.
  50. ///
  51. pub struct AFPlugin {
  52. pub name: String,
  53. /// a list of `AFPluginState` that the plugin registers. The state can be read by the plugin's handler.
  54. states: Arc<AFPluginStateMap>,
  55. /// Contains a list of factories that are used to generate the services used to handle the passed-in
  56. /// `ServiceRequest`.
  57. ///
  58. event_service_factory: Arc<
  59. HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
  60. >,
  61. }
  62. impl std::default::Default for AFPlugin {
  63. fn default() -> Self {
  64. Self {
  65. name: "".to_owned(),
  66. states: Arc::new(AFPluginStateMap::new()),
  67. event_service_factory: Arc::new(HashMap::new()),
  68. }
  69. }
  70. }
  71. impl AFPlugin {
  72. pub fn new() -> Self {
  73. AFPlugin::default()
  74. }
  75. pub fn name(mut self, s: &str) -> Self {
  76. self.name = s.to_owned();
  77. self
  78. }
  79. pub fn state<D: 'static + Send + Sync>(mut self, data: D) -> Self {
  80. Arc::get_mut(&mut self.states)
  81. .unwrap()
  82. .insert(AFPluginState::new(data));
  83. self
  84. }
  85. #[track_caller]
  86. pub fn event<E, H, T, R>(mut self, event: E, handler: H) -> Self
  87. where
  88. H: AFPluginHandler<T, R>,
  89. T: FromAFPluginRequest + 'static + Send + Sync,
  90. <T as FromAFPluginRequest>::Future: Sync + Send,
  91. R: Future + 'static + Send + Sync,
  92. R::Output: AFPluginResponder + 'static,
  93. E: Eq + Hash + Debug + Clone + Display,
  94. {
  95. let event: AFPluginEvent = event.into();
  96. if self.event_service_factory.contains_key(&event) {
  97. panic!("Register duplicate Event: {:?}", &event);
  98. } else {
  99. Arc::get_mut(&mut self.event_service_factory)
  100. .unwrap()
  101. .insert(event, factory(AFPluginHandlerService::new(handler)));
  102. }
  103. self
  104. }
  105. pub fn events(&self) -> Vec<AFPluginEvent> {
  106. self
  107. .event_service_factory
  108. .keys()
  109. .cloned()
  110. .collect::<Vec<_>>()
  111. }
  112. }
  113. /// A request that will be passed to the corresponding plugin.
  114. ///
  115. /// Each request can carry the payload that will be deserialized into the corresponding data struct.
  116. ///
  117. #[derive(Debug, Clone)]
  118. pub struct AFPluginRequest {
  119. pub id: String,
  120. pub event: AFPluginEvent,
  121. pub(crate) payload: Payload,
  122. }
  123. impl AFPluginRequest {
  124. pub fn new<E>(event: E) -> Self
  125. where
  126. E: Into<AFPluginEvent>,
  127. {
  128. Self {
  129. id: nanoid!(6),
  130. event: event.into(),
  131. payload: Payload::None,
  132. }
  133. }
  134. pub fn payload<P>(mut self, payload: P) -> Self
  135. where
  136. P: Into<Payload>,
  137. {
  138. self.payload = payload.into();
  139. self
  140. }
  141. }
  142. impl std::fmt::Display for AFPluginRequest {
  143. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  144. write!(f, "{}:{:?}", self.id, self.event)
  145. }
  146. }
  147. impl AFPluginServiceFactory<AFPluginRequest> for AFPlugin {
  148. type Response = AFPluginEventResponse;
  149. type Error = DispatchError;
  150. type Service = BoxService<AFPluginRequest, Self::Response, Self::Error>;
  151. type Context = ();
  152. type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
  153. fn new_service(&self, _cfg: Self::Context) -> Self::Future {
  154. let services = self.event_service_factory.clone();
  155. let states = self.states.clone();
  156. Box::pin(async move {
  157. let service = AFPluginService { services, states };
  158. Ok(Box::new(service) as Self::Service)
  159. })
  160. }
  161. }
  162. pub struct AFPluginService {
  163. services: Arc<
  164. HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
  165. >,
  166. states: Arc<AFPluginStateMap>,
  167. }
  168. impl Service<AFPluginRequest> for AFPluginService {
  169. type Response = AFPluginEventResponse;
  170. type Error = DispatchError;
  171. type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
  172. fn call(&self, request: AFPluginRequest) -> Self::Future {
  173. let AFPluginRequest { id, event, payload } = request;
  174. let states = self.states.clone();
  175. let request = AFPluginEventRequest::new(id, event, states);
  176. match self.services.get(&request.event) {
  177. Some(factory) => {
  178. let service_fut = factory.new_service(());
  179. let fut = AFPluginServiceFuture {
  180. fut: Box::pin(async {
  181. let service = service_fut.await?;
  182. let service_req = ServiceRequest::new(request, payload);
  183. service.call(service_req).await
  184. }),
  185. };
  186. Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
  187. },
  188. None => {
  189. let msg = format!(
  190. "Can not find service factory for event: {:?}",
  191. request.event
  192. );
  193. Box::pin(async { Err(InternalError::ServiceNotFound(msg).into()) })
  194. },
  195. }
  196. }
  197. }
  198. #[pin_project]
  199. pub struct AFPluginServiceFuture {
  200. #[pin]
  201. fut: BoxFuture<'static, Result<ServiceResponse, DispatchError>>,
  202. }
  203. impl Future for AFPluginServiceFuture {
  204. type Output = Result<AFPluginEventResponse, DispatchError>;
  205. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  206. let (_, response) = ready!(self.as_mut().project().fut.poll(cx))?.into_parts();
  207. Poll::Ready(Ok(response))
  208. }
  209. }