script.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. use bytes::Bytes;
  2. use flowy_error::{internal_error, FlowyError, FlowyResult};
  3. use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, SyncRecord};
  4. use flowy_revision::{
  5. RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence,
  6. RevisionPersistenceConfiguration, RevisionSnapshotDiskCache, RevisionSnapshotInfo,
  7. REVISION_WRITE_INTERVAL_IN_MILLIS,
  8. };
  9. use flowy_http_model::revision::{Revision, RevisionRange};
  10. use flowy_http_model::util::md5;
  11. use nanoid::nanoid;
  12. use parking_lot::RwLock;
  13. use serde::{Deserialize, Serialize};
  14. use std::sync::Arc;
  15. use std::time::Duration;
  /// One step of a revision test scenario, executed by `RevisionTest::run_script`.
  pub enum RevisionScript {
      /// Wraps `content` in a `RevisionObjectMock`, serializes it and adds the
      /// result as a local revision with the given ids.
      AddLocalRevision {
          content: String,
          base_rev_id: i64,
          rev_id: i64,
      },

      /// Same as `AddLocalRevision`, but the ids arrive as a tuple:
      /// `pair_rev_id.0` is used as the base revision id and `pair_rev_id.1`
      /// as the revision id.
      AddLocalRevision2 {
          content: String,
          pair_rev_id: (i64, i64),
      },

      /// Adds a local revision whose payload is `bytes` taken as-is, i.e. without
      /// going through `RevisionObjectMock` serialization.
      AddInvalidLocalRevision {
          bytes: Vec<u8>,
          base_rev_id: i64,
          rev_id: i64,
      },

      /// Acknowledges the revision identified by `rev_id` on the revision manager.
      AckRevision {
          rev_id: i64,
      },

      /// Asserts that the manager's next sync revision id equals `rev_id`.
      AssertNextSyncRevisionId {
          rev_id: Option<i64>,
      },

      /// Asserts that the manager reports `num` sync revisions.
      AssertNumberOfSyncRevisions {
          num: usize,
      },

      /// Asserts that the manager reports `num` revisions in disk.
      AssertNumberOfRevisionsInDisk {
          num: usize,
      },

      /// Asserts that the next sync revision decodes to a `RevisionObjectMock`
      /// whose content equals `expected`.
      AssertNextSyncRevisionContent {
          expected: String,
      },

      /// Sleeps for twice `REVISION_WRITE_INTERVAL_IN_MILLIS`.
      WaitWhenWriteToDisk,
  }
  48. pub struct RevisionTest {
  49. user_id: String,
  50. object_id: String,
  51. configuration: RevisionPersistenceConfiguration,
  52. rev_manager: Arc<RevisionManager<RevisionConnectionMock>>,
  53. }
  54. impl RevisionTest {
  55. pub async fn new() -> Self {
  56. Self::new_with_configuration(2).await
  57. }
  58. pub async fn new_with_configuration(merge_threshold: i64) -> Self {
  59. let user_id = nanoid!(10);
  60. let object_id = nanoid!(6);
  61. let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize, false);
  62. let disk_cache = RevisionDiskCacheMock::new(vec![]);
  63. let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone());
  64. let compress = RevisionCompressMock {};
  65. let snapshot = RevisionSnapshotMock {};
  66. let mut rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot);
  67. rev_manager.initialize::<RevisionObjectMockSerde>(None).await.unwrap();
  68. Self {
  69. user_id,
  70. object_id,
  71. configuration,
  72. rev_manager: Arc::new(rev_manager),
  73. }
  74. }
  75. pub async fn new_with_other(old_test: RevisionTest) -> Self {
  76. let records = old_test.rev_manager.get_all_revision_records().unwrap();
  77. let disk_cache = RevisionDiskCacheMock::new(records);
  78. let configuration = old_test.configuration;
  79. let persistence = RevisionPersistence::new(
  80. &old_test.user_id,
  81. &old_test.object_id,
  82. disk_cache,
  83. configuration.clone(),
  84. );
  85. let compress = RevisionCompressMock {};
  86. let snapshot = RevisionSnapshotMock {};
  87. let mut rev_manager =
  88. RevisionManager::new(&old_test.user_id, &old_test.object_id, persistence, compress, snapshot);
  89. rev_manager.initialize::<RevisionObjectMockSerde>(None).await.unwrap();
  90. Self {
  91. user_id: old_test.user_id,
  92. object_id: old_test.object_id,
  93. configuration,
  94. rev_manager: Arc::new(rev_manager),
  95. }
  96. }
  97. pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
  98. for script in scripts {
  99. self.run_script(script).await;
  100. }
  101. }
  102. pub fn next_rev_id_pair(&self) -> (i64, i64) {
  103. self.rev_manager.next_rev_id_pair()
  104. }
  105. pub async fn run_script(&self, script: RevisionScript) {
  106. match script {
  107. RevisionScript::AddLocalRevision {
  108. content,
  109. base_rev_id,
  110. rev_id,
  111. } => {
  112. let object = RevisionObjectMock::new(&content);
  113. let bytes = object.to_bytes();
  114. let md5 = md5(&bytes);
  115. let revision = Revision::new(
  116. &self.rev_manager.object_id,
  117. base_rev_id,
  118. rev_id,
  119. Bytes::from(bytes),
  120. md5,
  121. );
  122. self.rev_manager.add_local_revision(&revision).await.unwrap();
  123. }
  124. RevisionScript::AddLocalRevision2 { content, pair_rev_id } => {
  125. let object = RevisionObjectMock::new(&content);
  126. let bytes = object.to_bytes();
  127. let md5 = md5(&bytes);
  128. let revision = Revision::new(
  129. &self.rev_manager.object_id,
  130. pair_rev_id.0,
  131. pair_rev_id.1,
  132. Bytes::from(bytes),
  133. md5,
  134. );
  135. self.rev_manager.add_local_revision(&revision).await.unwrap();
  136. }
  137. RevisionScript::AddInvalidLocalRevision {
  138. bytes,
  139. base_rev_id,
  140. rev_id,
  141. } => {
  142. let md5 = md5(&bytes);
  143. let revision = Revision::new(
  144. &self.rev_manager.object_id,
  145. base_rev_id,
  146. rev_id,
  147. Bytes::from(bytes),
  148. md5,
  149. );
  150. self.rev_manager.add_local_revision(&revision).await.unwrap();
  151. }
  152. RevisionScript::AckRevision { rev_id } => {
  153. //
  154. self.rev_manager.ack_revision(rev_id).await.unwrap()
  155. }
  156. RevisionScript::AssertNextSyncRevisionId { rev_id } => {
  157. assert_eq!(self.rev_manager.next_sync_rev_id().await, rev_id)
  158. }
  159. RevisionScript::AssertNumberOfSyncRevisions { num } => {
  160. assert_eq!(self.rev_manager.number_of_sync_revisions(), num)
  161. }
  162. RevisionScript::AssertNumberOfRevisionsInDisk { num } => {
  163. assert_eq!(self.rev_manager.number_of_revisions_in_disk(), num)
  164. }
  165. RevisionScript::AssertNextSyncRevisionContent { expected } => {
  166. //
  167. let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
  168. let revision = self.rev_manager.get_revision(rev_id).await.unwrap();
  169. let object = RevisionObjectMock::from_bytes(&revision.bytes).unwrap();
  170. assert_eq!(object.content, expected);
  171. }
  172. RevisionScript::WaitWhenWriteToDisk => {
  173. let milliseconds = 2 * REVISION_WRITE_INTERVAL_IN_MILLIS;
  174. tokio::time::sleep(Duration::from_millis(milliseconds)).await;
  175. }
  176. }
  177. }
  178. }
  179. pub struct RevisionDiskCacheMock {
  180. records: RwLock<Vec<SyncRecord>>,
  181. }
  182. impl RevisionDiskCacheMock {
  183. pub fn new(records: Vec<SyncRecord>) -> Self {
  184. Self {
  185. records: RwLock::new(records),
  186. }
  187. }
  188. }
  189. impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
  190. type Error = FlowyError;
  191. fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
  192. self.records.write().extend(revision_records);
  193. Ok(())
  194. }
  195. fn get_connection(&self) -> Result<RevisionConnectionMock, Self::Error> {
  196. todo!()
  197. }
  198. fn read_revision_records(
  199. &self,
  200. _object_id: &str,
  201. rev_ids: Option<Vec<i64>>,
  202. ) -> Result<Vec<SyncRecord>, Self::Error> {
  203. match rev_ids {
  204. None => Ok(self.records.read().clone()),
  205. Some(rev_ids) => Ok(self
  206. .records
  207. .read()
  208. .iter()
  209. .filter(|record| rev_ids.contains(&record.revision.rev_id))
  210. .cloned()
  211. .collect::<Vec<SyncRecord>>()),
  212. }
  213. }
  214. fn read_revision_records_with_range(
  215. &self,
  216. _object_id: &str,
  217. range: &RevisionRange,
  218. ) -> Result<Vec<SyncRecord>, Self::Error> {
  219. let read_guard = self.records.read();
  220. let records = range
  221. .iter()
  222. .flat_map(|rev_id| {
  223. read_guard
  224. .iter()
  225. .find(|record| record.revision.rev_id == rev_id)
  226. .cloned()
  227. })
  228. .collect::<Vec<SyncRecord>>();
  229. Ok(records)
  230. }
  231. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  232. for changeset in changesets {
  233. if let Some(record) = self
  234. .records
  235. .write()
  236. .iter_mut()
  237. .find(|record| record.revision.rev_id == *changeset.rev_id.as_ref())
  238. {
  239. record.state = changeset.state;
  240. }
  241. }
  242. Ok(())
  243. }
  244. fn delete_revision_records(&self, _object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
  245. match rev_ids {
  246. None => {}
  247. Some(rev_ids) => {
  248. for rev_id in rev_ids {
  249. if let Some(index) = self
  250. .records
  251. .read()
  252. .iter()
  253. .position(|record| record.revision.rev_id == rev_id)
  254. {
  255. self.records.write().remove(index);
  256. }
  257. }
  258. }
  259. }
  260. Ok(())
  261. }
  262. fn delete_and_insert_records(
  263. &self,
  264. _object_id: &str,
  265. _deleted_rev_ids: Option<Vec<i64>>,
  266. _inserted_records: Vec<SyncRecord>,
  267. ) -> Result<(), Self::Error> {
  268. todo!()
  269. }
  270. }
  /// Field-less placeholder used as the connection type parameter of the mock
  /// disk cache and the revision manager.
  pub struct RevisionConnectionMock {}
  /// Field-less snapshot cache mock handed to the revision manager.
  pub struct RevisionSnapshotMock {}
  273. impl RevisionSnapshotDiskCache for RevisionSnapshotMock {
  274. fn write_snapshot(&self, _object_id: &str, _rev_id: i64, _data: Vec<u8>) -> FlowyResult<()> {
  275. todo!()
  276. }
  277. fn read_snapshot(&self, _object_id: &str, _rev_id: i64) -> FlowyResult<RevisionSnapshotInfo> {
  278. todo!()
  279. }
  280. }
  /// Field-less revision merger mock; combines revisions by concatenating the
  /// contents of their decoded `RevisionObjectMock` payloads.
  pub struct RevisionCompressMock {}
  282. impl RevisionMergeable for RevisionCompressMock {
  283. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  284. let mut object = RevisionObjectMock::new("");
  285. for revision in revisions {
  286. if let Ok(other) = RevisionObjectMock::from_bytes(&revision.bytes) {
  287. let _ = object.compose(other)?;
  288. }
  289. }
  290. Ok(Bytes::from(object.to_bytes()))
  291. }
  292. }
  293. #[derive(Serialize, Deserialize)]
  294. pub struct InvalidRevisionObject {
  295. data: String,
  296. }
  297. impl InvalidRevisionObject {
  298. pub fn new() -> Self {
  299. InvalidRevisionObject { data: "".to_string() }
  300. }
  301. pub(crate) fn to_bytes(&self) -> Vec<u8> {
  302. serde_json::to_vec(self).unwrap()
  303. }
  304. // fn from_bytes(bytes: &[u8]) -> Self {
  305. // serde_json::from_slice(bytes).unwrap()
  306. // }
  307. }
  308. #[derive(Serialize, Deserialize)]
  309. pub struct RevisionObjectMock {
  310. content: String,
  311. }
  312. impl RevisionObjectMock {
  313. pub fn new(s: &str) -> Self {
  314. Self { content: s.to_owned() }
  315. }
  316. pub fn compose(&mut self, other: RevisionObjectMock) -> FlowyResult<()> {
  317. self.content.push_str(other.content.as_str());
  318. Ok(())
  319. }
  320. pub fn to_bytes(&self) -> Vec<u8> {
  321. serde_json::to_vec(self).unwrap()
  322. }
  323. pub fn from_bytes(bytes: &[u8]) -> FlowyResult<Self> {
  324. serde_json::from_slice(bytes).map_err(internal_error)
  325. }
  326. }
  /// Field-less marker type that implements `RevisionObjectDeserializer` with
  /// `RevisionObjectMock` as its output.
  pub struct RevisionObjectMockSerde();
  328. impl RevisionObjectDeserializer for RevisionObjectMockSerde {
  329. type Output = RevisionObjectMock;
  330. fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  331. let mut object = RevisionObjectMock::new("");
  332. if revisions.is_empty() {
  333. return Ok(object);
  334. }
  335. for revision in revisions {
  336. if let Ok(revision_object) = RevisionObjectMock::from_bytes(&revision.bytes) {
  337. let _ = object.compose(revision_object)?;
  338. }
  339. }
  340. Ok(object)
  341. }
  342. }