#![allow(clippy::large_enum_variant)] #![allow(clippy::type_complexity)] use crate::retry::FixedInterval; use pin_project::pin_project; use std::{ future::Future, iter::{IntoIterator, Iterator}, pin::Pin, task::{Context, Poll}, }; use tokio::{ task::JoinHandle, time::{sleep_until, Duration, Instant, Sleep}, }; #[pin_project(project = RetryStateProj)] enum RetryState where A: Action, { Running(#[pin] A::Future), Sleeping(#[pin] Sleep), } impl RetryState { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> RetryFuturePoll { match self.project() { RetryStateProj::Running(future) => RetryFuturePoll::Running(future.poll(cx)), RetryStateProj::Sleeping(future) => RetryFuturePoll::Sleeping(future.poll(cx)), } } } enum RetryFuturePoll where A: Action, { Running(Poll>), Sleeping(Poll<()>), } /// Future that drives multiple attempts at an action via a retry strategy. #[pin_project] pub struct Retry where I: Iterator, A: Action, { #[pin] retry_if: RetryIf bool>, } impl Retry where I: Iterator, A: Action, { pub fn spawn>(strategy: T, action: A) -> Retry { Retry { retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool), } } } impl Future for Retry where I: Iterator, A: Action, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = self.project(); this.retry_if.poll(cx) } } /// Future that drives multiple attempts at an action via a retry strategy. /// Retries are only attempted if the `Error` returned by the future satisfies a /// given condition. #[pin_project] pub struct RetryIf where I: Iterator, A: Action, C: Condition, { strategy: I, #[pin] state: RetryState, action: A, condition: C, } impl RetryIf where I: Iterator, A: Action, C: Condition, { pub fn spawn>( strategy: T, mut action: A, condition: C, ) -> RetryIf { RetryIf { strategy: strategy.into_iter(), state: RetryState::Running(action.run()), action, condition, } } fn attempt(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let future = { let this = self.as_mut().project(); this.action.run() }; self.as_mut().project().state.set(RetryState::Running(future)); self.poll(cx) } fn retry( mut self: Pin<&mut Self>, err: A::Error, cx: &mut Context, ) -> Result>, A::Error> { match self.as_mut().project().strategy.next() { None => Err(err), Some(duration) => { let deadline = Instant::now() + duration; let future = sleep_until(deadline); self.as_mut().project().state.set(RetryState::Sleeping(future)); Ok(self.poll(cx)) } } } } impl Future for RetryIf where I: Iterator, A: Action, C: Condition, { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.as_mut().project().state.poll(cx) { RetryFuturePoll::Running(poll_result) => match poll_result { Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)), Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { if self.as_mut().project().condition.should_retry(&err) { match self.retry(err, cx) { Ok(poll) => poll, Err(err) => Poll::Ready(Err(err)), } } else { Poll::Ready(Err(err)) } } }, RetryFuturePoll::Sleeping(poll_result) => match poll_result { Poll::Pending => Poll::Pending, Poll::Ready(_) => self.attempt(cx), }, } } } /// An action can be run multiple times and produces a future. pub trait Action: Send + Sync { type Future: Future>; type Item; type Error; fn run(&mut self) -> Self::Future; } // impl>, F: FnMut() -> T + Send + Sync> // Action for F { type Future = T; // type Item = R; // type Error = E; // // fn run(&mut self) -> Self::Future { self() } // } pub trait Condition { fn should_retry(&mut self, error: &E) -> bool; } impl bool> Condition for F { fn should_retry(&mut self, error: &E) -> bool { self(error) } } pub fn spawn_retry( millis: u64, retry_count: usize, action: A, ) -> JoinHandle> where A::Item: Send + Sync, A::Error: Send + Sync, ::Future: Send + Sync, { let strategy = FixedInterval::from_millis(millis).take(retry_count); let retry = Retry::spawn(strategy, action); tokio::spawn(async move { retry.await }) }