// module.rs (7.2 KB)
  1. use std::{
  2. collections::HashMap,
  3. fmt,
  4. fmt::{Debug, Display},
  5. future::Future,
  6. hash::Hash,
  7. pin::Pin,
  8. task::{Context, Poll},
  9. };
  10. use futures_core::ready;
  11. use pin_project::pin_project;
  12. use crate::{
  13. errors::{DispatchError, InternalError},
  14. module::{container::ModuleDataMap, Unit},
  15. request::{payload::Payload, EventRequest, FromRequest},
  16. response::{EventResponse, Responder},
  17. service::{
  18. factory,
  19. BoxService,
  20. BoxServiceFactory,
  21. Handler,
  22. HandlerService,
  23. Service,
  24. ServiceFactory,
  25. ServiceRequest,
  26. ServiceResponse,
  27. },
  28. };
  29. use futures_core::future::BoxFuture;
  30. use std::sync::Arc;
  31. pub type ModuleMap = Arc<HashMap<Event, Arc<Module>>>;
  32. pub(crate) fn as_module_map(modules: Vec<Module>) -> ModuleMap {
  33. let mut module_map = HashMap::new();
  34. modules.into_iter().for_each(|m| {
  35. let events = m.events();
  36. let module = Arc::new(m);
  37. events.into_iter().for_each(|e| {
  38. module_map.insert(e, module.clone());
  39. });
  40. });
  41. Arc::new(module_map)
  42. }
  /// Identifier of an event, stored as a string.
  #[derive(PartialEq, Eq, Hash, Debug, Clone)]
  pub struct Event(String);

  /// Builds an [`Event`] from any value by taking its `Display` output as the identifier.
  impl<T: Display + Eq + Hash + Debug + Clone> From<T> for Event {
      fn from(t: T) -> Self {
          Event(t.to_string())
      }
  }
  48. pub type EventServiceFactory =
  49. BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>;
  50. pub struct Module {
  51. pub name: String,
  52. module_data: Arc<ModuleDataMap>,
  53. service_map: Arc<HashMap<Event, EventServiceFactory>>,
  54. }
  55. impl Module {
  56. pub fn new() -> Self {
  57. Self {
  58. name: "".to_owned(),
  59. module_data: Arc::new(ModuleDataMap::new()),
  60. service_map: Arc::new(HashMap::new()),
  61. }
  62. }
  63. pub fn name(mut self, s: &str) -> Self {
  64. self.name = s.to_owned();
  65. self
  66. }
  67. pub fn data<D: 'static + Send + Sync>(mut self, data: D) -> Self {
  68. Arc::get_mut(&mut self.module_data)
  69. .unwrap()
  70. .insert(Unit::new(data));
  71. self
  72. }
  73. pub fn event<E, H, T, R>(mut self, event: E, handler: H) -> Self
  74. where
  75. H: Handler<T, R>,
  76. T: FromRequest + 'static + Send + Sync,
  77. <T as FromRequest>::Future: Sync + Send,
  78. R: Future + 'static + Send + Sync,
  79. R::Output: Responder + 'static,
  80. E: Eq + Hash + Debug + Clone + Display,
  81. {
  82. let event: Event = event.into();
  83. if self.service_map.contains_key(&event) {
  84. log::error!("Duplicate Event: {:?}", &event);
  85. }
  86. Arc::get_mut(&mut self.service_map)
  87. .unwrap()
  88. .insert(event, factory(HandlerService::new(handler)));
  89. self
  90. }
  91. pub fn events(&self) -> Vec<Event> {
  92. self.service_map
  93. .keys()
  94. .map(|key| key.clone())
  95. .collect::<Vec<_>>()
  96. }
  97. }
  98. #[derive(Debug)]
  99. pub struct ModuleRequest {
  100. pub id: String,
  101. pub event: Event,
  102. pub(crate) payload: Payload,
  103. }
  104. impl ModuleRequest {
  105. pub fn new<E>(event: E) -> Self
  106. where
  107. E: Into<Event>,
  108. {
  109. Self {
  110. id: uuid::Uuid::new_v4().to_string(),
  111. event: event.into(),
  112. payload: Payload::None,
  113. }
  114. }
  115. pub fn payload<P>(mut self, payload: P) -> Self
  116. where
  117. P: Into<Payload>,
  118. {
  119. self.payload = payload.into();
  120. self
  121. }
  122. }
  123. impl std::fmt::Display for ModuleRequest {
  124. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  125. write!(f, "{}:{:?}", self.id, self.event)
  126. }
  127. }
  128. impl ServiceFactory<ModuleRequest> for Module {
  129. type Response = EventResponse;
  130. type Error = DispatchError;
  131. type Service = BoxService<ModuleRequest, Self::Response, Self::Error>;
  132. type Context = ();
  133. type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
  134. fn new_service(&self, _cfg: Self::Context) -> Self::Future {
  135. let service_map = self.service_map.clone();
  136. let module_data = self.module_data.clone();
  137. Box::pin(async move {
  138. let service = ModuleService {
  139. service_map,
  140. module_data,
  141. };
  142. let module_service = Box::new(service) as Self::Service;
  143. Ok(module_service)
  144. })
  145. }
  146. }
  147. pub struct ModuleService {
  148. service_map: Arc<HashMap<Event, EventServiceFactory>>,
  149. module_data: Arc<ModuleDataMap>,
  150. }
  151. impl Service<ModuleRequest> for ModuleService {
  152. type Response = EventResponse;
  153. type Error = DispatchError;
  154. type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
  155. fn call(&self, request: ModuleRequest) -> Self::Future {
  156. let ModuleRequest { id, event, payload } = request;
  157. let module_data = self.module_data.clone();
  158. let request = EventRequest::new(id.clone(), event, module_data);
  159. match self.service_map.get(&request.event) {
  160. Some(factory) => {
  161. let service_fut = factory.new_service(());
  162. let fut = ModuleServiceFuture {
  163. fut: Box::pin(async {
  164. let service = service_fut.await?;
  165. let service_req = ServiceRequest::new(request, payload);
  166. service.call(service_req).await
  167. }),
  168. };
  169. Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
  170. },
  171. None => {
  172. let msg = format!(
  173. "Can not find service factory for event: {:?}",
  174. request.event
  175. );
  176. Box::pin(async { Err(InternalError::ServiceNotFound(msg).into()) })
  177. },
  178. }
  179. }
  180. }
  181. #[pin_project]
  182. pub struct ModuleServiceFuture {
  183. #[pin]
  184. fut: BoxFuture<'static, Result<ServiceResponse, DispatchError>>,
  185. }
  186. impl Future for ModuleServiceFuture {
  187. type Output = Result<EventResponse, DispatchError>;
  188. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  189. loop {
  190. let (_, response) = ready!(self.as_mut().project().fut.poll(cx))?.into_parts();
  191. return Poll::Ready(Ok(response));
  192. }
  193. }
  194. }
  195. // #[cfg(test)]
  196. // mod tests {
  197. // use super::*;
  198. // use crate::rt::Runtime;
  199. // use futures_util::{future, pin_mut};
  200. // use tokio::sync::mpsc::unbounded_channel;
  201. // pub async fn hello_service() -> String { "hello".to_string() }
  202. // #[test]
  203. // fn test() {
  204. // let runtime = Runtime::new().unwrap();
  205. // runtime.block_on(async {
  206. // let (sys_tx, mut sys_rx) = unbounded_channel::<SystemCommand>();
  207. // let event = "hello".to_string();
  208. // let module = Module::new(sys_tx).event(event.clone(),
  209. // hello_service); let req_tx = module.req_tx();
  210. // let event = async move {
  211. // let request = EventRequest::new(event.clone());
  212. // req_tx.send(request).unwrap();
  213. //
  214. // match sys_rx.recv().await {
  215. // Some(cmd) => {
  216. // log::info!("{:?}", cmd);
  217. // },
  218. // None => panic!(""),
  219. // }
  220. // };
  221. //
  222. // pin_mut!(module, event);
  223. // future::select(module, event).await;
  224. // });
  225. // }
  226. // }