瀏覽代碼

retry if ws send new_doc_user message failed

appflowy 3 年之前
父節點
當前提交
d7300dd7e2

+ 2 - 2
backend/src/service/doc/ws_actor.rs

@@ -80,8 +80,8 @@ impl DocWsActor {
         }
     }
 
-    async fn handle_new_doc_user(&self, socket: Socket, data: Vec<u8>) -> DocResult<()> {
-        let user = spawn_blocking(move || {
+    async fn handle_new_doc_user(&self, _socket: Socket, data: Vec<u8>) -> DocResult<()> {
+        let _user = spawn_blocking(move || {
             let user: NewDocUser = parse_from_bytes(&data)?;
             DocResult::Ok(user)
         })

+ 1 - 1
rust-lib/flowy-document/src/services/doc/revision/actor.rs

@@ -2,7 +2,7 @@ use crate::{
     entities::doc::{RevId, Revision, RevisionRange},
     errors::{internal_error, DocError, DocResult},
     services::doc::revision::{util::RevisionOperation, DocRevision, RevisionServer},
-    sql_tables::{DocTableSql, RevState, RevTableSql},
+    sql_tables::{RevState, RevTableSql},
 };
 use async_stream::stream;
 use dashmap::DashMap;

+ 21 - 19
rust-lib/flowy-document/src/services/doc/revision/manager.rs

@@ -1,19 +1,19 @@
 use crate::{
-    entities::doc::{RevType, Revision, RevisionRange},
-    errors::DocError,
+    entities::doc::{RevId, RevType, Revision, RevisionRange},
+    errors::{DocError, DocResult},
     services::{
-        doc::revision::actor::{RevisionCmd, RevisionStoreActor},
+        doc::revision::{
+            actor::{RevisionCmd, RevisionStoreActor},
+            util::NotifyOpenDocAction,
+        },
         util::RevIdCounter,
         ws::WsDocumentSender,
     },
 };
-
-use crate::{
-    entities::doc::{NewDocUser, RevId},
-    errors::DocResult,
+use flowy_infra::{
+    future::ResultFuture,
+    retry::{ExponentialBackoff, Retry},
 };
-use flowy_database::ConnectionPool;
-use flowy_infra::future::ResultFuture;
 use flowy_ot::core::Delta;
 use parking_lot::RwLock;
 use std::{collections::VecDeque, sync::Arc};
@@ -112,15 +112,17 @@ impl RevisionManager {
     }
 }
 
+// FIXME:
+// user_id may be invalid if the user switch to another account while
+// theNotifyOpenDocAction is flying
 fn notify_open_doc(ws: &Arc<dyn WsDocumentSender>, user_id: &str, doc_id: &str, rev_id: &RevId) {
-    let new_doc_user = NewDocUser {
-        user_id: user_id.to_string(),
-        rev_id: rev_id.clone().into(),
-        doc_id: doc_id.to_string(),
-    };
-
-    match ws.send(new_doc_user.into()) {
-        Ok(_) => {},
-        Err(e) => log::error!("Send new_doc_user failed: {:?}", e),
-    }
+    let action = NotifyOpenDocAction::new(user_id, doc_id, rev_id, ws);
+    let strategy = ExponentialBackoff::from_millis(50).take(3);
+    let retry = Retry::spawn(strategy, action);
+    tokio::spawn(async move {
+        match retry.await {
+            Ok(_) => {},
+            Err(e) => log::error!("Notify open doc failed: {}", e),
+        }
+    });
 }

+ 46 - 1
rust-lib/flowy-document/src/services/doc/revision/util.rs

@@ -1,4 +1,12 @@
-use crate::{entities::doc::Revision, errors::DocResult, sql_tables::RevState};
+use crate::{
+    entities::doc::{NewDocUser, RevId, Revision},
+    errors::{DocError, DocResult},
+    services::ws::WsDocumentSender,
+    sql_tables::RevState,
+};
+use flowy_infra::retry::Action;
+use futures::future::BoxFuture;
+use std::{future, sync::Arc};
 use tokio::sync::oneshot;
 
 pub type Sender = oneshot::Sender<DocResult<()>>;
@@ -41,3 +49,40 @@ impl std::ops::Deref for RevisionOperation {
 
     fn deref(&self) -> &Self::Target { &self.inner }
 }
+
+pub(crate) struct NotifyOpenDocAction {
+    user_id: String,
+    rev_id: RevId,
+    doc_id: String,
+    ws: Arc<dyn WsDocumentSender>,
+}
+
+impl NotifyOpenDocAction {
+    pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc<dyn WsDocumentSender>) -> Self {
+        Self {
+            user_id: user_id.to_owned(),
+            rev_id: rev_id.clone(),
+            doc_id: doc_id.to_owned(),
+            ws: ws.clone(),
+        }
+    }
+}
+
+impl Action for NotifyOpenDocAction {
+    type Future = BoxFuture<'static, Result<Self::Item, Self::Error>>;
+    type Item = ();
+    type Error = DocError;
+
+    fn run(&mut self) -> Self::Future {
+        let new_doc_user = NewDocUser {
+            user_id: self.user_id.clone(),
+            rev_id: self.rev_id.clone().into(),
+            doc_id: self.doc_id.clone(),
+        };
+
+        match self.ws.send(new_doc_user.into()) {
+            Ok(_) => Box::pin(future::ready(Ok::<(), DocError>(()))),
+            Err(e) => Box::pin(future::ready(Err::<(), DocError>(e))),
+        }
+    }
+}

+ 4 - 2
rust-lib/flowy-infra/Cargo.toml

@@ -17,5 +17,7 @@ protobuf = {version = "2.18.0"}
 log = "0.4.14"
 chrono = "0.4.19"
 bytes = { version = "1.0" }
-pin-project = "1.0.0"
-futures-core = { version = "0.3", default-features = false }
+pin-project = "1.0"
+futures-core = { version = "0.3", default-features = false }
+tokio = { version = "1.0", features = ["time"] }
+rand = "0.8.3"

+ 1 - 0
rust-lib/flowy-infra/src/lib.rs

@@ -7,6 +7,7 @@ extern crate diesel_derives;
 pub mod future;
 pub mod kv;
 mod protobuf;
+pub mod retry;
 
 #[allow(dead_code)]
 pub fn uuid() -> String { uuid::Uuid::new_v4().to_string() }

+ 189 - 0
rust-lib/flowy-infra/src/retry/future.rs

@@ -0,0 +1,189 @@
+use pin_project::pin_project;
+use std::{
+    future::Future,
+    iter::{IntoIterator, Iterator},
+    pin::Pin,
+    task::{Context, Poll},
+};
+use tokio::time::{sleep_until, Duration, Instant, Sleep};
+
+#[pin_project(project = RetryStateProj)]
+enum RetryState<A>
+where
+    A: Action,
+{
+    Running(#[pin] A::Future),
+    Sleeping(#[pin] Sleep),
+}
+
+impl<A: Action> RetryState<A> {
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> RetryFuturePoll<A> {
+        match self.project() {
+            RetryStateProj::Running(future) => RetryFuturePoll::Running(future.poll(cx)),
+            RetryStateProj::Sleeping(future) => RetryFuturePoll::Sleeping(future.poll(cx)),
+        }
+    }
+}
+
+enum RetryFuturePoll<A>
+where
+    A: Action,
+{
+    Running(Poll<Result<A::Item, A::Error>>),
+    Sleeping(Poll<()>),
+}
+
+/// Future that drives multiple attempts at an action via a retry strategy.
+#[pin_project]
+pub struct Retry<I, A>
+where
+    I: Iterator<Item = Duration>,
+    A: Action,
+{
+    #[pin]
+    retry_if: RetryIf<I, A, fn(&A::Error) -> bool>,
+}
+
+impl<I, A> Retry<I, A>
+where
+    I: Iterator<Item = Duration>,
+    A: Action,
+{
+    pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Retry<I, A> {
+        Retry {
+            retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool),
+        }
+    }
+}
+
+impl<I, A> Future for Retry<I, A>
+where
+    I: Iterator<Item = Duration>,
+    A: Action,
+{
+    type Output = Result<A::Item, A::Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let this = self.project();
+        this.retry_if.poll(cx)
+    }
+}
+
+/// Future that drives multiple attempts at an action via a retry strategy.
+/// Retries are only attempted if the `Error` returned by the future satisfies a
+/// given condition.
+#[pin_project]
+pub struct RetryIf<I, A, C>
+where
+    I: Iterator<Item = Duration>,
+    A: Action,
+    C: Condition<A::Error>,
+{
+    strategy: I,
+    #[pin]
+    state: RetryState<A>,
+    action: A,
+    condition: C,
+}
+
+impl<I, A, C> RetryIf<I, A, C>
+where
+    I: Iterator<Item = Duration>,
+    A: Action,
+    C: Condition<A::Error>,
+{
+    pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(
+        strategy: T,
+        mut action: A,
+        condition: C,
+    ) -> RetryIf<I, A, C> {
+        RetryIf {
+            strategy: strategy.into_iter(),
+            state: RetryState::Running(action.run()),
+            action,
+            condition,
+        }
+    }
+
+    fn attempt(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<A::Item, A::Error>> {
+        let future = {
+            let this = self.as_mut().project();
+            this.action.run()
+        };
+        self.as_mut().project().state.set(RetryState::Running(future));
+        self.poll(cx)
+    }
+
+    fn retry(
+        mut self: Pin<&mut Self>,
+        err: A::Error,
+        cx: &mut Context,
+    ) -> Result<Poll<Result<A::Item, A::Error>>, A::Error> {
+        match self.as_mut().project().strategy.next() {
+            None => Err(err),
+            Some(duration) => {
+                let deadline = Instant::now() + duration;
+                let future = sleep_until(deadline);
+                self.as_mut().project().state.set(RetryState::Sleeping(future));
+                Ok(self.poll(cx))
+            },
+        }
+    }
+}
+
+impl<I, A, C> Future for RetryIf<I, A, C>
+where
+    I: Iterator<Item = Duration>,
+    A: Action,
+    C: Condition<A::Error>,
+{
+    type Output = Result<A::Item, A::Error>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        match self.as_mut().project().state.poll(cx) {
+            RetryFuturePoll::Running(poll_result) => match poll_result {
+                Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)),
+                Poll::Pending => Poll::Pending,
+                Poll::Ready(Err(err)) => {
+                    if self.as_mut().project().condition.should_retry(&err) {
+                        match self.retry(err, cx) {
+                            Ok(poll) => poll,
+                            Err(err) => Poll::Ready(Err(err)),
+                        }
+                    } else {
+                        Poll::Ready(Err(err))
+                    }
+                },
+            },
+            RetryFuturePoll::Sleeping(poll_result) => match poll_result {
+                Poll::Pending => Poll::Pending,
+                Poll::Ready(_) => self.attempt(cx),
+            },
+        }
+    }
+}
+
+/// An action can be run multiple times and produces a future.
+pub trait Action {
+    type Future: Future<Output = Result<Self::Item, Self::Error>>;
+    type Item;
+    type Error;
+
+    fn run(&mut self) -> Self::Future;
+}
+
+impl<R, E, T: Future<Output = Result<R, E>>, F: FnMut() -> T> Action for F {
+    type Future = T;
+    type Item = R;
+    type Error = E;
+
+    fn run(&mut self) -> Self::Future { self() }
+}
+
+pub trait Condition<E> {
+    fn should_retry(&mut self, error: &E) -> bool;
+}
+
+impl<E, F: FnMut(&E) -> bool> Condition<E> for F {
+    fn should_retry(&mut self, error: &E) -> bool { self(error) }
+}

+ 5 - 0
rust-lib/flowy-infra/src/retry/mod.rs

@@ -0,0 +1,5 @@
+mod future;
+mod strategy;
+
+pub use future::*;
+pub use strategy::*;

+ 127 - 0
rust-lib/flowy-infra/src/retry/strategy/exponential_backoff.rs

@@ -0,0 +1,127 @@
+use std::{iter::Iterator, time::Duration};
+/// A retry strategy driven by exponential back-off.
+///
+/// The power corresponds to the number of past attempts.
+#[derive(Debug, Clone)]
+pub struct ExponentialBackoff {
+    current: u64,
+    base: u64,
+    factor: u64,
+    max_delay: Option<Duration>,
+}
+
+impl ExponentialBackoff {
+    /// Constructs a new exponential back-off strategy,
+    /// given a base duration in milliseconds.
+    ///
+    /// The resulting duration is calculated by taking the base to the `n`-th
+    /// power, where `n` denotes the number of past attempts.
+    pub fn from_millis(base: u64) -> ExponentialBackoff {
+        ExponentialBackoff {
+            current: base,
+            base,
+            factor: 1u64,
+            max_delay: None,
+        }
+    }
+
+    /// A multiplicative factor that will be applied to the retry delay.
+    ///
+    /// For example, using a factor of `1000` will make each delay in units of
+    /// seconds.
+    ///
+    /// Default factor is `1`.
+    pub fn factor(mut self, factor: u64) -> ExponentialBackoff {
+        self.factor = factor;
+        self
+    }
+
+    /// Apply a maximum delay. No retry delay will be longer than this
+    /// `Duration`.
+    pub fn max_delay(mut self, duration: Duration) -> ExponentialBackoff {
+        self.max_delay = Some(duration);
+        self
+    }
+}
+
+impl Iterator for ExponentialBackoff {
+    type Item = Duration;
+
+    fn next(&mut self) -> Option<Duration> {
+        // set delay duration by applying factor
+        let duration = if let Some(duration) = self.current.checked_mul(self.factor) {
+            Duration::from_millis(duration)
+        } else {
+            Duration::from_millis(u64::MAX)
+        };
+
+        // check if we reached max delay
+        if let Some(ref max_delay) = self.max_delay {
+            if duration > *max_delay {
+                return Some(*max_delay);
+            }
+        }
+
+        if let Some(next) = self.current.checked_mul(self.base) {
+            self.current = next;
+        } else {
+            self.current = u64::MAX;
+        }
+
+        Some(duration)
+    }
+}
+
+#[test]
+fn returns_some_exponential_base_10() {
+    let mut s = ExponentialBackoff::from_millis(10);
+
+    assert_eq!(s.next(), Some(Duration::from_millis(10)));
+    assert_eq!(s.next(), Some(Duration::from_millis(100)));
+    assert_eq!(s.next(), Some(Duration::from_millis(1000)));
+}
+
+#[test]
+fn returns_some_exponential_base_2() {
+    let mut s = ExponentialBackoff::from_millis(2);
+
+    assert_eq!(s.next(), Some(Duration::from_millis(2)));
+    assert_eq!(s.next(), Some(Duration::from_millis(4)));
+    assert_eq!(s.next(), Some(Duration::from_millis(8)));
+}
+
+#[test]
+fn saturates_at_maximum_value() {
+    let mut s = ExponentialBackoff::from_millis(u64::MAX - 1);
+
+    assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX - 1)));
+    assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX)));
+    assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX)));
+}
+
+#[test]
+fn can_use_factor_to_get_seconds() {
+    let factor = 1000;
+    let mut s = ExponentialBackoff::from_millis(2).factor(factor);
+
+    assert_eq!(s.next(), Some(Duration::from_secs(2)));
+    assert_eq!(s.next(), Some(Duration::from_secs(4)));
+    assert_eq!(s.next(), Some(Duration::from_secs(8)));
+}
+
+#[test]
+fn stops_increasing_at_max_delay() {
+    let mut s = ExponentialBackoff::from_millis(2).max_delay(Duration::from_millis(4));
+
+    assert_eq!(s.next(), Some(Duration::from_millis(2)));
+    assert_eq!(s.next(), Some(Duration::from_millis(4)));
+    assert_eq!(s.next(), Some(Duration::from_millis(4)));
+}
+
+#[test]
+fn returns_max_when_max_less_than_base() {
+    let mut s = ExponentialBackoff::from_millis(20).max_delay(Duration::from_millis(10));
+
+    assert_eq!(s.next(), Some(Duration::from_millis(10)));
+    assert_eq!(s.next(), Some(Duration::from_millis(10)));
+}

+ 35 - 0
rust-lib/flowy-infra/src/retry/strategy/fixed_interval.rs

@@ -0,0 +1,35 @@
+use std::{iter::Iterator, time::Duration};
+
+/// A retry strategy driven by a fixed interval.
+#[derive(Debug, Clone)]
+pub struct FixedInterval {
+    duration: Duration,
+}
+
+impl FixedInterval {
+    /// Constructs a new fixed interval strategy.
+    pub fn new(duration: Duration) -> FixedInterval { FixedInterval { duration } }
+
+    /// Constructs a new fixed interval strategy,
+    /// given a duration in milliseconds.
+    pub fn from_millis(millis: u64) -> FixedInterval {
+        FixedInterval {
+            duration: Duration::from_millis(millis),
+        }
+    }
+}
+
+impl Iterator for FixedInterval {
+    type Item = Duration;
+
+    fn next(&mut self) -> Option<Duration> { Some(self.duration) }
+}
+
+#[test]
+fn returns_some_fixed() {
+    let mut s = FixedInterval::new(Duration::from_millis(123));
+
+    assert_eq!(s.next(), Some(Duration::from_millis(123)));
+    assert_eq!(s.next(), Some(Duration::from_millis(123)));
+    assert_eq!(s.next(), Some(Duration::from_millis(123)));
+}

+ 5 - 0
rust-lib/flowy-infra/src/retry/strategy/jitter.rs

@@ -0,0 +1,5 @@
+use std::time::Duration;
+
+pub fn jitter(duration: Duration) -> Duration {
+    duration.mul_f64(rand::random::<f64>())
+}

+ 7 - 0
rust-lib/flowy-infra/src/retry/strategy/mod.rs

@@ -0,0 +1,7 @@
+mod exponential_backoff;
+mod fixed_interval;
+mod jitter;
+
+pub use exponential_backoff::*;
+pub use fixed_interval::*;
+pub use jitter::*;

+ 1 - 0
rust-lib/flowy-ws/Cargo.toml

@@ -8,6 +8,7 @@ edition = "2018"
 [dependencies]
 flowy-derive = { path = "../flowy-derive" }
 flowy-net = { path = "../flowy-net" }
+flowy-infra = { path = "../flowy-infra" }
 
 tokio-tungstenite = "0.15"
 futures-util = "0.3.17"

+ 2 - 2
rust-lib/flowy-ws/src/ws.rs

@@ -96,7 +96,7 @@ impl WsController {
                 Ok(stream) => {
                     let _ = state_notify.send(WsState::Connected(sender));
                     let _ = ret.send(Ok(()));
-                    spawn_steam_and_handlers(stream, handlers, state_notify).await;
+                    spawn_stream_and_handlers(stream, handlers, state_notify).await;
                 },
                 Err(e) => {
                     let _ = state_notify.send(WsState::Disconnected(e.clone()));
@@ -128,7 +128,7 @@ impl WsController {
     }
 }
 
-async fn spawn_steam_and_handlers(
+async fn spawn_stream_and_handlers(
     stream: WsStream,
     handlers: WsHandlerFuture,
     state_notify: Arc<broadcast::Sender<WsState>>,