rev_manager.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. use crate::rev_queue::{RevCommand, RevCommandSender, RevQueue};
  2. use crate::{
  3. RevisionPersistence, RevisionSnapshot, RevisionSnapshotController, RevisionSnapshotDiskCache,
  4. WSDataProviderDataSource,
  5. };
  6. use bytes::Bytes;
  7. use flowy_error::{internal_error, FlowyError, FlowyResult};
  8. use flowy_http_model::revision::{Revision, RevisionRange};
  9. use flowy_http_model::util::md5;
  10. use lib_infra::future::FutureResult;
  11. use std::sync::atomic::AtomicI64;
  12. use std::sync::atomic::Ordering::SeqCst;
  13. use std::sync::Arc;
  14. use tokio::sync::{mpsc, oneshot};
  15. pub trait RevisionCloudService: Send + Sync {
  16. /// Read the object's revision from remote
  17. /// Returns a list of revisions that used to build the object
  18. /// # Arguments
  19. ///
  20. /// * `user_id`: the id of the user
  21. /// * `object_id`: the id of the object
  22. ///
  23. fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError>;
  24. }
  25. pub trait RevisionObjectDeserializer: Send + Sync {
  26. type Output;
  27. /// Deserialize the list of revisions into an concrete object type.
  28. ///
  29. /// # Arguments
  30. ///
  31. /// * `object_id`: the id of the object
  32. /// * `revisions`: a list of revisions that represent the object
  33. ///
  34. fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
  35. fn recover_operations_from_revisions(revisions: Vec<Revision>) -> Option<Self::Output>;
  36. }
  37. pub trait RevisionObjectSerializer: Send + Sync {
  38. /// Serialize a list of revisions into one in `Bytes` format
  39. ///
  40. /// * `revisions`: a list of revisions will be serialized to `Bytes`
  41. ///
  42. fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes>;
  43. }
  44. /// `RevisionCompress` is used to compress multiple revisions into one revision
  45. ///
  46. pub trait RevisionMergeable: Send + Sync {
  47. fn merge_revisions(&self, _user_id: &str, object_id: &str, mut revisions: Vec<Revision>) -> FlowyResult<Revision> {
  48. if revisions.is_empty() {
  49. return Err(FlowyError::internal().context("Can't compact the empty revisions"));
  50. }
  51. if revisions.len() == 1 {
  52. return Ok(revisions.pop().unwrap());
  53. }
  54. let first_revision = revisions.first().unwrap();
  55. let last_revision = revisions.last().unwrap();
  56. let (base_rev_id, rev_id) = first_revision.pair_rev_id();
  57. let md5 = last_revision.md5.clone();
  58. let bytes = self.combine_revisions(revisions)?;
  59. Ok(Revision::new(object_id, base_rev_id, rev_id, bytes, md5))
  60. }
  61. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes>;
  62. }
  63. pub struct RevisionManager<Connection> {
  64. pub object_id: String,
  65. user_id: String,
  66. rev_id_counter: Arc<RevIdCounter>,
  67. rev_persistence: Arc<RevisionPersistence<Connection>>,
  68. rev_snapshot: Arc<RevisionSnapshotController<Connection>>,
  69. rev_compress: Arc<dyn RevisionMergeable>,
  70. #[cfg(feature = "flowy_unit_test")]
  71. rev_ack_notifier: tokio::sync::broadcast::Sender<i64>,
  72. rev_queue: RevCommandSender,
  73. }
  74. impl<Connection: 'static> RevisionManager<Connection> {
  75. pub fn new<SP, C>(
  76. user_id: &str,
  77. object_id: &str,
  78. rev_persistence: RevisionPersistence<Connection>,
  79. rev_compress: C,
  80. snapshot_persistence: SP,
  81. ) -> Self
  82. where
  83. SP: 'static + RevisionSnapshotDiskCache,
  84. C: 'static + RevisionMergeable,
  85. {
  86. let rev_id_counter = Arc::new(RevIdCounter::new(0));
  87. let rev_compress = Arc::new(rev_compress);
  88. let rev_persistence = Arc::new(rev_persistence);
  89. let rev_snapshot = RevisionSnapshotController::new(
  90. user_id,
  91. object_id,
  92. snapshot_persistence,
  93. rev_id_counter.clone(),
  94. rev_persistence.clone(),
  95. rev_compress.clone(),
  96. );
  97. let (rev_queue, receiver) = mpsc::channel(1000);
  98. let queue = RevQueue::new(
  99. object_id.to_owned(),
  100. rev_id_counter.clone(),
  101. rev_persistence.clone(),
  102. rev_compress.clone(),
  103. receiver,
  104. );
  105. tokio::spawn(queue.run());
  106. Self {
  107. object_id: object_id.to_string(),
  108. user_id: user_id.to_owned(),
  109. rev_id_counter,
  110. rev_persistence,
  111. rev_snapshot: Arc::new(rev_snapshot),
  112. rev_compress,
  113. #[cfg(feature = "flowy_unit_test")]
  114. rev_ack_notifier: tokio::sync::broadcast::channel(1).0,
  115. rev_queue,
  116. }
  117. }
  118. #[tracing::instrument(name = "revision_manager_initialize", level = "debug", skip_all, fields(deserializer, object_id, deserialize_revisions) err)]
  119. pub async fn initialize<B>(&mut self, _cloud: Option<Arc<dyn RevisionCloudService>>) -> FlowyResult<B::Output>
  120. where
  121. B: RevisionObjectDeserializer,
  122. {
  123. let revision_records = self.rev_persistence.load_all_records(&self.object_id)?;
  124. tracing::Span::current().record("object_id", self.object_id.as_str());
  125. tracing::Span::current().record("deserializer", std::any::type_name::<B>());
  126. let revisions: Vec<Revision> = revision_records.iter().map(|record| record.revision.clone()).collect();
  127. tracing::Span::current().record("deserialize_revisions", revisions.len());
  128. let current_rev_id = revisions.last().as_ref().map(|revision| revision.rev_id).unwrap_or(0);
  129. match B::deserialize_revisions(&self.object_id, revisions.clone()) {
  130. Ok(object) => {
  131. self.rev_persistence.sync_revision_records(&revision_records).await?;
  132. self.rev_id_counter.set(current_rev_id);
  133. Ok(object)
  134. }
  135. Err(e) => match self.rev_snapshot.restore_from_snapshot::<B>(current_rev_id) {
  136. None => {
  137. tracing::info!("Restore object from validation revisions");
  138. B::recover_operations_from_revisions(revisions).ok_or(e)
  139. }
  140. Some((object, snapshot_rev)) => {
  141. let snapshot_rev_id = snapshot_rev.rev_id;
  142. let _ = self.rev_persistence.reset(vec![snapshot_rev]).await;
  143. // revision_records.retain(|record| record.revision.rev_id <= snapshot_rev_id);
  144. // let _ = self.rev_persistence.sync_revision_records(&revision_records).await?;
  145. self.rev_id_counter.set(snapshot_rev_id);
  146. Ok(object)
  147. }
  148. },
  149. }
  150. }
  151. pub async fn close(&self) {
  152. let _ = self.rev_persistence.compact_lagging_revisions(&self.rev_compress).await;
  153. }
  154. pub async fn generate_snapshot(&self) {
  155. self.rev_snapshot.generate_snapshot().await;
  156. }
  157. pub async fn read_snapshot(&self, rev_id: Option<i64>) -> FlowyResult<Option<RevisionSnapshot>> {
  158. match rev_id {
  159. None => self.rev_snapshot.read_last_snapshot(),
  160. Some(rev_id) => self.rev_snapshot.read_snapshot(rev_id),
  161. }
  162. }
  163. pub async fn load_revisions(&self) -> FlowyResult<Vec<Revision>> {
  164. let revisions = RevisionLoader {
  165. object_id: self.object_id.clone(),
  166. user_id: self.user_id.clone(),
  167. cloud: None,
  168. rev_persistence: self.rev_persistence.clone(),
  169. }
  170. .load_revisions()
  171. .await?;
  172. Ok(revisions)
  173. }
  174. #[tracing::instrument(level = "debug", skip(self, revisions), err)]
  175. pub async fn reset_object(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
  176. let rev_id = pair_rev_id_from_revisions(&revisions).1;
  177. self.rev_persistence.reset(revisions).await?;
  178. self.rev_id_counter.set(rev_id);
  179. Ok(())
  180. }
  181. #[tracing::instrument(level = "debug", skip(self, revision), err)]
  182. pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
  183. if revision.bytes.is_empty() {
  184. return Err(FlowyError::internal().context("Remote revisions is empty"));
  185. }
  186. self.rev_persistence.add_ack_revision(revision).await?;
  187. self.rev_id_counter.set(revision.rev_id);
  188. Ok(())
  189. }
  190. /// Adds the revision that generated by user editing
  191. // #[tracing::instrument(level = "trace", skip_all, err)]
  192. pub async fn add_local_revision(&self, data: Bytes, object_md5: String) -> Result<i64, FlowyError> {
  193. if data.is_empty() {
  194. return Err(FlowyError::internal().context("The data of the revisions is empty"));
  195. }
  196. self.rev_snapshot.generate_snapshot_if_need();
  197. let (ret, rx) = oneshot::channel();
  198. self.rev_queue
  199. .send(RevCommand::RevisionData { data, object_md5, ret })
  200. .await
  201. .map_err(internal_error)?;
  202. rx.await.map_err(internal_error)?
  203. }
  204. #[tracing::instrument(level = "debug", skip(self), err)]
  205. pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
  206. if self.rev_persistence.ack_revision(rev_id).await.is_ok() {
  207. #[cfg(feature = "flowy_unit_test")]
  208. let _ = self.rev_ack_notifier.send(rev_id);
  209. }
  210. Ok(())
  211. }
  212. /// Returns the current revision id
  213. pub fn rev_id(&self) -> i64 {
  214. self.rev_id_counter.value()
  215. }
  216. pub async fn next_sync_rev_id(&self) -> Option<i64> {
  217. self.rev_persistence.next_sync_rev_id().await
  218. }
  219. pub fn next_rev_id_pair(&self) -> (i64, i64) {
  220. let cur = self.rev_id_counter.value();
  221. let next = self.rev_id_counter.next_id();
  222. (cur, next)
  223. }
  224. pub fn number_of_sync_revisions(&self) -> usize {
  225. self.rev_persistence.number_of_sync_records()
  226. }
  227. pub fn number_of_revisions_in_disk(&self) -> usize {
  228. self.rev_persistence.number_of_records_in_disk()
  229. }
  230. pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
  231. let revisions = self.rev_persistence.revisions_in_range(&range).await?;
  232. Ok(revisions)
  233. }
  234. pub async fn next_sync_revision(&self) -> FlowyResult<Option<Revision>> {
  235. self.rev_persistence.next_sync_revision().await
  236. }
  237. pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
  238. self.rev_persistence.get(rev_id).await.map(|record| record.revision)
  239. }
  240. }
  241. impl<Connection: 'static> WSDataProviderDataSource for Arc<RevisionManager<Connection>> {
  242. fn next_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
  243. let rev_manager = self.clone();
  244. FutureResult::new(async move { rev_manager.next_sync_revision().await })
  245. }
  246. fn ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError> {
  247. let rev_manager = self.clone();
  248. FutureResult::new(async move { (*rev_manager).ack_revision(rev_id).await })
  249. }
  250. fn current_rev_id(&self) -> i64 {
  251. self.rev_id()
  252. }
  253. }
  254. #[cfg(feature = "flowy_unit_test")]
  255. impl<Connection: 'static> RevisionManager<Connection> {
  256. pub async fn revision_cache(&self) -> Arc<RevisionPersistence<Connection>> {
  257. self.rev_persistence.clone()
  258. }
  259. pub fn ack_notify(&self) -> tokio::sync::broadcast::Receiver<i64> {
  260. self.rev_ack_notifier.subscribe()
  261. }
  262. pub fn get_all_revision_records(&self) -> FlowyResult<Vec<flowy_revision_persistence::SyncRecord>> {
  263. self.rev_persistence.load_all_records(&self.object_id)
  264. }
  265. }
  266. pub struct RevisionLoader<Connection> {
  267. pub object_id: String,
  268. pub user_id: String,
  269. pub cloud: Option<Arc<dyn RevisionCloudService>>,
  270. pub rev_persistence: Arc<RevisionPersistence<Connection>>,
  271. }
  272. impl<Connection: 'static> RevisionLoader<Connection> {
  273. pub async fn load_revisions(&self) -> Result<Vec<Revision>, FlowyError> {
  274. let records = self.rev_persistence.load_all_records(&self.object_id)?;
  275. let revisions = records.into_iter().map(|record| record.revision).collect::<_>();
  276. Ok(revisions)
  277. }
  278. }
  279. /// Represents as the md5 of the revision object after applying the
  280. /// revision. For example, RevisionMD5 will be the md5 of the document
  281. /// content.
  282. #[derive(Debug, Clone)]
  283. pub struct RevisionMD5(String);
  284. impl RevisionMD5 {
  285. pub fn from_bytes<T: AsRef<[u8]>>(bytes: T) -> Result<Self, FlowyError> {
  286. Ok(RevisionMD5(md5(bytes)))
  287. }
  288. pub fn into_inner(self) -> String {
  289. self.0
  290. }
  291. pub fn is_equal(&self, s: &str) -> bool {
  292. self.0 == s
  293. }
  294. }
  295. impl std::convert::From<RevisionMD5> for String {
  296. fn from(md5: RevisionMD5) -> Self {
  297. md5.0
  298. }
  299. }
  300. impl std::convert::From<&str> for RevisionMD5 {
  301. fn from(s: &str) -> Self {
  302. Self(s.to_owned())
  303. }
  304. }
  305. impl std::convert::From<String> for RevisionMD5 {
  306. fn from(s: String) -> Self {
  307. Self(s)
  308. }
  309. }
  310. impl std::ops::Deref for RevisionMD5 {
  311. type Target = String;
  312. fn deref(&self) -> &Self::Target {
  313. &self.0
  314. }
  315. }
  316. impl PartialEq<Self> for RevisionMD5 {
  317. fn eq(&self, other: &Self) -> bool {
  318. self.0 == other.0
  319. }
  320. }
  321. impl std::cmp::Eq for RevisionMD5 {}
  322. fn pair_rev_id_from_revisions(revisions: &[Revision]) -> (i64, i64) {
  323. let mut rev_id = 0;
  324. revisions.iter().for_each(|revision| {
  325. if rev_id < revision.rev_id {
  326. rev_id = revision.rev_id;
  327. }
  328. });
  329. if rev_id > 0 {
  330. (rev_id - 1, rev_id)
  331. } else {
  332. (0, rev_id)
  333. }
  334. }
  335. #[derive(Debug)]
  336. pub struct RevIdCounter(pub AtomicI64);
  337. impl RevIdCounter {
  338. pub fn new(n: i64) -> Self {
  339. Self(AtomicI64::new(n))
  340. }
  341. pub fn next_id(&self) -> i64 {
  342. let _ = self.0.fetch_add(1, SeqCst);
  343. self.value()
  344. }
  345. pub fn value(&self) -> i64 {
  346. self.0.load(SeqCst)
  347. }
  348. pub fn set(&self, n: i64) {
  349. let _ = self.0.fetch_update(SeqCst, SeqCst, |_| Some(n));
  350. }
  351. }