script.rs 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. use bytes::Bytes;
  2. use flowy_error::{internal_error, FlowyError, FlowyResult};
  3. use flowy_revision::{
  4. RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence,
  5. RevisionPersistenceConfiguration, RevisionSnapshotData, RevisionSnapshotPersistence,
  6. REVISION_WRITE_INTERVAL_IN_MILLIS,
  7. };
  8. use flowy_revision_persistence::{RevisionChangeset, RevisionDiskCache, SyncRecord};
  9. use lib_infra::util::md5;
  10. use nanoid::nanoid;
  11. use parking_lot::RwLock;
  12. use revision_model::{Revision, RevisionRange};
  13. use serde::{Deserialize, Serialize};
  14. use std::sync::Arc;
  15. use std::time::Duration;
  16. pub enum RevisionScript {
  17. AddLocalRevision { content: String },
  18. AddLocalRevision2 { content: String },
  19. AddInvalidLocalRevision { bytes: Vec<u8> },
  20. AckRevision { rev_id: i64 },
  21. AssertNextSyncRevisionId { rev_id: Option<i64> },
  22. AssertNumberOfSyncRevisions { num: usize },
  23. AssertNumberOfRevisionsInDisk { num: usize },
  24. AssertNextSyncRevisionContent { expected: String },
  25. WaitWhenWriteToDisk,
  26. }
  27. pub struct RevisionTest {
  28. object_id: String,
  29. configuration: RevisionPersistenceConfiguration,
  30. rev_manager: Arc<RevisionManager<RevisionConnectionMock>>,
  31. }
  32. impl RevisionTest {
  33. pub async fn new() -> Self {
  34. Self::new_with_configuration(2).await
  35. }
  36. pub async fn new_with_configuration(max_merge_len: i64) -> Self {
  37. let object_id = nanoid!(6);
  38. let configuration = RevisionPersistenceConfiguration::new(max_merge_len as usize, false);
  39. let disk_cache = RevisionDiskCacheMock::new(vec![]);
  40. let persistence = RevisionPersistence::new(&object_id, disk_cache, configuration.clone());
  41. let compress = RevisionMergeableMock {};
  42. let snapshot = RevisionSnapshotMock {};
  43. let mut rev_manager = RevisionManager::new(&object_id, persistence, compress, snapshot);
  44. rev_manager
  45. .initialize::<RevisionObjectMockSerde>(None)
  46. .await
  47. .unwrap();
  48. Self {
  49. object_id,
  50. configuration,
  51. rev_manager: Arc::new(rev_manager),
  52. }
  53. }
  54. pub async fn new_with_other(old_test: RevisionTest) -> Self {
  55. let records = old_test.rev_manager.get_all_revision_records().unwrap();
  56. let disk_cache = RevisionDiskCacheMock::new(records);
  57. let configuration = old_test.configuration;
  58. let persistence =
  59. RevisionPersistence::new(&old_test.object_id, disk_cache, configuration.clone());
  60. let compress = RevisionMergeableMock {};
  61. let snapshot = RevisionSnapshotMock {};
  62. let mut rev_manager =
  63. RevisionManager::new(&old_test.object_id, persistence, compress, snapshot);
  64. rev_manager
  65. .initialize::<RevisionObjectMockSerde>(None)
  66. .await
  67. .unwrap();
  68. Self {
  69. object_id: old_test.object_id,
  70. configuration,
  71. rev_manager: Arc::new(rev_manager),
  72. }
  73. }
  74. pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
  75. for script in scripts {
  76. self.run_script(script).await;
  77. }
  78. }
  79. pub async fn run_script(&self, script: RevisionScript) {
  80. match script {
  81. RevisionScript::AddLocalRevision { content } => {
  82. let object = RevisionObjectMock::new(&content);
  83. let bytes = object.to_bytes();
  84. let md5 = md5(&bytes);
  85. self
  86. .rev_manager
  87. .add_local_revision(Bytes::from(bytes), md5)
  88. .await
  89. .unwrap();
  90. },
  91. RevisionScript::AddLocalRevision2 { content } => {
  92. let object = RevisionObjectMock::new(&content);
  93. let bytes = object.to_bytes();
  94. let md5 = md5(&bytes);
  95. self
  96. .rev_manager
  97. .add_local_revision(Bytes::from(bytes), md5)
  98. .await
  99. .unwrap();
  100. },
  101. RevisionScript::AddInvalidLocalRevision { bytes } => {
  102. let md5 = md5(&bytes);
  103. self
  104. .rev_manager
  105. .add_local_revision(Bytes::from(bytes), md5)
  106. .await
  107. .unwrap();
  108. },
  109. RevisionScript::AckRevision { rev_id } => {
  110. //
  111. self.rev_manager.ack_revision(rev_id).await.unwrap()
  112. },
  113. RevisionScript::AssertNextSyncRevisionId { rev_id } => {
  114. assert_eq!(self.rev_manager.next_sync_rev_id().await, rev_id)
  115. },
  116. RevisionScript::AssertNumberOfSyncRevisions { num } => {
  117. assert_eq!(self.rev_manager.number_of_sync_revisions(), num)
  118. },
  119. RevisionScript::AssertNumberOfRevisionsInDisk { num } => {
  120. assert_eq!(self.rev_manager.number_of_revisions_in_disk(), num)
  121. },
  122. RevisionScript::AssertNextSyncRevisionContent { expected } => {
  123. //
  124. let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
  125. let revision = self.rev_manager.get_revision(rev_id).await.unwrap();
  126. let object = RevisionObjectMock::from_bytes(&revision.bytes).unwrap();
  127. assert_eq!(object.content, expected);
  128. },
  129. RevisionScript::WaitWhenWriteToDisk => {
  130. let milliseconds = 2 * REVISION_WRITE_INTERVAL_IN_MILLIS;
  131. tokio::time::sleep(Duration::from_millis(milliseconds)).await;
  132. },
  133. }
  134. }
  135. }
  136. pub struct RevisionDiskCacheMock {
  137. records: RwLock<Vec<SyncRecord>>,
  138. }
  139. impl RevisionDiskCacheMock {
  140. pub fn new(records: Vec<SyncRecord>) -> Self {
  141. Self {
  142. records: RwLock::new(records),
  143. }
  144. }
  145. }
  146. impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
  147. type Error = FlowyError;
  148. fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
  149. self.records.write().extend(revision_records);
  150. Ok(())
  151. }
  152. fn get_connection(&self) -> Result<RevisionConnectionMock, Self::Error> {
  153. todo!()
  154. }
  155. fn read_revision_records(
  156. &self,
  157. _object_id: &str,
  158. rev_ids: Option<Vec<i64>>,
  159. ) -> Result<Vec<SyncRecord>, Self::Error> {
  160. match rev_ids {
  161. None => Ok(self.records.read().clone()),
  162. Some(rev_ids) => Ok(
  163. self
  164. .records
  165. .read()
  166. .iter()
  167. .filter(|record| rev_ids.contains(&record.revision.rev_id))
  168. .cloned()
  169. .collect::<Vec<SyncRecord>>(),
  170. ),
  171. }
  172. }
  173. fn read_revision_records_with_range(
  174. &self,
  175. _object_id: &str,
  176. range: &RevisionRange,
  177. ) -> Result<Vec<SyncRecord>, Self::Error> {
  178. let read_guard = self.records.read();
  179. let records = range
  180. .iter()
  181. .flat_map(|rev_id| {
  182. read_guard
  183. .iter()
  184. .find(|record| record.revision.rev_id == rev_id)
  185. .cloned()
  186. })
  187. .collect::<Vec<SyncRecord>>();
  188. Ok(records)
  189. }
  190. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  191. for changeset in changesets {
  192. if let Some(record) = self
  193. .records
  194. .write()
  195. .iter_mut()
  196. .find(|record| record.revision.rev_id == changeset.rev_id)
  197. {
  198. record.state = changeset.state;
  199. }
  200. }
  201. Ok(())
  202. }
  203. fn delete_revision_records(
  204. &self,
  205. _object_id: &str,
  206. rev_ids: Option<Vec<i64>>,
  207. ) -> Result<(), Self::Error> {
  208. match rev_ids {
  209. None => {},
  210. Some(rev_ids) => {
  211. for rev_id in rev_ids {
  212. if let Some(index) = self
  213. .records
  214. .read()
  215. .iter()
  216. .position(|record| record.revision.rev_id == rev_id)
  217. {
  218. self.records.write().remove(index);
  219. }
  220. }
  221. },
  222. }
  223. Ok(())
  224. }
  225. fn delete_and_insert_records(
  226. &self,
  227. _object_id: &str,
  228. _deleted_rev_ids: Option<Vec<i64>>,
  229. _inserted_records: Vec<SyncRecord>,
  230. ) -> Result<(), Self::Error> {
  231. todo!()
  232. }
  233. }
  234. pub struct RevisionConnectionMock {}
  235. pub struct RevisionSnapshotMock {}
  236. impl RevisionSnapshotPersistence for RevisionSnapshotMock {
  237. fn write_snapshot(&self, _rev_id: i64, _data: Vec<u8>) -> FlowyResult<()> {
  238. Ok(())
  239. }
  240. fn read_snapshot(&self, _rev_id: i64) -> FlowyResult<Option<RevisionSnapshotData>> {
  241. Ok(None)
  242. }
  243. fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshotData>> {
  244. Ok(None)
  245. }
  246. }
  247. pub struct RevisionMergeableMock {}
  248. impl RevisionMergeable for RevisionMergeableMock {
  249. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  250. let mut object = RevisionObjectMock::new("");
  251. for revision in revisions {
  252. if let Ok(other) = RevisionObjectMock::from_bytes(&revision.bytes) {
  253. object.compose(other)?;
  254. }
  255. }
  256. Ok(Bytes::from(object.to_bytes()))
  257. }
  258. }
  259. #[derive(Serialize, Deserialize)]
  260. pub struct InvalidRevisionObject {
  261. data: String,
  262. }
  263. impl InvalidRevisionObject {
  264. pub fn new() -> Self {
  265. InvalidRevisionObject {
  266. data: "".to_string(),
  267. }
  268. }
  269. pub(crate) fn to_bytes(&self) -> Vec<u8> {
  270. serde_json::to_vec(self).unwrap()
  271. }
  272. // fn from_bytes(bytes: &[u8]) -> Self {
  273. // serde_json::from_slice(bytes).unwrap()
  274. // }
  275. }
  276. #[derive(Serialize, Deserialize)]
  277. pub struct RevisionObjectMock {
  278. content: String,
  279. }
  280. impl RevisionObjectMock {
  281. pub fn new(s: &str) -> Self {
  282. Self {
  283. content: s.to_owned(),
  284. }
  285. }
  286. pub fn compose(&mut self, other: RevisionObjectMock) -> FlowyResult<()> {
  287. self.content.push_str(other.content.as_str());
  288. Ok(())
  289. }
  290. pub fn to_bytes(&self) -> Vec<u8> {
  291. serde_json::to_vec(self).unwrap()
  292. }
  293. pub fn from_bytes(bytes: &[u8]) -> FlowyResult<Self> {
  294. serde_json::from_slice(bytes).map_err(internal_error)
  295. }
  296. }
  297. pub struct RevisionObjectMockSerde();
  298. impl RevisionObjectDeserializer for RevisionObjectMockSerde {
  299. type Output = RevisionObjectMock;
  300. fn deserialize_revisions(
  301. _object_id: &str,
  302. revisions: Vec<Revision>,
  303. ) -> FlowyResult<Self::Output> {
  304. let mut object = RevisionObjectMock::new("");
  305. if revisions.is_empty() {
  306. return Ok(object);
  307. }
  308. for revision in revisions {
  309. if let Ok(revision_object) = RevisionObjectMock::from_bytes(&revision.bytes) {
  310. object.compose(revision_object)?;
  311. }
  312. }
  313. Ok(object)
  314. }
  315. fn recover_from_revisions(_revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
  316. None
  317. }
  318. }