script.rs 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. use bytes::Bytes;
  2. use flowy_error::{internal_error, FlowyError, FlowyResult};
  3. use flowy_revision::{
  4. RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence,
  5. RevisionPersistenceConfiguration, RevisionSnapshotData, RevisionSnapshotPersistence,
  6. REVISION_WRITE_INTERVAL_IN_MILLIS,
  7. };
  8. use flowy_revision_persistence::{RevisionChangeset, RevisionDiskCache, SyncRecord};
  9. use lib_infra::util::md5;
  10. use nanoid::nanoid;
  11. use parking_lot::RwLock;
  12. use revision_model::{Revision, RevisionRange};
  13. use serde::{Deserialize, Serialize};
  14. use std::sync::Arc;
  15. use std::time::Duration;
  16. pub enum RevisionScript {
  17. AddLocalRevision { content: String },
  18. AddLocalRevision2 { content: String },
  19. AddInvalidLocalRevision { bytes: Vec<u8> },
  20. AckRevision { rev_id: i64 },
  21. AssertNextSyncRevisionId { rev_id: Option<i64> },
  22. AssertNumberOfSyncRevisions { num: usize },
  23. AssertNumberOfRevisionsInDisk { num: usize },
  24. AssertNextSyncRevisionContent { expected: String },
  25. WaitWhenWriteToDisk,
  26. }
  27. pub struct RevisionTest {
  28. user_id: String,
  29. object_id: String,
  30. configuration: RevisionPersistenceConfiguration,
  31. rev_manager: Arc<RevisionManager<RevisionConnectionMock>>,
  32. }
  33. impl RevisionTest {
  34. pub async fn new() -> Self {
  35. Self::new_with_configuration(2).await
  36. }
  37. pub async fn new_with_configuration(max_merge_len: i64) -> Self {
  38. let user_id = nanoid!(10);
  39. let object_id = nanoid!(6);
  40. let configuration = RevisionPersistenceConfiguration::new(max_merge_len as usize, false);
  41. let disk_cache = RevisionDiskCacheMock::new(vec![]);
  42. let persistence =
  43. RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone());
  44. let compress = RevisionMergeableMock {};
  45. let snapshot = RevisionSnapshotMock {};
  46. let mut rev_manager =
  47. RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot);
  48. rev_manager
  49. .initialize::<RevisionObjectMockSerde>(None)
  50. .await
  51. .unwrap();
  52. Self {
  53. user_id,
  54. object_id,
  55. configuration,
  56. rev_manager: Arc::new(rev_manager),
  57. }
  58. }
  59. pub async fn new_with_other(old_test: RevisionTest) -> Self {
  60. let records = old_test.rev_manager.get_all_revision_records().unwrap();
  61. let disk_cache = RevisionDiskCacheMock::new(records);
  62. let configuration = old_test.configuration;
  63. let persistence = RevisionPersistence::new(
  64. &old_test.user_id,
  65. &old_test.object_id,
  66. disk_cache,
  67. configuration.clone(),
  68. );
  69. let compress = RevisionMergeableMock {};
  70. let snapshot = RevisionSnapshotMock {};
  71. let mut rev_manager = RevisionManager::new(
  72. &old_test.user_id,
  73. &old_test.object_id,
  74. persistence,
  75. compress,
  76. snapshot,
  77. );
  78. rev_manager
  79. .initialize::<RevisionObjectMockSerde>(None)
  80. .await
  81. .unwrap();
  82. Self {
  83. user_id: old_test.user_id,
  84. object_id: old_test.object_id,
  85. configuration,
  86. rev_manager: Arc::new(rev_manager),
  87. }
  88. }
  89. pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
  90. for script in scripts {
  91. self.run_script(script).await;
  92. }
  93. }
  94. pub async fn run_script(&self, script: RevisionScript) {
  95. match script {
  96. RevisionScript::AddLocalRevision { content } => {
  97. let object = RevisionObjectMock::new(&content);
  98. let bytes = object.to_bytes();
  99. let md5 = md5(&bytes);
  100. self
  101. .rev_manager
  102. .add_local_revision(Bytes::from(bytes), md5)
  103. .await
  104. .unwrap();
  105. },
  106. RevisionScript::AddLocalRevision2 { content } => {
  107. let object = RevisionObjectMock::new(&content);
  108. let bytes = object.to_bytes();
  109. let md5 = md5(&bytes);
  110. self
  111. .rev_manager
  112. .add_local_revision(Bytes::from(bytes), md5)
  113. .await
  114. .unwrap();
  115. },
  116. RevisionScript::AddInvalidLocalRevision { bytes } => {
  117. let md5 = md5(&bytes);
  118. self
  119. .rev_manager
  120. .add_local_revision(Bytes::from(bytes), md5)
  121. .await
  122. .unwrap();
  123. },
  124. RevisionScript::AckRevision { rev_id } => {
  125. //
  126. self.rev_manager.ack_revision(rev_id).await.unwrap()
  127. },
  128. RevisionScript::AssertNextSyncRevisionId { rev_id } => {
  129. assert_eq!(self.rev_manager.next_sync_rev_id().await, rev_id)
  130. },
  131. RevisionScript::AssertNumberOfSyncRevisions { num } => {
  132. assert_eq!(self.rev_manager.number_of_sync_revisions(), num)
  133. },
  134. RevisionScript::AssertNumberOfRevisionsInDisk { num } => {
  135. assert_eq!(self.rev_manager.number_of_revisions_in_disk(), num)
  136. },
  137. RevisionScript::AssertNextSyncRevisionContent { expected } => {
  138. //
  139. let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
  140. let revision = self.rev_manager.get_revision(rev_id).await.unwrap();
  141. let object = RevisionObjectMock::from_bytes(&revision.bytes).unwrap();
  142. assert_eq!(object.content, expected);
  143. },
  144. RevisionScript::WaitWhenWriteToDisk => {
  145. let milliseconds = 2 * REVISION_WRITE_INTERVAL_IN_MILLIS;
  146. tokio::time::sleep(Duration::from_millis(milliseconds)).await;
  147. },
  148. }
  149. }
  150. }
  151. pub struct RevisionDiskCacheMock {
  152. records: RwLock<Vec<SyncRecord>>,
  153. }
  154. impl RevisionDiskCacheMock {
  155. pub fn new(records: Vec<SyncRecord>) -> Self {
  156. Self {
  157. records: RwLock::new(records),
  158. }
  159. }
  160. }
  161. impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
  162. type Error = FlowyError;
  163. fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
  164. self.records.write().extend(revision_records);
  165. Ok(())
  166. }
  167. fn get_connection(&self) -> Result<RevisionConnectionMock, Self::Error> {
  168. todo!()
  169. }
  170. fn read_revision_records(
  171. &self,
  172. _object_id: &str,
  173. rev_ids: Option<Vec<i64>>,
  174. ) -> Result<Vec<SyncRecord>, Self::Error> {
  175. match rev_ids {
  176. None => Ok(self.records.read().clone()),
  177. Some(rev_ids) => Ok(
  178. self
  179. .records
  180. .read()
  181. .iter()
  182. .filter(|record| rev_ids.contains(&record.revision.rev_id))
  183. .cloned()
  184. .collect::<Vec<SyncRecord>>(),
  185. ),
  186. }
  187. }
  188. fn read_revision_records_with_range(
  189. &self,
  190. _object_id: &str,
  191. range: &RevisionRange,
  192. ) -> Result<Vec<SyncRecord>, Self::Error> {
  193. let read_guard = self.records.read();
  194. let records = range
  195. .iter()
  196. .flat_map(|rev_id| {
  197. read_guard
  198. .iter()
  199. .find(|record| record.revision.rev_id == rev_id)
  200. .cloned()
  201. })
  202. .collect::<Vec<SyncRecord>>();
  203. Ok(records)
  204. }
  205. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  206. for changeset in changesets {
  207. if let Some(record) = self
  208. .records
  209. .write()
  210. .iter_mut()
  211. .find(|record| record.revision.rev_id == changeset.rev_id)
  212. {
  213. record.state = changeset.state;
  214. }
  215. }
  216. Ok(())
  217. }
  218. fn delete_revision_records(
  219. &self,
  220. _object_id: &str,
  221. rev_ids: Option<Vec<i64>>,
  222. ) -> Result<(), Self::Error> {
  223. match rev_ids {
  224. None => {},
  225. Some(rev_ids) => {
  226. for rev_id in rev_ids {
  227. if let Some(index) = self
  228. .records
  229. .read()
  230. .iter()
  231. .position(|record| record.revision.rev_id == rev_id)
  232. {
  233. self.records.write().remove(index);
  234. }
  235. }
  236. },
  237. }
  238. Ok(())
  239. }
  240. fn delete_and_insert_records(
  241. &self,
  242. _object_id: &str,
  243. _deleted_rev_ids: Option<Vec<i64>>,
  244. _inserted_records: Vec<SyncRecord>,
  245. ) -> Result<(), Self::Error> {
  246. todo!()
  247. }
  248. }
  249. pub struct RevisionConnectionMock {}
  250. pub struct RevisionSnapshotMock {}
  251. impl RevisionSnapshotPersistence for RevisionSnapshotMock {
  252. fn write_snapshot(&self, _rev_id: i64, _data: Vec<u8>) -> FlowyResult<()> {
  253. Ok(())
  254. }
  255. fn read_snapshot(&self, _rev_id: i64) -> FlowyResult<Option<RevisionSnapshotData>> {
  256. Ok(None)
  257. }
  258. fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshotData>> {
  259. Ok(None)
  260. }
  261. }
  262. pub struct RevisionMergeableMock {}
  263. impl RevisionMergeable for RevisionMergeableMock {
  264. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  265. let mut object = RevisionObjectMock::new("");
  266. for revision in revisions {
  267. if let Ok(other) = RevisionObjectMock::from_bytes(&revision.bytes) {
  268. object.compose(other)?;
  269. }
  270. }
  271. Ok(Bytes::from(object.to_bytes()))
  272. }
  273. }
  274. #[derive(Serialize, Deserialize)]
  275. pub struct InvalidRevisionObject {
  276. data: String,
  277. }
  278. impl InvalidRevisionObject {
  279. pub fn new() -> Self {
  280. InvalidRevisionObject {
  281. data: "".to_string(),
  282. }
  283. }
  284. pub(crate) fn to_bytes(&self) -> Vec<u8> {
  285. serde_json::to_vec(self).unwrap()
  286. }
  287. // fn from_bytes(bytes: &[u8]) -> Self {
  288. // serde_json::from_slice(bytes).unwrap()
  289. // }
  290. }
  291. #[derive(Serialize, Deserialize)]
  292. pub struct RevisionObjectMock {
  293. content: String,
  294. }
  295. impl RevisionObjectMock {
  296. pub fn new(s: &str) -> Self {
  297. Self {
  298. content: s.to_owned(),
  299. }
  300. }
  301. pub fn compose(&mut self, other: RevisionObjectMock) -> FlowyResult<()> {
  302. self.content.push_str(other.content.as_str());
  303. Ok(())
  304. }
  305. pub fn to_bytes(&self) -> Vec<u8> {
  306. serde_json::to_vec(self).unwrap()
  307. }
  308. pub fn from_bytes(bytes: &[u8]) -> FlowyResult<Self> {
  309. serde_json::from_slice(bytes).map_err(internal_error)
  310. }
  311. }
  312. pub struct RevisionObjectMockSerde();
  313. impl RevisionObjectDeserializer for RevisionObjectMockSerde {
  314. type Output = RevisionObjectMock;
  315. fn deserialize_revisions(
  316. _object_id: &str,
  317. revisions: Vec<Revision>,
  318. ) -> FlowyResult<Self::Output> {
  319. let mut object = RevisionObjectMock::new("");
  320. if revisions.is_empty() {
  321. return Ok(object);
  322. }
  323. for revision in revisions {
  324. if let Ok(revision_object) = RevisionObjectMock::from_bytes(&revision.bytes) {
  325. object.compose(revision_object)?;
  326. }
  327. }
  328. Ok(object)
  329. }
  330. fn recover_from_revisions(_revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
  331. None
  332. }
  333. }