handler.rs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. use std::{
  2. future::Future,
  3. marker::PhantomData,
  4. pin::Pin,
  5. task::{Context, Poll},
  6. };
  7. use futures_core::ready;
  8. use pin_project::pin_project;
  9. use crate::{
  10. errors::DispatchError,
  11. request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
  12. response::{AFPluginEventResponse, AFPluginResponder},
  13. service::{AFPluginServiceFactory, Service, ServiceRequest, ServiceResponse},
  14. util::ready::*,
  15. };
  16. /// A closure that is run every time for the specified plugin event
  17. pub trait AFPluginHandler<T, R>: Clone + 'static + Sync + Send
  18. where
  19. R: Future + Send + Sync,
  20. R::Output: AFPluginResponder,
  21. {
  22. fn call(&self, param: T) -> R;
  23. }
  24. pub struct AFPluginHandlerService<H, T, R>
  25. where
  26. H: AFPluginHandler<T, R>,
  27. T: FromAFPluginRequest,
  28. R: Future + Sync + Send,
  29. R::Output: AFPluginResponder,
  30. {
  31. handler: H,
  32. _phantom: PhantomData<(T, R)>,
  33. }
  34. impl<H, T, R> AFPluginHandlerService<H, T, R>
  35. where
  36. H: AFPluginHandler<T, R>,
  37. T: FromAFPluginRequest,
  38. R: Future + Sync + Send,
  39. R::Output: AFPluginResponder,
  40. {
  41. pub fn new(handler: H) -> Self {
  42. Self {
  43. handler,
  44. _phantom: PhantomData,
  45. }
  46. }
  47. }
  48. impl<H, T, R> Clone for AFPluginHandlerService<H, T, R>
  49. where
  50. H: AFPluginHandler<T, R>,
  51. T: FromAFPluginRequest,
  52. R: Future + Sync + Send,
  53. R::Output: AFPluginResponder,
  54. {
  55. fn clone(&self) -> Self {
  56. Self {
  57. handler: self.handler.clone(),
  58. _phantom: PhantomData,
  59. }
  60. }
  61. }
  62. impl<F, T, R> AFPluginServiceFactory<ServiceRequest> for AFPluginHandlerService<F, T, R>
  63. where
  64. F: AFPluginHandler<T, R>,
  65. T: FromAFPluginRequest,
  66. R: Future + Send + Sync,
  67. R::Output: AFPluginResponder,
  68. {
  69. type Response = ServiceResponse;
  70. type Error = DispatchError;
  71. type Service = Self;
  72. type Context = ();
  73. type Future = Ready<Result<Self::Service, Self::Error>>;
  74. fn new_service(&self, _: ()) -> Self::Future {
  75. ready(Ok(self.clone()))
  76. }
  77. }
  78. impl<H, T, R> Service<ServiceRequest> for AFPluginHandlerService<H, T, R>
  79. where
  80. H: AFPluginHandler<T, R>,
  81. T: FromAFPluginRequest,
  82. R: Future + Sync + Send,
  83. R::Output: AFPluginResponder,
  84. {
  85. type Response = ServiceResponse;
  86. type Error = DispatchError;
  87. type Future = HandlerServiceFuture<H, T, R>;
  88. fn call(&self, req: ServiceRequest) -> Self::Future {
  89. let (req, mut payload) = req.into_parts();
  90. let fut = T::from_request(&req, &mut payload);
  91. HandlerServiceFuture::Extract(fut, Some(req), self.handler.clone())
  92. }
  93. }
  94. #[pin_project(project = HandlerServiceProj)]
  95. pub enum HandlerServiceFuture<H, T, R>
  96. where
  97. H: AFPluginHandler<T, R>,
  98. T: FromAFPluginRequest,
  99. R: Future + Sync + Send,
  100. R::Output: AFPluginResponder,
  101. {
  102. Extract(#[pin] T::Future, Option<AFPluginEventRequest>, H),
  103. Handle(#[pin] R, Option<AFPluginEventRequest>),
  104. }
  105. impl<F, T, R> Future for HandlerServiceFuture<F, T, R>
  106. where
  107. F: AFPluginHandler<T, R>,
  108. T: FromAFPluginRequest,
  109. R: Future + Sync + Send,
  110. R::Output: AFPluginResponder,
  111. {
  112. type Output = Result<ServiceResponse, DispatchError>;
  113. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  114. loop {
  115. match self.as_mut().project() {
  116. HandlerServiceProj::Extract(fut, req, handle) => {
  117. match ready!(fut.poll(cx)) {
  118. Ok(params) => {
  119. let fut = handle.call(params);
  120. let state = HandlerServiceFuture::Handle(fut, req.take());
  121. self.as_mut().set(state);
  122. },
  123. Err(err) => {
  124. let req = req.take().unwrap();
  125. let system_err: DispatchError = err.into();
  126. let res: AFPluginEventResponse = system_err.into();
  127. return Poll::Ready(Ok(ServiceResponse::new(req, res)));
  128. },
  129. };
  130. },
  131. HandlerServiceProj::Handle(fut, req) => {
  132. let result = ready!(fut.poll(cx));
  133. let req = req.take().unwrap();
  134. let resp = result.respond_to(&req);
  135. return Poll::Ready(Ok(ServiceResponse::new(req, resp)));
  136. },
  137. }
  138. }
  139. }
  140. }
  // Implements `AFPluginHandler` for any `Fn` whose arguments are the listed
  // type parameters. The handler receives those arguments packed in a tuple
  // and spreads them back out into the function call.
  macro_rules! factory_tuple {
      ($($param:ident)*) => {
          impl<Func, $($param,)* Fut> AFPluginHandler<($($param,)*), Fut> for Func
          where
              Func: Fn($($param),*) -> Fut + Clone + Send + Sync + 'static,
              Fut: Future + Send + Sync,
              Fut::Output: AFPluginResponder,
          {
              // The type parameter names double as the destructuring bindings,
              // hence the upper-case variable names.
              #[allow(non_snake_case)]
              fn call(&self, ($($param,)*): ($($param,)*)) -> Fut {
                  (self)($($param,)*)
              }
          }
      };
  }
  // Implements `FromAFPluginRequest` for a tuple of extractors.
  //
  // `$tuple_type` names both the generated module and the future type inside
  // it; each `($n, $T)` pair gives a tuple index and the type parameter at
  // that position.
  macro_rules! tuple_from_req {
      ($tuple_type:ident, $(($n:tt, $T:ident)),+) => {
          #[allow(non_snake_case)]
          mod $tuple_type {
              use super::*;

              // One pinned extractor future per tuple element.
              #[pin_project::pin_project]
              struct FromRequestFutures<$($T: FromAFPluginRequest),+>($(#[pin] $T::Future),+);

              /// FromRequest implementation for tuple
              #[doc(hidden)]
              #[allow(unused_parens)]
              impl<$($T: FromAFPluginRequest + 'static),+> FromAFPluginRequest for ($($T,)+)
              {
                  type Error = DispatchError;
                  type Future = $tuple_type<$($T),+>;

                  fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future {
                      // Every slot starts out empty and is filled as its future resolves.
                      let items = <($(Option<$T>,)+)>::default();
                      let futs = FromRequestFutures($($T::from_request(req, payload),)+);
                      $tuple_type { items, futs }
                  }
              }

              // Future that resolves once every element of the tuple has been
              // extracted, or as soon as one extraction fails.
              #[doc(hidden)]
              #[pin_project::pin_project]
              pub struct $tuple_type<$($T: FromAFPluginRequest),+> {
                  items: ($(Option<$T>,)+),
                  #[pin]
                  futs: FromRequestFutures<$($T,)+>,
              }

              impl<$($T: FromAFPluginRequest),+> Future for $tuple_type<$($T),+>
              {
                  type Output = Result<($($T,)+), DispatchError>;

                  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                      let mut state = self.project();
                      let mut all_done = true;
                      $(
                          // Only poll futures whose slot has not been filled yet.
                          if state.items.$n.is_none() {
                              match state.futs.as_mut().project().$n.poll(cx) {
                                  Poll::Pending => all_done = false,
                                  Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
                                  Poll::Ready(Ok(value)) => state.items.$n = Some(value),
                              }
                          }
                      )+

                      if !all_done {
                          return Poll::Pending;
                      }

                      // Every slot is `Some` at this point, so the unwraps cannot fail.
                      Poll::Ready(Ok(
                          ($(state.items.$n.take().unwrap(),)+)
                      ))
                  }
              }
          }
      };
  }
  206. factory_tuple! {}
  207. factory_tuple! { A }
  208. factory_tuple! { A B }
  209. factory_tuple! { A B C }
  210. factory_tuple! { A B C D }
  211. factory_tuple! { A B C D E }
  212. #[rustfmt::skip]
  213. mod m {
  214. use super::*;
  215. tuple_from_req!(TupleFromRequest1, (0, A));
  216. tuple_from_req!(TupleFromRequest2, (0, A), (1, B));
  217. tuple_from_req!(TupleFromRequest3, (0, A), (1, B), (2, C));
  218. tuple_from_req!(TupleFromRequest4, (0, A), (1, B), (2, C), (3, D));
  219. tuple_from_req!(TupleFromRequest5, (0, A), (1, B), (2, C), (3, D), (4, E));
  220. }