|
@@ -1,17 +1,15 @@
|
|
|
use crate::{
|
|
|
entities::doc::{revision_from_doc, Doc, RevId, RevType, Revision, RevisionRange},
|
|
|
errors::{internal_error, DocError, DocResult},
|
|
|
- services::doc::revision::{
|
|
|
- model::{RevisionIterator, *},
|
|
|
- RevisionServer,
|
|
|
- },
|
|
|
+ services::doc::revision::{model::*, RevisionServer},
|
|
|
sql_tables::RevState,
|
|
|
};
|
|
|
-
|
|
|
+use async_stream::stream;
|
|
|
use dashmap::DashMap;
|
|
|
use flowy_database::ConnectionPool;
|
|
|
use flowy_infra::future::ResultFuture;
|
|
|
use flowy_ot::core::{Delta, OperationTransformable};
|
|
|
+use futures::stream::StreamExt;
|
|
|
use std::{collections::VecDeque, sync::Arc, time::Duration};
|
|
|
use tokio::{
|
|
|
sync::{broadcast, mpsc, RwLock},
|
|
@@ -33,7 +31,7 @@ impl RevisionStore {
|
|
|
doc_id: &str,
|
|
|
pool: Arc<ConnectionPool>,
|
|
|
server: Arc<dyn RevisionServer>,
|
|
|
- next_revision: mpsc::UnboundedSender<Revision>,
|
|
|
+ ws_revision_sender: mpsc::UnboundedSender<Revision>,
|
|
|
) -> Arc<RevisionStore> {
|
|
|
let doc_id = doc_id.to_owned();
|
|
|
let persistence = Arc::new(Persistence::new(pool));
|
|
@@ -51,7 +49,7 @@ impl RevisionStore {
|
|
|
server,
|
|
|
});
|
|
|
|
|
|
- tokio::spawn(PendingRevisionStream::new(store.clone(), pending_rx, next_revision).run());
|
|
|
+ tokio::spawn(RevisionStream::new(store.clone(), pending_rx, ws_revision_sender).run());
|
|
|
|
|
|
store
|
|
|
}
|
|
@@ -265,3 +263,67 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocRes
|
|
|
// }
|
|
|
// });
|
|
|
// }
|
|
|
+
|
|
|
+pub(crate) enum PendingMsg {
|
|
|
+ Revision { ret: RevIdReceiver },
|
|
|
+}
|
|
|
+
|
|
|
+pub(crate) type PendingSender = mpsc::UnboundedSender<PendingMsg>;
|
|
|
+pub(crate) type PendingReceiver = mpsc::UnboundedReceiver<PendingMsg>;
|
|
|
+
|
|
|
+pub(crate) struct RevisionStream {
|
|
|
+ revisions: Arc<dyn RevisionIterator>,
|
|
|
+ receiver: Option<PendingReceiver>,
|
|
|
+ ws_revision_sender: mpsc::UnboundedSender<Revision>,
|
|
|
+}
|
|
|
+
|
|
|
+impl RevisionStream {
|
|
|
+ pub(crate) fn new(
|
|
|
+ revisions: Arc<dyn RevisionIterator>,
|
|
|
+ pending_rx: PendingReceiver,
|
|
|
+ ws_revision_sender: mpsc::UnboundedSender<Revision>,
|
|
|
+ ) -> Self {
|
|
|
+ Self {
|
|
|
+ revisions,
|
|
|
+ receiver: Some(pending_rx),
|
|
|
+ ws_revision_sender,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn run(mut self) {
|
|
|
+ let mut receiver = self.receiver.take().expect("Should only call once");
|
|
|
+ let stream = stream! {
|
|
|
+ loop {
|
|
|
+ match receiver.recv().await {
|
|
|
+ Some(msg) => yield msg,
|
|
|
+ None => break,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ stream
|
|
|
+ .for_each(|msg| async {
|
|
|
+ match self.handle_msg(msg).await {
|
|
|
+ Ok(_) => {},
|
|
|
+ Err(e) => log::error!("{:?}", e),
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .await;
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn handle_msg(&self, msg: PendingMsg) -> DocResult<()> {
|
|
|
+ match msg {
|
|
|
+ PendingMsg::Revision { ret } => self.prepare_next_pending_rev(ret).await,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn prepare_next_pending_rev(&self, mut ret: RevIdReceiver) -> DocResult<()> {
|
|
|
+ match self.revisions.next().await? {
|
|
|
+ None => Ok(()),
|
|
|
+ Some(revision) => {
|
|
|
+ let _ = self.ws_revision_sender.send(revision).map_err(internal_error);
|
|
|
+ let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
|
|
|
+ Ok(())
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|