script.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. use bytes::Bytes;
  2. use flowy_error::{FlowyError, FlowyResult};
  3. use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, SyncRecord};
  4. use flowy_revision::{
  5. RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence,
  6. RevisionPersistenceConfiguration, RevisionSnapshotDiskCache, RevisionSnapshotInfo,
  7. REVISION_WRITE_INTERVAL_IN_MILLIS,
  8. };
  9. use flowy_sync::entities::document::DocumentPayloadPB;
  10. use flowy_sync::entities::revision::{Revision, RevisionRange};
  11. use flowy_sync::util::{make_operations_from_revisions, md5};
  12. use nanoid::nanoid;
  13. use parking_lot::RwLock;
  14. use serde::{Deserialize, Serialize};
  15. use std::sync::Arc;
  16. use std::time::Duration;
  17. pub enum RevisionScript {
  18. AddLocalRevision {
  19. content: String,
  20. base_rev_id: i64,
  21. rev_id: i64,
  22. },
  23. AddLocalRevision2 {
  24. content: String,
  25. pair_rev_id: (i64, i64),
  26. },
  27. AddInvalidLocalRevision {
  28. bytes: Vec<u8>,
  29. base_rev_id: i64,
  30. rev_id: i64,
  31. },
  32. AckRevision {
  33. rev_id: i64,
  34. },
  35. AssertNextSyncRevisionId {
  36. rev_id: Option<i64>,
  37. },
  38. AssertNumberOfSyncRevisions {
  39. num: usize,
  40. },
  41. AssertNumberOfRevisionsInDisk {
  42. num: usize,
  43. },
  44. AssertNextSyncRevisionContent {
  45. expected: String,
  46. },
  47. WaitWhenWriteToDisk,
  48. }
  49. pub struct RevisionTest {
  50. user_id: String,
  51. object_id: String,
  52. configuration: RevisionPersistenceConfiguration,
  53. rev_manager: Arc<RevisionManager<RevisionConnectionMock>>,
  54. }
  55. impl RevisionTest {
  56. pub async fn new() -> Self {
  57. Self::new_with_configuration(2).await
  58. }
  59. pub async fn new_with_configuration(merge_threshold: i64) -> Self {
  60. let user_id = nanoid!(10);
  61. let object_id = nanoid!(6);
  62. let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize);
  63. let disk_cache = RevisionDiskCacheMock::new(vec![]);
  64. let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone());
  65. let compress = RevisionCompressMock {};
  66. let snapshot = RevisionSnapshotMock {};
  67. let mut rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot);
  68. rev_manager.initialize::<RevisionObjectMockSerde>(None).await.unwrap();
  69. Self {
  70. user_id,
  71. object_id,
  72. configuration,
  73. rev_manager: Arc::new(rev_manager),
  74. }
  75. }
  76. pub async fn new_with_other(old_test: RevisionTest) -> Self {
  77. let records = old_test.rev_manager.get_all_revision_records().unwrap();
  78. let disk_cache = RevisionDiskCacheMock::new(records);
  79. let configuration = old_test.configuration;
  80. let persistence = RevisionPersistence::new(
  81. &old_test.user_id,
  82. &old_test.object_id,
  83. disk_cache,
  84. configuration.clone(),
  85. );
  86. let compress = RevisionCompressMock {};
  87. let snapshot = RevisionSnapshotMock {};
  88. let mut rev_manager =
  89. RevisionManager::new(&old_test.user_id, &old_test.object_id, persistence, compress, snapshot);
  90. rev_manager.initialize::<RevisionObjectMockSerde>(None).await.unwrap();
  91. Self {
  92. user_id: old_test.user_id,
  93. object_id: old_test.object_id,
  94. configuration,
  95. rev_manager: Arc::new(rev_manager),
  96. }
  97. }
  98. pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
  99. for script in scripts {
  100. self.run_script(script).await;
  101. }
  102. }
  103. pub fn next_rev_id_pair(&self) -> (i64, i64) {
  104. self.rev_manager.next_rev_id_pair()
  105. }
  106. pub async fn run_script(&self, script: RevisionScript) {
  107. match script {
  108. RevisionScript::AddLocalRevision {
  109. content,
  110. base_rev_id,
  111. rev_id,
  112. } => {
  113. let object = RevisionObjectMock::new(&content);
  114. let bytes = object.to_bytes();
  115. let md5 = md5(&bytes);
  116. let revision = Revision::new(
  117. &self.rev_manager.object_id,
  118. base_rev_id,
  119. rev_id,
  120. Bytes::from(bytes),
  121. md5,
  122. );
  123. self.rev_manager.add_local_revision(&revision).await.unwrap();
  124. }
  125. RevisionScript::AddLocalRevision2 { content, pair_rev_id } => {
  126. let object = RevisionObjectMock::new(&content);
  127. let bytes = object.to_bytes();
  128. let md5 = md5(&bytes);
  129. let revision = Revision::new(
  130. &self.rev_manager.object_id,
  131. pair_rev_id.0,
  132. pair_rev_id.1,
  133. Bytes::from(bytes),
  134. md5,
  135. );
  136. self.rev_manager.add_local_revision(&revision).await.unwrap();
  137. }
  138. RevisionScript::AddInvalidLocalRevision {
  139. bytes,
  140. base_rev_id,
  141. rev_id,
  142. } => {
  143. let md5 = md5(&bytes);
  144. let revision = Revision::new(
  145. &self.rev_manager.object_id,
  146. base_rev_id,
  147. rev_id,
  148. Bytes::from(bytes),
  149. md5,
  150. );
  151. self.rev_manager.add_local_revision(&revision).await.unwrap();
  152. }
  153. RevisionScript::AckRevision { rev_id } => {
  154. //
  155. self.rev_manager.ack_revision(rev_id).await.unwrap()
  156. }
  157. RevisionScript::AssertNextSyncRevisionId { rev_id } => {
  158. assert_eq!(self.rev_manager.next_sync_rev_id().await, rev_id)
  159. }
  160. RevisionScript::AssertNumberOfSyncRevisions { num } => {
  161. assert_eq!(self.rev_manager.number_of_sync_revisions(), num)
  162. }
  163. RevisionScript::AssertNumberOfRevisionsInDisk { num } => {
  164. assert_eq!(self.rev_manager.number_of_revisions_in_disk(), num)
  165. }
  166. RevisionScript::AssertNextSyncRevisionContent { expected } => {
  167. //
  168. let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
  169. let revision = self.rev_manager.get_revision(rev_id).await.unwrap();
  170. let object = RevisionObjectMock::from_bytes(&revision.bytes);
  171. assert_eq!(object.content, expected);
  172. }
  173. RevisionScript::WaitWhenWriteToDisk => {
  174. let milliseconds = 2 * REVISION_WRITE_INTERVAL_IN_MILLIS;
  175. tokio::time::sleep(Duration::from_millis(milliseconds)).await;
  176. }
  177. }
  178. }
  179. }
  180. pub struct RevisionDiskCacheMock {
  181. records: RwLock<Vec<SyncRecord>>,
  182. }
  183. impl RevisionDiskCacheMock {
  184. pub fn new(records: Vec<SyncRecord>) -> Self {
  185. Self {
  186. records: RwLock::new(records),
  187. }
  188. }
  189. }
  190. impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
  191. type Error = FlowyError;
  192. fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
  193. self.records.write().extend(revision_records);
  194. Ok(())
  195. }
  196. fn get_connection(&self) -> Result<RevisionConnectionMock, Self::Error> {
  197. todo!()
  198. }
  199. fn read_revision_records(
  200. &self,
  201. _object_id: &str,
  202. rev_ids: Option<Vec<i64>>,
  203. ) -> Result<Vec<SyncRecord>, Self::Error> {
  204. match rev_ids {
  205. None => Ok(self.records.read().clone()),
  206. Some(rev_ids) => Ok(self
  207. .records
  208. .read()
  209. .iter()
  210. .filter(|record| rev_ids.contains(&record.revision.rev_id))
  211. .cloned()
  212. .collect::<Vec<SyncRecord>>()),
  213. }
  214. }
  215. fn read_revision_records_with_range(
  216. &self,
  217. _object_id: &str,
  218. range: &RevisionRange,
  219. ) -> Result<Vec<SyncRecord>, Self::Error> {
  220. let read_guard = self.records.read();
  221. let records = range
  222. .iter()
  223. .flat_map(|rev_id| {
  224. read_guard
  225. .iter()
  226. .find(|record| record.revision.rev_id == rev_id)
  227. .cloned()
  228. })
  229. .collect::<Vec<SyncRecord>>();
  230. Ok(records)
  231. }
  232. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  233. for changeset in changesets {
  234. if let Some(record) = self
  235. .records
  236. .write()
  237. .iter_mut()
  238. .find(|record| record.revision.rev_id == *changeset.rev_id.as_ref())
  239. {
  240. record.state = changeset.state;
  241. }
  242. }
  243. Ok(())
  244. }
  245. fn delete_revision_records(&self, _object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
  246. match rev_ids {
  247. None => {}
  248. Some(rev_ids) => {
  249. for rev_id in rev_ids {
  250. if let Some(index) = self
  251. .records
  252. .read()
  253. .iter()
  254. .position(|record| record.revision.rev_id == rev_id)
  255. {
  256. self.records.write().remove(index);
  257. }
  258. }
  259. }
  260. }
  261. Ok(())
  262. }
  263. fn delete_and_insert_records(
  264. &self,
  265. _object_id: &str,
  266. _deleted_rev_ids: Option<Vec<i64>>,
  267. _inserted_records: Vec<SyncRecord>,
  268. ) -> Result<(), Self::Error> {
  269. todo!()
  270. }
  271. }
  272. pub struct RevisionConnectionMock {}
  273. pub struct RevisionSnapshotMock {}
  274. impl RevisionSnapshotDiskCache for RevisionSnapshotMock {
  275. fn write_snapshot(&self, _object_id: &str, _rev_id: i64, _data: Vec<u8>) -> FlowyResult<()> {
  276. todo!()
  277. }
  278. fn read_snapshot(&self, _object_id: &str, _rev_id: i64) -> FlowyResult<RevisionSnapshotInfo> {
  279. todo!()
  280. }
  281. }
  282. pub struct RevisionCompressMock {}
  283. impl RevisionMergeable for RevisionCompressMock {
  284. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  285. let mut object = RevisionObjectMock::new("");
  286. for revision in revisions {
  287. let other = RevisionObjectMock::from_bytes(&revision.bytes);
  288. let _ = object.compose(other)?;
  289. }
  290. Ok(Bytes::from(object.to_bytes()))
  291. }
  292. }
  293. #[derive(Serialize, Deserialize)]
  294. pub struct InvalidRevisionObject {
  295. data: String,
  296. }
  297. impl InvalidRevisionObject {
  298. pub fn new() -> Vec<u8> {
  299. let object = InvalidRevisionObject { data: "".to_string() };
  300. object.to_bytes()
  301. }
  302. fn to_bytes(&self) -> Vec<u8> {
  303. serde_json::to_vec(self).unwrap()
  304. }
  305. fn from_bytes(bytes: &[u8]) -> Self {
  306. serde_json::from_slice(bytes).unwrap()
  307. }
  308. }
  309. #[derive(Serialize, Deserialize)]
  310. pub struct RevisionObjectMock {
  311. content: String,
  312. }
  313. impl RevisionObjectMock {
  314. pub fn new(s: &str) -> Self {
  315. Self { content: s.to_owned() }
  316. }
  317. pub fn compose(&mut self, other: RevisionObjectMock) -> FlowyResult<()> {
  318. self.content.push_str(other.content.as_str());
  319. Ok(())
  320. }
  321. pub fn to_bytes(&self) -> Vec<u8> {
  322. serde_json::to_vec(self).unwrap()
  323. }
  324. pub fn from_bytes(bytes: &[u8]) -> Self {
  325. serde_json::from_slice(bytes).unwrap()
  326. }
  327. }
  328. pub struct RevisionObjectMockSerde();
  329. impl RevisionObjectDeserializer for RevisionObjectMockSerde {
  330. type Output = RevisionObjectMock;
  331. fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  332. let mut object = RevisionObjectMock::new("");
  333. if revisions.is_empty() {
  334. return Ok(object);
  335. }
  336. for revision in revisions {
  337. let revision_object = RevisionObjectMock::from_bytes(&revision.bytes);
  338. let _ = object.compose(revision_object)?;
  339. }
  340. Ok(object)
  341. }
  342. }