rev_manager.rs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. use crate::{
  2. entities::doc::{RevType, Revision},
  3. errors::{internal_error, DocError},
  4. services::{
  5. util::RevIdCounter,
  6. ws::{WsDocumentHandler, WsDocumentSender},
  7. },
  8. sql_tables::{OpTableSql, RevTable},
  9. };
  10. use flowy_database::ConnectionPool;
  11. use parking_lot::RwLock;
  12. use std::{
  13. collections::{BTreeMap, VecDeque},
  14. sync::Arc,
  15. };
  16. use tokio::sync::{futures::Notified, Notify};
  17. pub struct RevisionManager {
  18. doc_id: String,
  19. op_sql: Arc<OpTableSql>,
  20. pool: Arc<ConnectionPool>,
  21. rev_id_counter: RevIdCounter,
  22. ws_sender: Arc<dyn WsDocumentSender>,
  23. local_rev_cache: Arc<RwLock<BTreeMap<i64, Revision>>>,
  24. remote_rev_cache: RwLock<VecDeque<Revision>>,
  25. notify: Notify,
  26. }
  27. impl RevisionManager {
  28. pub fn new(doc_id: &str, rev_id: i64, pool: Arc<ConnectionPool>, ws_sender: Arc<dyn WsDocumentSender>) -> Self {
  29. let op_sql = Arc::new(OpTableSql {});
  30. let rev_id_counter = RevIdCounter::new(rev_id);
  31. let local_rev_cache = Arc::new(RwLock::new(BTreeMap::new()));
  32. let remote_rev_cache = RwLock::new(VecDeque::new());
  33. Self {
  34. doc_id: doc_id.to_owned(),
  35. op_sql,
  36. pool,
  37. rev_id_counter,
  38. ws_sender,
  39. local_rev_cache,
  40. remote_rev_cache,
  41. notify: Notify::new(),
  42. }
  43. }
  44. pub fn next_compose_revision<F>(&self, mut f: F)
  45. where
  46. F: FnMut(&Revision) -> Result<(), DocError>,
  47. {
  48. if let Some(rev) = self.remote_rev_cache.write().pop_front() {
  49. match f(&rev) {
  50. Ok(_) => {},
  51. Err(e) => {
  52. log::error!("{}", e);
  53. self.remote_rev_cache.write().push_front(rev);
  54. },
  55. }
  56. }
  57. }
  58. #[tracing::instrument(level = "debug", skip(self, revision))]
  59. pub fn add_revision(&self, revision: Revision) -> Result<(), DocError> {
  60. match revision.ty {
  61. RevType::Local => {
  62. self.local_rev_cache.write().insert(revision.rev_id, revision.clone());
  63. // self.save_revision(revision.clone());
  64. match self.ws_sender.send(revision.into()) {
  65. Ok(_) => {},
  66. Err(e) => {
  67. log::error!("Send delta failed: {:?}", e);
  68. },
  69. }
  70. },
  71. RevType::Remote => {
  72. self.remote_rev_cache.write().push_back(revision);
  73. self.notify.notify_waiters();
  74. },
  75. }
  76. Ok(())
  77. }
  78. pub fn remove(&self, rev_id: i64) -> Result<(), DocError> {
  79. self.local_rev_cache.write().remove(&rev_id);
  80. // self.delete_revision(rev_id);
  81. Ok(())
  82. }
  83. pub fn rev_notified(&self) -> Notified { self.notify.notified() }
  84. pub fn next_rev_id(&self) -> (i64, i64) {
  85. let cur = self.rev_id_counter.value();
  86. let next = self.rev_id_counter.next();
  87. (cur, next)
  88. }
  89. pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
  90. fn save_revision(&self, revision: Revision) {
  91. let op_sql = self.op_sql.clone();
  92. let pool = self.pool.clone();
  93. tokio::spawn(async move {
  94. let conn = &*pool.get().map_err(internal_error).unwrap();
  95. let result = conn.immediate_transaction::<_, DocError, _>(|| {
  96. let op_table: RevTable = revision.into();
  97. let _ = op_sql.create_op_table(op_table, conn).unwrap();
  98. Ok(())
  99. });
  100. match result {
  101. Ok(_) => {},
  102. Err(e) => log::error!("Save revision failed: {:?}", e),
  103. }
  104. });
  105. }
  106. fn delete_revision(&self, rev_id: i64) {
  107. let op_sql = self.op_sql.clone();
  108. let pool = self.pool.clone();
  109. tokio::spawn(async move {
  110. let conn = &*pool.get().map_err(internal_error).unwrap();
  111. let result = conn.immediate_transaction::<_, DocError, _>(|| {
  112. let _ = op_sql.delete_op_table(rev_id, conn)?;
  113. Ok(())
  114. });
  115. match result {
  116. Ok(_) => {},
  117. Err(e) => log::error!("Delete revision failed: {:?}", e),
  118. }
  119. });
  120. }
  121. }