rev_manager.rs 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. use crate::{
  2. entities::doc::{RevType, Revision, RevisionRange},
  3. errors::DocError,
  4. services::{
  5. doc::rev_manager::store::{Store, StoreMsg},
  6. util::RevIdCounter,
  7. ws::WsDocumentSender,
  8. },
  9. };
  10. use flowy_database::ConnectionPool;
  11. use parking_lot::RwLock;
  12. use std::{collections::VecDeque, sync::Arc};
  13. use tokio::sync::{mpsc, oneshot};
  14. pub struct RevisionManager {
  15. doc_id: String,
  16. rev_id_counter: RevIdCounter,
  17. ws_sender: Arc<dyn WsDocumentSender>,
  18. store_sender: mpsc::Sender<StoreMsg>,
  19. pending_revs: RwLock<VecDeque<Revision>>,
  20. }
  21. // tokio::time::timeout
  22. impl RevisionManager {
  23. pub fn new(doc_id: &str, rev_id: i64, pool: Arc<ConnectionPool>, ws_sender: Arc<dyn WsDocumentSender>) -> Self {
  24. let (sender, receiver) = mpsc::channel::<StoreMsg>(50);
  25. let store = Store::new(doc_id, pool, receiver);
  26. tokio::spawn(store.run());
  27. let doc_id = doc_id.to_string();
  28. let rev_id_counter = RevIdCounter::new(rev_id);
  29. let pending_revs = RwLock::new(VecDeque::new());
  30. Self {
  31. doc_id,
  32. rev_id_counter,
  33. ws_sender,
  34. pending_revs,
  35. store_sender: sender,
  36. }
  37. }
  38. pub fn push_compose_revision(&self, revision: Revision) { self.pending_revs.write().push_front(revision); }
  39. pub fn next_compose_revision(&self) -> Option<Revision> { self.pending_revs.write().pop_front() }
  40. #[tracing::instrument(level = "debug", skip(self, revision))]
  41. pub async fn add_revision(&self, revision: Revision) -> Result<(), DocError> {
  42. let msg = StoreMsg::Revision {
  43. revision: revision.clone(),
  44. };
  45. let _ = self.store_sender.send(msg).await;
  46. match revision.ty {
  47. RevType::Local => match self.ws_sender.send(revision.into()) {
  48. Ok(_) => {},
  49. Err(e) => log::error!("Send delta failed: {:?}", e),
  50. },
  51. RevType::Remote => {
  52. self.pending_revs.write().push_back(revision);
  53. },
  54. }
  55. Ok(())
  56. }
  57. pub fn ack_rev(&self, rev_id: i64) -> Result<(), DocError> {
  58. let sender = self.store_sender.clone();
  59. tokio::spawn(async move {
  60. let _ = sender.send(StoreMsg::AckRevision { rev_id }).await;
  61. });
  62. Ok(())
  63. }
  64. pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
  65. pub fn next_rev_id(&self) -> (i64, i64) {
  66. let cur = self.rev_id_counter.value();
  67. let next = self.rev_id_counter.next();
  68. (cur, next)
  69. }
  70. pub fn send_revisions(&self, range: RevisionRange) -> Result<(), DocError> {
  71. debug_assert!(&range.doc_id == &self.doc_id);
  72. let (ret, _rx) = oneshot::channel();
  73. let sender = self.store_sender.clone();
  74. tokio::spawn(async move {
  75. let _ = sender.send(StoreMsg::SendRevisions { range, ret }).await;
  76. });
  77. unimplemented!()
  78. }
  79. }