rev_history.rs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. use crate::{RevisionCompactor, RevisionHistory};
  2. use async_stream::stream;
  3. use flowy_error::{FlowyError, FlowyResult};
  4. use futures_util::future::BoxFuture;
  5. use futures_util::stream::StreamExt;
  6. use futures_util::FutureExt;
  7. use revision_model::Revision;
  8. use std::sync::Arc;
  9. use std::time::Duration;
  10. use tokio::sync::{mpsc, RwLock};
  11. use tokio::time::interval;
  12. pub trait RevisionHistoryDiskCache: Send + Sync {
  13. fn write_history(&self, revision: Revision) -> FlowyResult<()>;
  14. fn read_histories(&self) -> FlowyResult<Vec<RevisionHistory>>;
  15. }
  16. pub struct RevisionHistoryManager {
  17. user_id: String,
  18. stop_tx: mpsc::Sender<()>,
  19. config: RevisionHistoryConfig,
  20. revisions: Arc<RwLock<Vec<Revision>>>,
  21. disk_cache: Arc<dyn RevisionHistoryDiskCache>,
  22. }
  23. impl RevisionHistoryManager {
  24. pub fn new(
  25. user_id: &str,
  26. object_id: &str,
  27. config: RevisionHistoryConfig,
  28. disk_cache: Arc<dyn RevisionHistoryDiskCache>,
  29. rev_compactor: Arc<dyn RevisionCompactor>,
  30. ) -> Self {
  31. let revisions = Arc::new(RwLock::new(vec![]));
  32. let stop_tx =
  33. spawn_history_checkpoint_runner(user_id, object_id, &disk_cache, &revisions, rev_compactor, &config);
  34. let user_id = user_id.to_owned();
  35. Self {
  36. user_id,
  37. stop_tx,
  38. config,
  39. revisions,
  40. disk_cache,
  41. }
  42. }
  43. pub async fn add_revision(&self, revision: &Revision) {
  44. self.revisions.write().await.push(revision.clone());
  45. }
  46. pub async fn read_revision_histories(&self) -> FlowyResult<Vec<RevisionHistory>> {
  47. self.disk_cache.read_histories()
  48. }
  49. }
  50. pub struct RevisionHistoryConfig {
  51. check_duration: Duration,
  52. }
  53. impl std::default::Default for RevisionHistoryConfig {
  54. fn default() -> Self {
  55. Self {
  56. check_duration: Duration::from_secs(5),
  57. }
  58. }
  59. }
  60. fn spawn_history_checkpoint_runner(
  61. user_id: &str,
  62. object_id: &str,
  63. disk_cache: &Arc<dyn RevisionHistoryDiskCache>,
  64. revisions: &Arc<RwLock<Vec<Revision>>>,
  65. rev_compactor: Arc<dyn RevisionCompactor>,
  66. config: &RevisionHistoryConfig,
  67. ) -> mpsc::Sender<()> {
  68. let user_id = user_id.to_string();
  69. let object_id = object_id.to_string();
  70. let disk_cache = disk_cache.clone();
  71. let revisions = revisions.clone();
  72. let (checkpoint_tx, checkpoint_rx) = mpsc::channel(1);
  73. let (stop_tx, stop_rx) = mpsc::channel(1);
  74. let checkpoint_sender = FixedDurationCheckpointSender {
  75. user_id,
  76. object_id,
  77. checkpoint_tx,
  78. disk_cache,
  79. revisions,
  80. rev_compactor,
  81. duration: config.check_duration,
  82. };
  83. tokio::spawn(HistoryCheckpointRunner::new(stop_rx, checkpoint_rx).run());
  84. tokio::spawn(checkpoint_sender.run());
  85. stop_tx
  86. }
  87. struct HistoryCheckpointRunner {
  88. stop_rx: Option<mpsc::Receiver<()>>,
  89. checkpoint_rx: Option<mpsc::Receiver<HistoryCheckpoint>>,
  90. }
  91. impl HistoryCheckpointRunner {
  92. fn new(stop_rx: mpsc::Receiver<()>, checkpoint_rx: mpsc::Receiver<HistoryCheckpoint>) -> Self {
  93. Self {
  94. stop_rx: Some(stop_rx),
  95. checkpoint_rx: Some(checkpoint_rx),
  96. }
  97. }
  98. async fn run(mut self) {
  99. let mut stop_rx = self.stop_rx.take().expect("It should only run once");
  100. let mut checkpoint_rx = self.checkpoint_rx.take().expect("It should only run once");
  101. let stream = stream! {
  102. loop {
  103. tokio::select! {
  104. result = checkpoint_rx.recv() => {
  105. match result {
  106. Some(checkpoint) => yield checkpoint,
  107. None => {},
  108. }
  109. },
  110. _ = stop_rx.recv() => {
  111. tracing::trace!("Checkpoint runner exit");
  112. break
  113. },
  114. };
  115. }
  116. };
  117. stream
  118. .for_each(|checkpoint| async move {
  119. checkpoint.write().await;
  120. })
  121. .await;
  122. }
  123. }
  124. struct HistoryCheckpoint {
  125. user_id: String,
  126. object_id: String,
  127. revisions: Vec<Revision>,
  128. disk_cache: Arc<dyn RevisionHistoryDiskCache>,
  129. rev_compactor: Arc<dyn RevisionCompactor>,
  130. }
  131. impl HistoryCheckpoint {
  132. async fn write(self) {
  133. if self.revisions.is_empty() {
  134. return;
  135. }
  136. let result = || {
  137. let revision = self
  138. .rev_compactor
  139. .compact(&self.user_id, &self.object_id, self.revisions)?;
  140. let _ = self.disk_cache.write_history(revision)?;
  141. Ok::<(), FlowyError>(())
  142. };
  143. match result() {
  144. Ok(_) => {}
  145. Err(e) => tracing::error!("Write history checkout failed: {:?}", e),
  146. }
  147. }
  148. }
  149. struct FixedDurationCheckpointSender {
  150. user_id: String,
  151. object_id: String,
  152. checkpoint_tx: mpsc::Sender<HistoryCheckpoint>,
  153. disk_cache: Arc<dyn RevisionHistoryDiskCache>,
  154. revisions: Arc<RwLock<Vec<Revision>>>,
  155. rev_compactor: Arc<dyn RevisionCompactor>,
  156. duration: Duration,
  157. }
  158. impl FixedDurationCheckpointSender {
  159. fn run(self) -> BoxFuture<'static, ()> {
  160. async move {
  161. let mut interval = interval(self.duration);
  162. let checkpoint_revisions: Vec<Revision> = self.revisions.write().await.drain(..).collect();
  163. let checkpoint = HistoryCheckpoint {
  164. user_id: self.user_id.clone(),
  165. object_id: self.object_id.clone(),
  166. revisions: checkpoint_revisions,
  167. disk_cache: self.disk_cache.clone(),
  168. rev_compactor: self.rev_compactor.clone(),
  169. };
  170. match self.checkpoint_tx.send(checkpoint).await {
  171. Ok(_) => {
  172. interval.tick().await;
  173. self.run();
  174. }
  175. Err(_) => {}
  176. }
  177. }
  178. .boxed()
  179. }
  180. }