manager.rs 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. use crate::{
  2. entities::doc::{Doc, RevId, RevType, Revision, RevisionRange},
  3. errors::{DocError, DocResult},
  4. services::{doc::revision::RevisionStore, util::RevIdCounter},
  5. };
  6. use flowy_database::ConnectionPool;
  7. use flowy_infra::future::ResultFuture;
  8. use flowy_ot::core::{Delta, OperationTransformable};
  9. use std::sync::Arc;
  10. use tokio::sync::mpsc;
  11. pub struct DocRevision {
  12. pub base_rev_id: RevId,
  13. pub rev_id: RevId,
  14. pub delta: Delta,
  15. }
  16. pub trait RevisionServer: Send + Sync {
  17. fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture<Doc, DocError>;
  18. }
  19. pub struct RevisionManager {
  20. doc_id: String,
  21. rev_id_counter: RevIdCounter,
  22. rev_store: Arc<RevisionStore>,
  23. }
  24. impl RevisionManager {
  25. pub fn new(
  26. doc_id: &str,
  27. pool: Arc<ConnectionPool>,
  28. server: Arc<dyn RevisionServer>,
  29. pending_rev_sender: mpsc::Sender<Revision>,
  30. ) -> Self {
  31. let rev_store = RevisionStore::new(doc_id, pool, server, pending_rev_sender);
  32. let rev_id_counter = RevIdCounter::new(0);
  33. Self {
  34. doc_id: doc_id.to_string(),
  35. rev_id_counter,
  36. rev_store,
  37. }
  38. }
  39. pub async fn load_document(&mut self) -> DocResult<Delta> {
  40. let doc = self.rev_store.fetch_document().await?;
  41. self.set_rev_id(doc.rev_id);
  42. Ok(doc.delta()?)
  43. }
  44. pub async fn add_revision(&self, revision: &Revision) -> Result<(), DocError> {
  45. let _ = self.rev_store.handle_new_revision(revision.clone()).await?;
  46. Ok(())
  47. }
  48. pub fn ack_rev(&self, rev_id: RevId) -> Result<(), DocError> {
  49. let rev_store = self.rev_store.clone();
  50. tokio::spawn(async move {
  51. rev_store.handle_revision_acked(rev_id).await;
  52. });
  53. Ok(())
  54. }
  55. pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
  56. pub fn next_rev_id(&self) -> (i64, i64) {
  57. let cur = self.rev_id_counter.value();
  58. let next = self.rev_id_counter.next();
  59. (cur, next)
  60. }
  61. pub fn set_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
  62. pub async fn construct_revisions(&self, range: RevisionRange) -> Result<Revision, DocError> {
  63. debug_assert!(&range.doc_id == &self.doc_id);
  64. let revisions = self.rev_store.revs_in_range(range.clone()).await?;
  65. let mut new_delta = Delta::new();
  66. for revision in revisions {
  67. match Delta::from_bytes(revision.delta_data) {
  68. Ok(delta) => {
  69. new_delta = new_delta.compose(&delta)?;
  70. },
  71. Err(_) => {},
  72. }
  73. }
  74. let delta_data = new_delta.to_bytes();
  75. let revision = Revision::new(
  76. range.start,
  77. range.end,
  78. delta_data.to_vec(),
  79. &self.doc_id,
  80. RevType::Remote,
  81. );
  82. Ok(revision)
  83. }
  84. }