123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- use crate::{
- entities::doc::{RevId, Revision, RevisionRange},
- errors::{internal_error, DocError, DocResult},
- services::doc::revision::{model::RevisionOperation, DocRevision, RevisionServer},
- sql_tables::{RevState, RevTableSql},
- };
- use async_stream::stream;
- use dashmap::DashMap;
- use flowy_database::ConnectionPool;
- use flowy_ot::core::{Attributes, Delta, OperationTransformable};
- use futures::{stream::StreamExt, TryFutureExt};
- use std::{sync::Arc, time::Duration};
- use tokio::{
- sync::{mpsc, oneshot, RwLock},
- task::{spawn_blocking, JoinHandle},
- };
- pub enum RevisionCmd {
- Revision {
- revision: Revision,
- ret: oneshot::Sender<DocResult<()>>,
- },
- AckRevision {
- rev_id: RevId,
- },
- SendRevisions {
- range: RevisionRange,
- ret: oneshot::Sender<DocResult<Vec<Revision>>>,
- },
- DocumentDelta {
- ret: oneshot::Sender<DocResult<DocRevision>>,
- },
- }
- pub struct RevisionStoreActor {
- doc_id: String,
- persistence: Arc<Persistence>,
- revs: Arc<DashMap<i64, RevisionOperation>>,
- delay_save: RwLock<Option<JoinHandle<()>>>,
- receiver: Option<mpsc::Receiver<RevisionCmd>>,
- server: Arc<dyn RevisionServer>,
- }
- impl RevisionStoreActor {
- pub fn new(
- doc_id: &str,
- pool: Arc<ConnectionPool>,
- receiver: mpsc::Receiver<RevisionCmd>,
- server: Arc<dyn RevisionServer>,
- ) -> RevisionStoreActor {
- let persistence = Arc::new(Persistence::new(pool));
- let revs = Arc::new(DashMap::new());
- let doc_id = doc_id.to_owned();
- Self {
- doc_id,
- persistence,
- revs,
- delay_save: RwLock::new(None),
- receiver: Some(receiver),
- server,
- }
- }
- pub async fn run(mut self) {
- let mut receiver = self.receiver.take().expect("Should only call once");
- let stream = stream! {
- loop {
- match receiver.recv().await {
- Some(msg) => yield msg,
- None => break,
- }
- }
- };
- stream.for_each(|msg| self.handle_message(msg)).await;
- }
- async fn handle_message(&self, cmd: RevisionCmd) {
- match cmd {
- RevisionCmd::Revision { revision, ret } => {
- let result = self.handle_new_revision(revision).await;
- let _ = ret.send(result);
- },
- RevisionCmd::AckRevision { rev_id } => {
- self.handle_revision_acked(rev_id).await;
- },
- RevisionCmd::SendRevisions { range, ret } => {
- let result = revs_in_range(&self.doc_id, self.persistence.clone(), range).await;
- let _ = ret.send(result);
- },
- RevisionCmd::DocumentDelta { ret } => {
- let delta = fetch_document(&self.doc_id, self.server.clone(), self.persistence.clone()).await;
- let _ = ret.send(delta);
- },
- }
- }
- async fn handle_new_revision(&self, revision: Revision) -> DocResult<()> {
- if self.revs.contains_key(&revision.rev_id) {
- return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
- }
- let mut operation = RevisionOperation::new(&revision);
- let _receiver = operation.receiver();
- self.revs.insert(revision.rev_id, operation);
- self.save_revisions().await;
- Ok(())
- }
- async fn handle_revision_acked(&self, rev_id: RevId) {
- match self.revs.get_mut(rev_id.as_ref()) {
- None => {},
- Some(mut rev) => rev.value_mut().finish(),
- }
- self.save_revisions().await;
- }
- async fn save_revisions(&self) {
- if let Some(handler) = self.delay_save.write().await.take() {
- handler.abort();
- }
- if self.revs.is_empty() {
- return;
- }
- let revs = self.revs.clone();
- let persistence = self.persistence.clone();
- *self.delay_save.write().await = Some(tokio::spawn(async move {
- tokio::time::sleep(Duration::from_millis(300)).await;
- let ids = revs.iter().map(|kv| kv.key().clone()).collect::<Vec<i64>>();
- let revisions = revs
- .iter()
- .map(|kv| ((*kv.value()).clone(), kv.state))
- .collect::<Vec<(Revision, RevState)>>();
- // TODO: Ok to unwrap?
- let conn = &*persistence.pool.get().map_err(internal_error).unwrap();
- let result = conn.immediate_transaction::<_, DocError, _>(|| {
- let _ = persistence.rev_sql.create_rev_table(revisions, conn).unwrap();
- Ok(())
- });
- match result {
- Ok(_) => revs.retain(|k, _| !ids.contains(k)),
- Err(e) => log::error!("Save revision failed: {:?}", e),
- }
- }));
- }
- }
- async fn fetch_document(
- doc_id: &str,
- server: Arc<dyn RevisionServer>,
- persistence: Arc<Persistence>,
- ) -> DocResult<DocRevision> {
- let fetch_from_remote = server.fetch_document_from_remote(doc_id).or_else(|result| {
- log::error!(
- "Fetch document delta from remote failed: {:?}, try to fetch from local",
- result
- );
- fetch_from_local(doc_id, persistence.clone())
- });
- let fetch_from_local = fetch_from_local(doc_id, persistence.clone()).or_else(|result| async move {
- log::error!(
- "Fetch document delta from local failed: {:?}, try to fetch from remote",
- result
- );
- server.fetch_document_from_remote(doc_id).await
- });
- tokio::select! {
- result = fetch_from_remote => {
- log::debug!("Finish fetching document from remote");
- result
- },
- result = fetch_from_local => {
- log::debug!("Finish fetching document from local");
- result
- },
- }
- }
- async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocResult<DocRevision> {
- let doc_id = doc_id.to_owned();
- spawn_blocking(move || {
- // tokio::time::timeout
- let conn = &*persistence.pool.get().map_err(internal_error)?;
- let revisions = persistence.rev_sql.read_rev_tables(&doc_id, None, conn)?;
- if revisions.is_empty() {
- return Err(DocError::not_found());
- }
- let rev_id: RevId = revisions.last().unwrap().rev_id.into();
- let mut delta = Delta::new();
- for revision in revisions {
- match Delta::from_bytes(revision.delta_data) {
- Ok(local_delta) => {
- delta = delta.compose(&local_delta)?;
- },
- Err(e) => {
- log::error!("Deserialize delta from revision failed: {}", e);
- },
- }
- }
- delta.insert("\n", Attributes::default());
- Result::<DocRevision, DocError>::Ok(DocRevision { rev_id, delta })
- })
- .await
- .map_err(internal_error)?
- }
- async fn revs_in_range(doc_id: &str, persistence: Arc<Persistence>, range: RevisionRange) -> DocResult<Vec<Revision>> {
- let doc_id = doc_id.to_owned();
- let result = spawn_blocking(move || {
- let conn = &*persistence.pool.get().map_err(internal_error)?;
- let revisions = persistence.rev_sql.read_rev_tables_with_range(&doc_id, range, conn)?;
- Ok(revisions)
- })
- .await
- .map_err(internal_error)?;
- result
- }
- struct Persistence {
- rev_sql: Arc<RevTableSql>,
- pool: Arc<ConnectionPool>,
- }
- impl Persistence {
- fn new(pool: Arc<ConnectionPool>) -> Self {
- let rev_sql = Arc::new(RevTableSql {});
- Self { rev_sql, pool }
- }
- }
- // fn update_revisions(&self) {
- // let rev_ids = self
- // .revs
- // .iter()
- // .flat_map(|kv| match kv.state == RevState::Acked {
- // true => None,
- // false => Some(kv.key().clone()),
- // })
- // .collect::<Vec<i64>>();
- //
- // if rev_ids.is_empty() {
- // return;
- // }
- //
- // log::debug!("Try to update {:?} state", rev_ids);
- // match self.update(&rev_ids) {
- // Ok(_) => {
- // self.revs.retain(|k, _| !rev_ids.contains(k));
- // },
- // Err(e) => log::error!("Save revision failed: {:?}", e),
- // }
- // }
- //
- // fn update(&self, rev_ids: &Vec<i64>) -> Result<(), DocError> {
- // let conn = &*self.pool.get().map_err(internal_error).unwrap();
- // let result = conn.immediate_transaction::<_, DocError, _>(|| {
- // for rev_id in rev_ids {
- // let changeset = RevChangeset {
- // doc_id: self.doc_id.clone(),
- // rev_id: rev_id.clone(),
- // state: RevState::Acked,
- // };
- // let _ = self.op_sql.update_rev_table(changeset, conn)?;
- // }
- // Ok(())
- // });
- //
- // result
- // }
- // fn delete_revision(&self, rev_id: RevId) {
- // let op_sql = self.op_sql.clone();
- // let pool = self.pool.clone();
- // let doc_id = self.doc_id.clone();
- // tokio::spawn(async move {
- // let conn = &*pool.get().map_err(internal_error).unwrap();
- // let result = conn.immediate_transaction::<_, DocError, _>(|| {
- // let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?;
- // Ok(())
- // });
- //
- // match result {
- // Ok(_) => {},
- // Err(e) => log::error!("Delete revision failed: {:?}", e),
- // }
- // });
- // }
|