// request.rs
  1. use std::future::Future;
  2. use crate::{
  3. error::{InternalError, SystemError},
  4. module::Event,
  5. request::payload::Payload,
  6. util::ready::{ready, Ready},
  7. };
  8. use futures_core::ready;
  9. use std::{
  10. fmt::Debug,
  11. ops,
  12. pin::Pin,
  13. task::{Context, Poll},
  14. };
  15. #[derive(Clone, Debug)]
  16. pub struct EventRequest {
  17. pub(crate) id: String,
  18. pub(crate) event: Event,
  19. }
  20. impl EventRequest {
  21. pub fn new<E>(event: E, id: String) -> EventRequest
  22. where
  23. E: Into<Event>,
  24. {
  25. Self {
  26. id,
  27. event: event.into(),
  28. }
  29. }
  30. }
  31. pub trait FromRequest: Sized {
  32. type Error: Into<SystemError>;
  33. type Future: Future<Output = Result<Self, Self::Error>>;
  34. fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future;
  35. }
  36. #[doc(hidden)]
  37. impl FromRequest for () {
  38. type Error = SystemError;
  39. type Future = Ready<Result<(), SystemError>>;
  40. fn from_request(_req: &EventRequest, _payload: &mut Payload) -> Self::Future { ready(Ok(())) }
  41. }
  42. #[doc(hidden)]
  43. impl FromRequest for String {
  44. type Error = SystemError;
  45. type Future = Ready<Result<Self, Self::Error>>;
  46. fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
  47. match &payload {
  48. Payload::None => ready(Err(unexpected_none_payload(req))),
  49. Payload::Bytes(buf) => ready(Ok(String::from_utf8_lossy(buf).into_owned())),
  50. }
  51. }
  52. }
  53. fn unexpected_none_payload(request: &EventRequest) -> SystemError {
  54. log::warn!("{:?} expected payload", &request.event);
  55. InternalError::new("Expected payload").into()
  56. }
  57. #[doc(hidden)]
  58. impl<T> FromRequest for Result<T, T::Error>
  59. where
  60. T: FromRequest,
  61. {
  62. type Error = SystemError;
  63. type Future = FromRequestFuture<T::Future>;
  64. fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
  65. FromRequestFuture {
  66. fut: T::from_request(req, payload),
  67. }
  68. }
  69. }
  70. #[pin_project::pin_project]
  71. pub struct FromRequestFuture<Fut> {
  72. #[pin]
  73. fut: Fut,
  74. }
  75. impl<Fut, T, E> Future for FromRequestFuture<Fut>
  76. where
  77. Fut: Future<Output = Result<T, E>>,
  78. {
  79. type Output = Result<Result<T, E>, SystemError>;
  80. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  81. let this = self.project();
  82. let res = ready!(this.fut.poll(cx));
  83. Poll::Ready(Ok(res))
  84. }
  85. }
  /// Thin wrapper around a value of type `T`.
  ///
  /// The wrapped value is reachable through the public field, through
  /// [`Data::into_inner`], or by dereferencing.
  pub struct Data<T>(pub T);

  impl<T> Data<T> {
      /// Consumes the wrapper and returns the wrapped value.
      pub fn into_inner(self) -> T {
          let Data(inner) = self;
          inner
      }
  }

  impl<T> ops::Deref for Data<T> {
      type Target = T;

      /// Borrows the wrapped value.
      fn deref(&self) -> &Self::Target {
          let Data(inner) = self;
          inner
      }
  }

  impl<T> ops::DerefMut for Data<T> {
      /// Mutably borrows the wrapped value.
      fn deref_mut(&mut self) -> &mut Self::Target {
          let Data(inner) = self;
          inner
      }
  }
  // With the `use_serde` feature enabled, `Data<T>` is decoded from the payload
  // bytes as JSON text.
  #[cfg(feature = "use_serde")]
  impl<T> FromRequest for Data<T>
  where
      T: serde::de::DeserializeOwned + 'static,
  {
      type Error = SystemError;
      type Future = Ready<Result<Self, SystemError>>;

      #[inline]
      fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
          // Guard clause: a missing payload is reported through the shared helper.
          let bytes = match payload {
              Payload::None => return ready(Err(unexpected_none_payload(req))),
              Payload::Bytes(bytes) => bytes,
          };

          // Lossy decoding: invalid UTF-8 is replaced before the JSON parser runs.
          let text = String::from_utf8_lossy(bytes);
          let decoded: Result<Self, SystemError> = serde_json::from_str::<T>(&text)
              .map(Data)
              .map_err(|e| InternalError::new(format!("{:?}", e)).into());
          ready(decoded)
      }
  }
  118. pub trait FromBytes: Sized {
  119. fn parse_from_bytes(bytes: &Vec<u8>) -> Result<Self, SystemError>;
  120. }
  121. #[cfg(not(feature = "use_serde"))]
  122. impl<T> FromRequest for Data<T>
  123. where
  124. T: FromBytes + 'static,
  125. {
  126. type Error = SystemError;
  127. type Future = Ready<Result<Self, SystemError>>;
  128. #[inline]
  129. fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
  130. match payload {
  131. Payload::None => ready(Err(unexpected_none_payload(req))),
  132. Payload::Bytes(bytes) => {
  133. let data = T::parse_from_bytes(bytes).unwrap();
  134. ready(Ok(Data(data)))
  135. },
  136. }
  137. }
  138. }