future.rs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. #![allow(clippy::large_enum_variant)]
  2. #![allow(clippy::type_complexity)]
  3. use crate::retry::FixedInterval;
  4. use pin_project::pin_project;
  5. use std::{
  6. future::Future,
  7. iter::{IntoIterator, Iterator},
  8. pin::Pin,
  9. task::{Context, Poll},
  10. };
  11. use tokio::{
  12. task::JoinHandle,
  13. time::{sleep_until, Duration, Instant, Sleep},
  14. };
  15. #[pin_project(project = RetryStateProj)]
  16. enum RetryState<A>
  17. where
  18. A: Action,
  19. {
  20. Running(#[pin] A::Future),
  21. Sleeping(#[pin] Sleep),
  22. }
  23. impl<A: Action> RetryState<A> {
  24. fn poll(self: Pin<&mut Self>, cx: &mut Context) -> RetryFuturePoll<A> {
  25. match self.project() {
  26. RetryStateProj::Running(future) => RetryFuturePoll::Running(future.poll(cx)),
  27. RetryStateProj::Sleeping(future) => RetryFuturePoll::Sleeping(future.poll(cx)),
  28. }
  29. }
  30. }
  31. enum RetryFuturePoll<A>
  32. where
  33. A: Action,
  34. {
  35. Running(Poll<Result<A::Item, A::Error>>),
  36. Sleeping(Poll<()>),
  37. }
  38. /// Future that drives multiple attempts at an action via a retry strategy.
  39. #[pin_project]
  40. pub struct Retry<I, A>
  41. where
  42. I: Iterator<Item = Duration>,
  43. A: Action,
  44. {
  45. #[pin]
  46. retry_if: RetryIf<I, A, fn(&A::Error) -> bool>,
  47. }
  48. impl<I, A> Retry<I, A>
  49. where
  50. I: Iterator<Item = Duration>,
  51. A: Action,
  52. {
  53. pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Retry<I, A> {
  54. Retry {
  55. retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool),
  56. }
  57. }
  58. }
  59. impl<I, A> Future for Retry<I, A>
  60. where
  61. I: Iterator<Item = Duration>,
  62. A: Action,
  63. {
  64. type Output = Result<A::Item, A::Error>;
  65. fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
  66. let this = self.project();
  67. this.retry_if.poll(cx)
  68. }
  69. }
  70. /// Future that drives multiple attempts at an action via a retry strategy.
  71. /// Retries are only attempted if the `Error` returned by the future satisfies a
  72. /// given condition.
  73. #[pin_project]
  74. pub struct RetryIf<I, A, C>
  75. where
  76. I: Iterator<Item = Duration>,
  77. A: Action,
  78. C: Condition<A::Error>,
  79. {
  80. strategy: I,
  81. #[pin]
  82. state: RetryState<A>,
  83. action: A,
  84. condition: C,
  85. }
  86. impl<I, A, C> RetryIf<I, A, C>
  87. where
  88. I: Iterator<Item = Duration>,
  89. A: Action,
  90. C: Condition<A::Error>,
  91. {
  92. pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(
  93. strategy: T,
  94. mut action: A,
  95. condition: C,
  96. ) -> RetryIf<I, A, C> {
  97. RetryIf {
  98. strategy: strategy.into_iter(),
  99. state: RetryState::Running(action.run()),
  100. action,
  101. condition,
  102. }
  103. }
  104. fn attempt(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<A::Item, A::Error>> {
  105. let future = {
  106. let this = self.as_mut().project();
  107. this.action.run()
  108. };
  109. self.as_mut().project().state.set(RetryState::Running(future));
  110. self.poll(cx)
  111. }
  112. fn retry(
  113. mut self: Pin<&mut Self>,
  114. err: A::Error,
  115. cx: &mut Context,
  116. ) -> Result<Poll<Result<A::Item, A::Error>>, A::Error> {
  117. match self.as_mut().project().strategy.next() {
  118. None => Err(err),
  119. Some(duration) => {
  120. let deadline = Instant::now() + duration;
  121. let future = sleep_until(deadline);
  122. self.as_mut().project().state.set(RetryState::Sleeping(future));
  123. Ok(self.poll(cx))
  124. }
  125. }
  126. }
  127. }
  128. impl<I, A, C> Future for RetryIf<I, A, C>
  129. where
  130. I: Iterator<Item = Duration>,
  131. A: Action,
  132. C: Condition<A::Error>,
  133. {
  134. type Output = Result<A::Item, A::Error>;
  135. fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
  136. match self.as_mut().project().state.poll(cx) {
  137. RetryFuturePoll::Running(poll_result) => match poll_result {
  138. Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)),
  139. Poll::Pending => Poll::Pending,
  140. Poll::Ready(Err(err)) => {
  141. if self.as_mut().project().condition.should_retry(&err) {
  142. match self.retry(err, cx) {
  143. Ok(poll) => poll,
  144. Err(err) => Poll::Ready(Err(err)),
  145. }
  146. } else {
  147. Poll::Ready(Err(err))
  148. }
  149. }
  150. },
  151. RetryFuturePoll::Sleeping(poll_result) => match poll_result {
  152. Poll::Pending => Poll::Pending,
  153. Poll::Ready(_) => self.attempt(cx),
  154. },
  155. }
  156. }
  157. }
  158. /// An action can be run multiple times and produces a future.
  159. pub trait Action: Send + Sync {
  160. type Future: Future<Output = Result<Self::Item, Self::Error>>;
  161. type Item;
  162. type Error;
  163. fn run(&mut self) -> Self::Future;
  164. }
// NOTE: the blanket `Action` implementation for closures below is commented out.
//
// impl<R, E, T: Future<Output = Result<R, E>>, F: FnMut() -> T + Send + Sync> Action for F {
//     type Future = T;
//     type Item = R;
//     type Error = E;
//
//     fn run(&mut self) -> Self::Future { self() }
// }
/// Decides whether an error of type `E` should trigger another attempt.
///
/// `RetryIf` calls `should_retry` with each error produced by the action; a
/// `false` result ends the retry loop with that error.
pub trait Condition<E> {
    /// Returns `true` if the operation that produced `error` should be retried.
    fn should_retry(&mut self, error: &E) -> bool;
}
  175. impl<E, F: FnMut(&E) -> bool> Condition<E> for F {
  176. fn should_retry(&mut self, error: &E) -> bool {
  177. self(error)
  178. }
  179. }
  180. pub fn spawn_retry<A: Action + 'static>(
  181. millis: u64,
  182. retry_count: usize,
  183. action: A,
  184. ) -> JoinHandle<Result<A::Item, A::Error>>
  185. where
  186. A::Item: Send + Sync,
  187. A::Error: Send + Sync,
  188. <A as Action>::Future: Send + Sync,
  189. {
  190. let strategy = FixedInterval::from_millis(millis).take(retry_count);
  191. let retry = Retry::spawn(strategy, action);
  192. tokio::spawn(async move { retry.await })
  193. }