module.rs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. use crate::{
  2. errors::{DispatchError, InternalError},
  3. module::{container::AFPluginStateMap, AFPluginState},
  4. request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
  5. response::{AFPluginEventResponse, AFPluginResponder},
  6. service::{
  7. factory, AFPluginHandler, AFPluginHandlerService, AFPluginServiceFactory, BoxService, BoxServiceFactory,
  8. Service, ServiceRequest, ServiceResponse,
  9. },
  10. };
  11. use futures_core::future::BoxFuture;
  12. use futures_core::ready;
  13. use nanoid::nanoid;
  14. use pin_project::pin_project;
  15. use std::sync::Arc;
  16. use std::{
  17. collections::HashMap,
  18. fmt,
  19. fmt::{Debug, Display},
  20. future::Future,
  21. hash::Hash,
  22. pin::Pin,
  23. task::{Context, Poll},
  24. };
  25. pub type AFPluginMap = Arc<HashMap<AFPluginEvent, Arc<AFPlugin>>>;
  26. pub(crate) fn as_plugin_map(plugins: Vec<AFPlugin>) -> AFPluginMap {
  27. let mut plugin_map = HashMap::new();
  28. plugins.into_iter().for_each(|m| {
  29. let events = m.events();
  30. let plugins = Arc::new(m);
  31. events.into_iter().for_each(|e| {
  32. plugin_map.insert(e, plugins.clone());
  33. });
  34. });
  35. Arc::new(plugin_map)
  36. }
  /// Identifier of an event, stored as the textual (`Display`) form of the
  /// value it was created from.
  #[derive(PartialEq, Eq, Hash, Debug, Clone)]
  pub struct AFPluginEvent(String);

  /// Any value that is `Display + Eq + Hash + Debug + Clone` can be turned into
  /// an event; the event holds exactly what `Display` prints for that value.
  impl<T> From<T> for AFPluginEvent
  where
      T: Display + Eq + Hash + Debug + Clone,
  {
      fn from(t: T) -> Self {
          Self(t.to_string())
      }
  }
  44. /// A plugin is used to handle the events that the plugin can handle.
  45. ///
  46. /// When an event is a dispatched by the `AFPluginDispatcher`, the dispatcher will
  47. /// find the corresponding plugin to handle the event. The name of the event must be unique,
  48. /// which means only one handler will get called.
  49. ///
  50. pub struct AFPlugin {
  51. pub name: String,
  52. /// a list of `AFPluginState` that the plugin registers. The state can be read by the plugin's handler.
  53. states: Arc<AFPluginStateMap>,
  54. /// Contains a list of factories that are used to generate the services used to handle the passed-in
  55. /// `ServiceRequest`.
  56. ///
  57. event_service_factory:
  58. Arc<HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>>,
  59. }
  60. impl std::default::Default for AFPlugin {
  61. fn default() -> Self {
  62. Self {
  63. name: "".to_owned(),
  64. states: Arc::new(AFPluginStateMap::new()),
  65. event_service_factory: Arc::new(HashMap::new()),
  66. }
  67. }
  68. }
  69. impl AFPlugin {
  70. pub fn new() -> Self {
  71. AFPlugin::default()
  72. }
  73. pub fn name(mut self, s: &str) -> Self {
  74. self.name = s.to_owned();
  75. self
  76. }
  77. pub fn state<D: 'static + Send + Sync>(mut self, data: D) -> Self {
  78. Arc::get_mut(&mut self.states).unwrap().insert(AFPluginState::new(data));
  79. self
  80. }
  81. pub fn event<E, H, T, R>(mut self, event: E, handler: H) -> Self
  82. where
  83. H: AFPluginHandler<T, R>,
  84. T: FromAFPluginRequest + 'static + Send + Sync,
  85. <T as FromAFPluginRequest>::Future: Sync + Send,
  86. R: Future + 'static + Send + Sync,
  87. R::Output: AFPluginResponder + 'static,
  88. E: Eq + Hash + Debug + Clone + Display,
  89. {
  90. let event: AFPluginEvent = event.into();
  91. if self.event_service_factory.contains_key(&event) {
  92. panic!("Register duplicate Event: {:?}", &event);
  93. } else {
  94. Arc::get_mut(&mut self.event_service_factory)
  95. .unwrap()
  96. .insert(event, factory(AFPluginHandlerService::new(handler)));
  97. }
  98. self
  99. }
  100. pub fn events(&self) -> Vec<AFPluginEvent> {
  101. self.event_service_factory.keys().cloned().collect::<Vec<_>>()
  102. }
  103. }
  104. /// A request that will be passed to the corresponding plugin.
  105. ///
  106. /// Each request can carry the payload that will be deserialized into the corresponding data struct.
  107. ///
  108. #[derive(Debug, Clone)]
  109. pub struct AFPluginRequest {
  110. pub id: String,
  111. pub event: AFPluginEvent,
  112. pub(crate) payload: Payload,
  113. }
  114. impl AFPluginRequest {
  115. pub fn new<E>(event: E) -> Self
  116. where
  117. E: Into<AFPluginEvent>,
  118. {
  119. Self {
  120. id: nanoid!(6),
  121. event: event.into(),
  122. payload: Payload::None,
  123. }
  124. }
  125. pub fn payload<P>(mut self, payload: P) -> Self
  126. where
  127. P: Into<Payload>,
  128. {
  129. self.payload = payload.into();
  130. self
  131. }
  132. }
  133. impl std::fmt::Display for AFPluginRequest {
  134. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  135. write!(f, "{}:{:?}", self.id, self.event)
  136. }
  137. }
  138. impl AFPluginServiceFactory<AFPluginRequest> for AFPlugin {
  139. type Response = AFPluginEventResponse;
  140. type Error = DispatchError;
  141. type Service = BoxService<AFPluginRequest, Self::Response, Self::Error>;
  142. type Context = ();
  143. type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
  144. fn new_service(&self, _cfg: Self::Context) -> Self::Future {
  145. let services = self.event_service_factory.clone();
  146. let states = self.states.clone();
  147. Box::pin(async move {
  148. let service = AFPluginService { services, states };
  149. Ok(Box::new(service) as Self::Service)
  150. })
  151. }
  152. }
  153. pub struct AFPluginService {
  154. services: Arc<HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>>,
  155. states: Arc<AFPluginStateMap>,
  156. }
  157. impl Service<AFPluginRequest> for AFPluginService {
  158. type Response = AFPluginEventResponse;
  159. type Error = DispatchError;
  160. type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
  161. fn call(&self, request: AFPluginRequest) -> Self::Future {
  162. let AFPluginRequest { id, event, payload } = request;
  163. let states = self.states.clone();
  164. let request = AFPluginEventRequest::new(id, event, states);
  165. match self.services.get(&request.event) {
  166. Some(factory) => {
  167. let service_fut = factory.new_service(());
  168. let fut = AFPluginServiceFuture {
  169. fut: Box::pin(async {
  170. let service = service_fut.await?;
  171. let service_req = ServiceRequest::new(request, payload);
  172. service.call(service_req).await
  173. }),
  174. };
  175. Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
  176. }
  177. None => {
  178. let msg = format!("Can not find service factory for event: {:?}", request.event);
  179. Box::pin(async { Err(InternalError::ServiceNotFound(msg).into()) })
  180. }
  181. }
  182. }
  183. }
  184. #[pin_project]
  185. pub struct AFPluginServiceFuture {
  186. #[pin]
  187. fut: BoxFuture<'static, Result<ServiceResponse, DispatchError>>,
  188. }
  189. impl Future for AFPluginServiceFuture {
  190. type Output = Result<AFPluginEventResponse, DispatchError>;
  191. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  192. let (_, response) = ready!(self.as_mut().project().fut.poll(cx))?.into_parts();
  193. Poll::Ready(Ok(response))
  194. }
  195. }