module.rs 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. use crate::{
  2. errors::{DispatchError, InternalError},
  3. module::{container::AFPluginStateMap, AFPluginState},
  4. request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
  5. response::{AFPluginEventResponse, AFPluginResponder},
  6. service::{
  7. factory, AFPluginHandler, AFPluginHandlerService, AFPluginServiceFactory, BoxService,
  8. BoxServiceFactory, Service, ServiceRequest, ServiceResponse,
  9. },
  10. };
  11. use futures_core::future::BoxFuture;
  12. use futures_core::ready;
  13. use nanoid::nanoid;
  14. use pin_project::pin_project;
  15. use std::sync::Arc;
  16. use std::{
  17. collections::HashMap,
  18. fmt,
  19. fmt::{Debug, Display},
  20. future::Future,
  21. hash::Hash,
  22. pin::Pin,
  23. task::{Context, Poll},
  24. };
  25. pub type AFPluginMap = Arc<HashMap<AFPluginEvent, Arc<AFPlugin>>>;
  26. pub(crate) fn as_plugin_map(plugins: Vec<AFPlugin>) -> AFPluginMap {
  27. let mut plugin_map = HashMap::new();
  28. plugins.into_iter().for_each(|m| {
  29. let events = m.events();
  30. let plugins = Arc::new(m);
  31. events.into_iter().for_each(|e| {
  32. plugin_map.insert(e, plugins.clone());
  33. });
  34. });
  35. Arc::new(plugin_map)
  36. }
  /// Identifier of an event, stored as its string representation.
  ///
  /// Two events are equal when their strings are equal; the type is hashable so it can be
  /// used as a map key.
  #[derive(PartialEq, Eq, Hash, Debug, Clone)]
  pub struct AFPluginEvent(pub String);

  /// Converts any value that is `Display + Eq + Hash + Debug + Clone` into an event by
  /// taking its `Display` output as the event string.
  impl<T> std::convert::From<T> for AFPluginEvent
  where
      T: Display + Eq + Hash + Debug + Clone,
  {
      fn from(t: T) -> Self {
          // `to_string` goes through `Display`, exactly like `format!("{}", t)` did.
          AFPluginEvent(t.to_string())
      }
  }
  44. /// A plugin is used to handle the events that the plugin can handle.
  45. ///
  46. /// When an event is a dispatched by the `AFPluginDispatcher`, the dispatcher will
  47. /// find the corresponding plugin to handle the event. The name of the event must be unique,
  48. /// which means only one handler will get called.
  49. ///
  50. pub struct AFPlugin {
  51. pub name: String,
  52. /// a list of `AFPluginState` that the plugin registers. The state can be read by the plugin's handler.
  53. states: Arc<AFPluginStateMap>,
  54. /// Contains a list of factories that are used to generate the services used to handle the passed-in
  55. /// `ServiceRequest`.
  56. ///
  57. event_service_factory: Arc<
  58. HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
  59. >,
  60. }
  61. impl std::default::Default for AFPlugin {
  62. fn default() -> Self {
  63. Self {
  64. name: "".to_owned(),
  65. states: Arc::new(AFPluginStateMap::new()),
  66. event_service_factory: Arc::new(HashMap::new()),
  67. }
  68. }
  69. }
  70. impl AFPlugin {
  71. pub fn new() -> Self {
  72. AFPlugin::default()
  73. }
  74. pub fn name(mut self, s: &str) -> Self {
  75. self.name = s.to_owned();
  76. self
  77. }
  78. pub fn state<D: 'static + Send + Sync>(mut self, data: D) -> Self {
  79. Arc::get_mut(&mut self.states)
  80. .unwrap()
  81. .insert(AFPluginState::new(data));
  82. self
  83. }
  84. pub fn event<E, H, T, R>(mut self, event: E, handler: H) -> Self
  85. where
  86. H: AFPluginHandler<T, R>,
  87. T: FromAFPluginRequest + 'static + Send + Sync,
  88. <T as FromAFPluginRequest>::Future: Sync + Send,
  89. R: Future + 'static + Send + Sync,
  90. R::Output: AFPluginResponder + 'static,
  91. E: Eq + Hash + Debug + Clone + Display,
  92. {
  93. let event: AFPluginEvent = event.into();
  94. if self.event_service_factory.contains_key(&event) {
  95. panic!("Register duplicate Event: {:?}", &event);
  96. } else {
  97. Arc::get_mut(&mut self.event_service_factory)
  98. .unwrap()
  99. .insert(event, factory(AFPluginHandlerService::new(handler)));
  100. }
  101. self
  102. }
  103. pub fn events(&self) -> Vec<AFPluginEvent> {
  104. self
  105. .event_service_factory
  106. .keys()
  107. .cloned()
  108. .collect::<Vec<_>>()
  109. }
  110. }
  111. /// A request that will be passed to the corresponding plugin.
  112. ///
  113. /// Each request can carry the payload that will be deserialized into the corresponding data struct.
  114. ///
  115. #[derive(Debug, Clone)]
  116. pub struct AFPluginRequest {
  117. pub id: String,
  118. pub event: AFPluginEvent,
  119. pub(crate) payload: Payload,
  120. }
  121. impl AFPluginRequest {
  122. pub fn new<E>(event: E) -> Self
  123. where
  124. E: Into<AFPluginEvent>,
  125. {
  126. Self {
  127. id: nanoid!(6),
  128. event: event.into(),
  129. payload: Payload::None,
  130. }
  131. }
  132. pub fn payload<P>(mut self, payload: P) -> Self
  133. where
  134. P: Into<Payload>,
  135. {
  136. self.payload = payload.into();
  137. self
  138. }
  139. }
  140. impl std::fmt::Display for AFPluginRequest {
  141. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  142. write!(f, "{}:{:?}", self.id, self.event)
  143. }
  144. }
  145. impl AFPluginServiceFactory<AFPluginRequest> for AFPlugin {
  146. type Response = AFPluginEventResponse;
  147. type Error = DispatchError;
  148. type Service = BoxService<AFPluginRequest, Self::Response, Self::Error>;
  149. type Context = ();
  150. type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
  151. fn new_service(&self, _cfg: Self::Context) -> Self::Future {
  152. let services = self.event_service_factory.clone();
  153. let states = self.states.clone();
  154. Box::pin(async move {
  155. let service = AFPluginService { services, states };
  156. Ok(Box::new(service) as Self::Service)
  157. })
  158. }
  159. }
  160. pub struct AFPluginService {
  161. services: Arc<
  162. HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
  163. >,
  164. states: Arc<AFPluginStateMap>,
  165. }
  166. impl Service<AFPluginRequest> for AFPluginService {
  167. type Response = AFPluginEventResponse;
  168. type Error = DispatchError;
  169. type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
  170. fn call(&self, request: AFPluginRequest) -> Self::Future {
  171. let AFPluginRequest { id, event, payload } = request;
  172. let states = self.states.clone();
  173. let request = AFPluginEventRequest::new(id, event, states);
  174. match self.services.get(&request.event) {
  175. Some(factory) => {
  176. let service_fut = factory.new_service(());
  177. let fut = AFPluginServiceFuture {
  178. fut: Box::pin(async {
  179. let service = service_fut.await?;
  180. let service_req = ServiceRequest::new(request, payload);
  181. service.call(service_req).await
  182. }),
  183. };
  184. Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
  185. },
  186. None => {
  187. let msg = format!(
  188. "Can not find service factory for event: {:?}",
  189. request.event
  190. );
  191. Box::pin(async { Err(InternalError::ServiceNotFound(msg).into()) })
  192. },
  193. }
  194. }
  195. }
  196. #[pin_project]
  197. pub struct AFPluginServiceFuture {
  198. #[pin]
  199. fut: BoxFuture<'static, Result<ServiceResponse, DispatchError>>,
  200. }
  201. impl Future for AFPluginServiceFuture {
  202. type Output = Result<AFPluginEventResponse, DispatchError>;
  203. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  204. let (_, response) = ready!(self.as_mut().project().fut.poll(cx))?.into_parts();
  205. Poll::Ready(Ok(response))
  206. }
  207. }