future.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. #![allow(clippy::large_enum_variant)]
  2. #![allow(clippy::type_complexity)]
  3. use crate::retry::FixedInterval;
  4. use pin_project::pin_project;
  5. use std::{
  6. future::Future,
  7. iter::{IntoIterator, Iterator},
  8. pin::Pin,
  9. task::{Context, Poll},
  10. };
  11. use tokio::time::{sleep_until, Duration, Instant, Sleep};
  12. #[pin_project(project = RetryStateProj)]
  13. enum RetryState<A>
  14. where
  15. A: Action,
  16. {
  17. Running(#[pin] A::Future),
  18. Sleeping(#[pin] Sleep),
  19. }
  20. impl<A: Action> RetryState<A> {
  21. fn poll(self: Pin<&mut Self>, cx: &mut Context) -> RetryFuturePoll<A> {
  22. match self.project() {
  23. RetryStateProj::Running(future) => RetryFuturePoll::Running(future.poll(cx)),
  24. RetryStateProj::Sleeping(future) => RetryFuturePoll::Sleeping(future.poll(cx)),
  25. }
  26. }
  27. }
  28. enum RetryFuturePoll<A>
  29. where
  30. A: Action,
  31. {
  32. Running(Poll<Result<A::Item, A::Error>>),
  33. Sleeping(Poll<()>),
  34. }
  35. /// Future that drives multiple attempts at an action via a retry strategy.
  36. #[pin_project]
  37. pub struct Retry<I, A>
  38. where
  39. I: Iterator<Item = Duration>,
  40. A: Action,
  41. {
  42. #[pin]
  43. retry_if: RetryIf<I, A, fn(&A::Error) -> bool>,
  44. }
  45. impl<I, A> Retry<I, A>
  46. where
  47. I: Iterator<Item = Duration>,
  48. A: Action,
  49. {
  50. pub fn new<T: IntoIterator<IntoIter = I, Item = Duration>>(
  51. strategy: T,
  52. action: A,
  53. ) -> Retry<I, A> {
  54. Retry {
  55. retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool),
  56. }
  57. }
  58. }
  59. impl<I, A> Future for Retry<I, A>
  60. where
  61. I: Iterator<Item = Duration>,
  62. A: Action,
  63. {
  64. type Output = Result<A::Item, A::Error>;
  65. fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
  66. let this = self.project();
  67. this.retry_if.poll(cx)
  68. }
  69. }
  70. /// Future that drives multiple attempts at an action via a retry strategy.
  71. /// Retries are only attempted if the `Error` returned by the future satisfies a
  72. /// given condition.
  73. #[pin_project]
  74. pub struct RetryIf<I, A, C>
  75. where
  76. I: Iterator<Item = Duration>,
  77. A: Action,
  78. C: Condition<A::Error>,
  79. {
  80. strategy: I,
  81. #[pin]
  82. state: RetryState<A>,
  83. action: A,
  84. condition: C,
  85. }
  86. impl<I, A, C> RetryIf<I, A, C>
  87. where
  88. I: Iterator<Item = Duration>,
  89. A: Action,
  90. C: Condition<A::Error>,
  91. {
  92. pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(
  93. strategy: T,
  94. mut action: A,
  95. condition: C,
  96. ) -> RetryIf<I, A, C> {
  97. RetryIf {
  98. strategy: strategy.into_iter(),
  99. state: RetryState::Running(action.run()),
  100. action,
  101. condition,
  102. }
  103. }
  104. fn attempt(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<A::Item, A::Error>> {
  105. let future = {
  106. let this = self.as_mut().project();
  107. this.action.run()
  108. };
  109. self
  110. .as_mut()
  111. .project()
  112. .state
  113. .set(RetryState::Running(future));
  114. self.poll(cx)
  115. }
  116. fn retry(
  117. mut self: Pin<&mut Self>,
  118. err: A::Error,
  119. cx: &mut Context,
  120. ) -> Result<Poll<Result<A::Item, A::Error>>, A::Error> {
  121. match self.as_mut().project().strategy.next() {
  122. None => Err(err),
  123. Some(duration) => {
  124. let deadline = Instant::now() + duration;
  125. let future = sleep_until(deadline);
  126. self
  127. .as_mut()
  128. .project()
  129. .state
  130. .set(RetryState::Sleeping(future));
  131. Ok(self.poll(cx))
  132. },
  133. }
  134. }
  135. }
  136. impl<I, A, C> Future for RetryIf<I, A, C>
  137. where
  138. I: Iterator<Item = Duration>,
  139. A: Action,
  140. C: Condition<A::Error>,
  141. {
  142. type Output = Result<A::Item, A::Error>;
  143. fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
  144. match self.as_mut().project().state.poll(cx) {
  145. RetryFuturePoll::Running(poll_result) => match poll_result {
  146. Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)),
  147. Poll::Pending => Poll::Pending,
  148. Poll::Ready(Err(err)) => {
  149. if self.as_mut().project().condition.should_retry(&err) {
  150. match self.retry(err, cx) {
  151. Ok(poll) => poll,
  152. Err(err) => Poll::Ready(Err(err)),
  153. }
  154. } else {
  155. Poll::Ready(Err(err))
  156. }
  157. },
  158. },
  159. RetryFuturePoll::Sleeping(poll_result) => match poll_result {
  160. Poll::Pending => Poll::Pending,
  161. Poll::Ready(_) => self.attempt(cx),
  162. },
  163. }
  164. }
  165. }
  166. /// An action can be run multiple times and produces a future.
  167. pub trait Action: Send + Sync {
  168. type Future: Future<Output = Result<Self::Item, Self::Error>>;
  169. type Item;
  170. type Error;
  171. fn run(&mut self) -> Self::Future;
  172. }
  173. // impl<R, E, T: Future<Output = Result<R, E>>, F: FnMut() -> T + Send + Sync>
  174. // Action for F { type Future = T;
  175. // type Item = R;
  176. // type Error = E;
  177. //
  178. // fn run(&mut self) -> Self::Future { self() }
  179. // }
  180. pub trait Condition<E> {
  181. fn should_retry(&mut self, error: &E) -> bool;
  182. }
  183. impl<E, F: FnMut(&E) -> bool> Condition<E> for F {
  184. fn should_retry(&mut self, error: &E) -> bool {
  185. self(error)
  186. }
  187. }
  188. pub fn spawn_retry<A: Action + 'static>(
  189. retry_count: usize,
  190. retry_per_millis: u64,
  191. action: A,
  192. ) -> impl Future<Output = Result<A::Item, A::Error>>
  193. where
  194. A::Item: Send + Sync,
  195. A::Error: Send + Sync,
  196. <A as Action>::Future: Send + Sync,
  197. {
  198. let strategy = FixedInterval::from_millis(retry_per_millis).take(retry_count);
  199. Retry::new(strategy, action)
  200. }