model.rs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. use crate::{
  2. entities::doc::{Revision, RevisionRange},
  3. errors::{internal_error, DocError, DocResult},
  4. sql_tables::{RevState, RevTableSql},
  5. };
  6. use async_stream::stream;
  7. use flowy_database::ConnectionPool;
  8. use flowy_infra::future::ResultFuture;
  9. use futures::{stream::StreamExt, TryFutureExt};
  10. use std::{sync::Arc, time::Duration};
  11. use tokio::sync::{broadcast, mpsc};
  /// Receiver side of a tokio `broadcast` channel carrying `i64` revision ids.
  12. pub type RevIdReceiver = broadcast::Receiver<i64>;
  /// Sender side of a tokio `broadcast` channel carrying `i64` revision ids.
  13. pub type RevIdSender = broadcast::Sender<i64>;
  14. pub struct RevisionContext {
  15. pub revision: Revision,
  16. pub state: RevState,
  17. }
  18. impl RevisionContext {
  19. pub fn new(revision: Revision) -> Self {
  20. Self {
  21. revision,
  22. state: RevState::Local,
  23. }
  24. }
  25. }
  26. pub(crate) struct PendingRevId {
  27. pub rev_id: i64,
  28. pub sender: RevIdSender,
  29. }
  30. impl PendingRevId {
  31. pub(crate) fn new(rev_id: i64, sender: RevIdSender) -> Self { Self { rev_id, sender } }
  32. pub(crate) fn finish(&self, rev_id: i64) -> bool {
  33. if self.rev_id > rev_id {
  34. false
  35. } else {
  36. self.sender.send(self.rev_id);
  37. true
  38. }
  39. }
  40. }
  41. pub(crate) struct Persistence {
  42. pub(crate) rev_sql: Arc<RevTableSql>,
  43. pub(crate) pool: Arc<ConnectionPool>,
  44. }
  45. impl Persistence {
  46. pub(crate) fn new(pool: Arc<ConnectionPool>) -> Self {
  47. let rev_sql = Arc::new(RevTableSql {});
  48. Self { rev_sql, pool }
  49. }
  50. pub(crate) fn create_revs(&self, revisions_state: Vec<(Revision, RevState)>) -> DocResult<()> {
  51. let conn = &*self.pool.get().map_err(internal_error)?;
  52. conn.immediate_transaction::<_, DocError, _>(|| {
  53. let _ = self.rev_sql.create_rev_table(revisions_state, conn)?;
  54. Ok(())
  55. })
  56. }
  57. pub(crate) fn read_rev_with_range(&self, doc_id: &str, range: RevisionRange) -> DocResult<Vec<Revision>> {
  58. let conn = &*self.pool.get().map_err(internal_error).unwrap();
  59. let revisions = self.rev_sql.read_rev_tables_with_range(doc_id, range, conn)?;
  60. Ok(revisions)
  61. }
  62. pub(crate) fn read_rev(&self, doc_id: &str, rev_id: &i64) -> DocResult<Option<Revision>> {
  63. let conn = self.pool.get().map_err(internal_error)?;
  64. let some = self.rev_sql.read_rev_table(&doc_id, rev_id, &*conn)?;
  65. Ok(some)
  66. }
  67. }
  /// A shareable (`Send + Sync`) source of revisions that is polled one item at a time.
  68. pub trait RevisionIterator: Send + Sync {
  /// Returns a future yielding the next revision, `None` when there is none to
  /// hand out, or a `DocError` on failure.
  69. fn next(&self) -> ResultFuture<Option<Revision>, DocError>;
  70. }
  /// Messages handled by `PendingRevisionStream`.
  71. pub(crate) enum PendingMsg {
  /// Asks the stream to forward the next pending revision; `ret` is the
  /// `RevIdReceiver` the stream then waits on (with a timeout) before it
  /// handles the next message.
  72. Revision { ret: RevIdReceiver },
  73. }
  /// Unbounded tokio `mpsc` sender of `PendingMsg` values.
  74. pub(crate) type PendingSender = mpsc::UnboundedSender<PendingMsg>;
  /// Unbounded tokio `mpsc` receiver of `PendingMsg` values.
  75. pub(crate) type PendingReceiver = mpsc::UnboundedReceiver<PendingMsg>;
  76. pub(crate) struct PendingRevisionStream {
  77. revisions: Arc<dyn RevisionIterator>,
  78. receiver: Option<PendingReceiver>,
  79. next_revision: mpsc::Sender<Revision>,
  80. }
  81. impl PendingRevisionStream {
  82. pub(crate) fn new(
  83. revisions: Arc<dyn RevisionIterator>,
  84. pending_rx: PendingReceiver,
  85. next_revision: mpsc::Sender<Revision>,
  86. ) -> Self {
  87. Self {
  88. revisions,
  89. receiver: Some(pending_rx),
  90. next_revision,
  91. }
  92. }
  93. pub async fn run(mut self) {
  94. let mut receiver = self.receiver.take().expect("Should only call once");
  95. let stream = stream! {
  96. loop {
  97. match receiver.recv().await {
  98. Some(msg) => yield msg,
  99. None => break,
  100. }
  101. }
  102. };
  103. stream
  104. .for_each(|msg| async {
  105. match self.handle_msg(msg).await {
  106. Ok(_) => {},
  107. Err(e) => log::error!("{:?}", e),
  108. }
  109. })
  110. .await;
  111. }
  112. async fn handle_msg(&self, msg: PendingMsg) -> DocResult<()> {
  113. match msg {
  114. PendingMsg::Revision { ret } => self.prepare_next_pending_rev(ret).await,
  115. }
  116. }
  117. async fn prepare_next_pending_rev(&self, mut ret: RevIdReceiver) -> DocResult<()> {
  118. match self.revisions.next().await? {
  119. None => Ok(()),
  120. Some(revision) => {
  121. self.next_revision.send(revision).await.map_err(internal_error);
  122. let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
  123. Ok(())
  124. },
  125. }
  126. }
  127. }