manager.rs 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. use crate::{
  2. entities::doc::{RevId, RevType, Revision, RevisionRange},
  3. errors::{DocError, DocResult},
  4. services::{
  5. doc::revision::{
  6. actor::{RevisionCmd, RevisionStoreActor},
  7. util::NotifyOpenDocAction,
  8. },
  9. util::RevIdCounter,
  10. ws::WsDocumentSender,
  11. },
  12. };
  13. use flowy_infra::{
  14. future::ResultFuture,
  15. retry::{ExponentialBackoff, Retry},
  16. };
  17. use flowy_ot::core::Delta;
  18. use parking_lot::RwLock;
  19. use std::{collections::VecDeque, sync::Arc};
  20. use tokio::sync::{mpsc, oneshot};
  21. pub struct DocRevision {
  22. pub rev_id: RevId,
  23. pub delta: Delta,
  24. }
  25. pub trait RevisionServer: Send + Sync {
  26. fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture<DocRevision, DocError>;
  27. }
  28. pub struct RevisionManager {
  29. doc_id: String,
  30. user_id: String,
  31. rev_id_counter: RevIdCounter,
  32. ws: Arc<dyn WsDocumentSender>,
  33. rev_store: mpsc::Sender<RevisionCmd>,
  34. pending_revs: RwLock<VecDeque<Revision>>,
  35. }
  36. impl RevisionManager {
  37. pub fn new(
  38. doc_id: &str,
  39. user_id: &str,
  40. rev_id: RevId,
  41. ws: Arc<dyn WsDocumentSender>,
  42. rev_store: mpsc::Sender<RevisionCmd>,
  43. ) -> Self {
  44. notify_open_doc(&ws, user_id, doc_id, &rev_id);
  45. let rev_id_counter = RevIdCounter::new(rev_id.into());
  46. let pending_revs = RwLock::new(VecDeque::new());
  47. Self {
  48. doc_id: doc_id.to_string(),
  49. user_id: user_id.to_string(),
  50. rev_id_counter,
  51. ws,
  52. pending_revs,
  53. rev_store,
  54. }
  55. }
  56. pub fn push_compose_revision(&self, revision: Revision) { self.pending_revs.write().push_front(revision); }
  57. pub fn next_compose_revision(&self) -> Option<Revision> { self.pending_revs.write().pop_front() }
  58. #[tracing::instrument(level = "debug", skip(self))]
  59. pub async fn add_revision(&self, revision: Revision) -> Result<(), DocError> {
  60. let cmd = RevisionCmd::Revision {
  61. revision: revision.clone(),
  62. };
  63. let _ = self.rev_store.send(cmd).await;
  64. match revision.ty {
  65. RevType::Local => match self.ws.send(revision.into()) {
  66. Ok(_) => {},
  67. Err(e) => log::error!("Send delta failed: {:?}", e),
  68. },
  69. RevType::Remote => {
  70. self.pending_revs.write().push_back(revision);
  71. },
  72. }
  73. Ok(())
  74. }
  75. pub fn ack_rev(&self, rev_id: RevId) -> Result<(), DocError> {
  76. let sender = self.rev_store.clone();
  77. tokio::spawn(async move {
  78. let _ = sender.send(RevisionCmd::AckRevision { rev_id }).await;
  79. });
  80. Ok(())
  81. }
  82. pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
  83. pub fn next_rev_id(&self) -> (i64, i64) {
  84. let cur = self.rev_id_counter.value();
  85. let next = self.rev_id_counter.next();
  86. (cur, next)
  87. }
  88. pub fn send_revisions(&self, range: RevisionRange) -> Result<(), DocError> {
  89. debug_assert!(&range.doc_id == &self.doc_id);
  90. let (ret, _rx) = oneshot::channel();
  91. let sender = self.rev_store.clone();
  92. tokio::spawn(async move {
  93. let _ = sender.send(RevisionCmd::SendRevisions { range, ret }).await;
  94. });
  95. unimplemented!()
  96. }
  97. }
  98. // FIXME:
  99. // user_id may be invalid if the user switch to another account while
  100. // theNotifyOpenDocAction is flying
  101. fn notify_open_doc(ws: &Arc<dyn WsDocumentSender>, user_id: &str, doc_id: &str, rev_id: &RevId) {
  102. let action = NotifyOpenDocAction::new(user_id, doc_id, rev_id, ws);
  103. let strategy = ExponentialBackoff::from_millis(50).take(3);
  104. let retry = Retry::spawn(strategy, action);
  105. tokio::spawn(async move {
  106. match retry.await {
  107. Ok(_) => {},
  108. Err(e) => log::error!("Notify open doc failed: {}", e),
  109. }
  110. });
  111. }