#![allow(clippy::large_enum_variant)] #![allow(clippy::type_complexity)] use crate::retry::FixedInterval; use pin_project::pin_project; use std::{ future::Future, iter::{IntoIterator, Iterator}, pin::Pin, task::{Context, Poll}, }; use tokio::time::{sleep_until, Duration, Instant, Sleep}; #[pin_project(project = RetryStateProj)] enum RetryState where A: Action, { Running(#[pin] A::Future), Sleeping(#[pin] Sleep), } impl RetryState { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> RetryFuturePoll { match self.project() { RetryStateProj::Running(future) => RetryFuturePoll::Running(future.poll(cx)), RetryStateProj::Sleeping(future) => RetryFuturePoll::Sleeping(future.poll(cx)), } } } enum RetryFuturePoll where A: Action, { Running(Poll>), Sleeping(Poll<()>), } /// Future that drives multiple attempts at an action via a retry strategy. #[pin_project] pub struct Retry where I: Iterator, A: Action, { #[pin] retry_if: RetryIf bool>, } impl Retry where I: Iterator, A: Action, { pub fn new>( strategy: T, action: A, ) -> Retry { Retry { retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool), } } } impl Future for Retry where I: Iterator, A: Action, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = self.project(); this.retry_if.poll(cx) } } /// Future that drives multiple attempts at an action via a retry strategy. /// Retries are only attempted if the `Error` returned by the future satisfies a /// given condition. #[pin_project] pub struct RetryIf where I: Iterator, A: Action, C: Condition, { strategy: I, #[pin] state: RetryState, action: A, condition: C, } impl RetryIf where I: Iterator, A: Action, C: Condition, { pub fn spawn>( strategy: T, mut action: A, condition: C, ) -> RetryIf { RetryIf { strategy: strategy.into_iter(), state: RetryState::Running(action.run()), action, condition, } } fn attempt(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let future = { let this = self.as_mut().project(); this.action.run() }; self .as_mut() .project() .state .set(RetryState::Running(future)); self.poll(cx) } fn retry( mut self: Pin<&mut Self>, err: A::Error, cx: &mut Context, ) -> Result>, A::Error> { match self.as_mut().project().strategy.next() { None => Err(err), Some(duration) => { let deadline = Instant::now() + duration; let future = sleep_until(deadline); self .as_mut() .project() .state .set(RetryState::Sleeping(future)); Ok(self.poll(cx)) }, } } } impl Future for RetryIf where I: Iterator, A: Action, C: Condition, { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.as_mut().project().state.poll(cx) { RetryFuturePoll::Running(poll_result) => match poll_result { Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)), Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { if self.as_mut().project().condition.should_retry(&err) { match self.retry(err, cx) { Ok(poll) => poll, Err(err) => Poll::Ready(Err(err)), } } else { Poll::Ready(Err(err)) } }, }, RetryFuturePoll::Sleeping(poll_result) => match poll_result { Poll::Pending => Poll::Pending, Poll::Ready(_) => self.attempt(cx), }, } } } /// An action can be run multiple times and produces a future. pub trait Action: Send + Sync { type Future: Future>; type Item; type Error; fn run(&mut self) -> Self::Future; } // impl>, F: FnMut() -> T + Send + Sync> // Action for F { type Future = T; // type Item = R; // type Error = E; // // fn run(&mut self) -> Self::Future { self() } // } pub trait Condition { fn should_retry(&mut self, error: &E) -> bool; } impl bool> Condition for F { fn should_retry(&mut self, error: &E) -> bool { self(error) } } pub fn spawn_retry( retry_count: usize, retry_per_millis: u64, action: A, ) -> impl Future> where A::Item: Send + Sync, A::Error: Send + Sync, ::Future: Send + Sync, { let strategy = FixedInterval::from_millis(retry_per_millis).take(retry_count); Retry::new(strategy, action) }