script.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. use bytes::Bytes;
  2. use flowy_error::{internal_error, FlowyError, FlowyResult};
  3. use flowy_revision::{
  4. RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence,
  5. RevisionPersistenceConfiguration, RevisionSnapshotData, RevisionSnapshotPersistence,
  6. REVISION_WRITE_INTERVAL_IN_MILLIS,
  7. };
  8. use flowy_revision_persistence::{RevisionChangeset, RevisionDiskCache, SyncRecord};
  9. use lib_infra::util::md5;
  10. use nanoid::nanoid;
  11. use parking_lot::RwLock;
  12. use revision_model::{Revision, RevisionRange};
  13. use serde::{Deserialize, Serialize};
  14. use std::sync::Arc;
  15. use std::time::Duration;
  16. pub enum RevisionScript {
  17. AddLocalRevision { content: String },
  18. AddLocalRevision2 { content: String },
  19. AddInvalidLocalRevision { bytes: Vec<u8> },
  20. AckRevision { rev_id: i64 },
  21. AssertNextSyncRevisionId { rev_id: Option<i64> },
  22. AssertNumberOfSyncRevisions { num: usize },
  23. AssertNumberOfRevisionsInDisk { num: usize },
  24. AssertNextSyncRevisionContent { expected: String },
  25. WaitWhenWriteToDisk,
  26. }
  27. pub struct RevisionTest {
  28. user_id: String,
  29. object_id: String,
  30. configuration: RevisionPersistenceConfiguration,
  31. rev_manager: Arc<RevisionManager<RevisionConnectionMock>>,
  32. }
  33. impl RevisionTest {
  34. pub async fn new() -> Self {
  35. Self::new_with_configuration(2).await
  36. }
  37. pub async fn new_with_configuration(merge_threshold: i64) -> Self {
  38. let user_id = nanoid!(10);
  39. let object_id = nanoid!(6);
  40. let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize, false);
  41. let disk_cache = RevisionDiskCacheMock::new(vec![]);
  42. let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone());
  43. let compress = RevisionMergeableMock {};
  44. let snapshot = RevisionSnapshotMock {};
  45. let mut rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot);
  46. rev_manager.initialize::<RevisionObjectMockSerde>(None).await.unwrap();
  47. Self {
  48. user_id,
  49. object_id,
  50. configuration,
  51. rev_manager: Arc::new(rev_manager),
  52. }
  53. }
  54. pub async fn new_with_other(old_test: RevisionTest) -> Self {
  55. let records = old_test.rev_manager.get_all_revision_records().unwrap();
  56. let disk_cache = RevisionDiskCacheMock::new(records);
  57. let configuration = old_test.configuration;
  58. let persistence = RevisionPersistence::new(
  59. &old_test.user_id,
  60. &old_test.object_id,
  61. disk_cache,
  62. configuration.clone(),
  63. );
  64. let compress = RevisionMergeableMock {};
  65. let snapshot = RevisionSnapshotMock {};
  66. let mut rev_manager =
  67. RevisionManager::new(&old_test.user_id, &old_test.object_id, persistence, compress, snapshot);
  68. rev_manager.initialize::<RevisionObjectMockSerde>(None).await.unwrap();
  69. Self {
  70. user_id: old_test.user_id,
  71. object_id: old_test.object_id,
  72. configuration,
  73. rev_manager: Arc::new(rev_manager),
  74. }
  75. }
  76. pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
  77. for script in scripts {
  78. self.run_script(script).await;
  79. }
  80. }
  81. pub async fn run_script(&self, script: RevisionScript) {
  82. match script {
  83. RevisionScript::AddLocalRevision { content } => {
  84. let object = RevisionObjectMock::new(&content);
  85. let bytes = object.to_bytes();
  86. let md5 = md5(&bytes);
  87. self.rev_manager
  88. .add_local_revision(Bytes::from(bytes), md5)
  89. .await
  90. .unwrap();
  91. }
  92. RevisionScript::AddLocalRevision2 { content } => {
  93. let object = RevisionObjectMock::new(&content);
  94. let bytes = object.to_bytes();
  95. let md5 = md5(&bytes);
  96. self.rev_manager
  97. .add_local_revision(Bytes::from(bytes), md5)
  98. .await
  99. .unwrap();
  100. }
  101. RevisionScript::AddInvalidLocalRevision { bytes } => {
  102. let md5 = md5(&bytes);
  103. self.rev_manager
  104. .add_local_revision(Bytes::from(bytes), md5)
  105. .await
  106. .unwrap();
  107. }
  108. RevisionScript::AckRevision { rev_id } => {
  109. //
  110. self.rev_manager.ack_revision(rev_id).await.unwrap()
  111. }
  112. RevisionScript::AssertNextSyncRevisionId { rev_id } => {
  113. assert_eq!(self.rev_manager.next_sync_rev_id().await, rev_id)
  114. }
  115. RevisionScript::AssertNumberOfSyncRevisions { num } => {
  116. assert_eq!(self.rev_manager.number_of_sync_revisions(), num)
  117. }
  118. RevisionScript::AssertNumberOfRevisionsInDisk { num } => {
  119. assert_eq!(self.rev_manager.number_of_revisions_in_disk(), num)
  120. }
  121. RevisionScript::AssertNextSyncRevisionContent { expected } => {
  122. //
  123. let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
  124. let revision = self.rev_manager.get_revision(rev_id).await.unwrap();
  125. let object = RevisionObjectMock::from_bytes(&revision.bytes).unwrap();
  126. assert_eq!(object.content, expected);
  127. }
  128. RevisionScript::WaitWhenWriteToDisk => {
  129. let milliseconds = 2 * REVISION_WRITE_INTERVAL_IN_MILLIS;
  130. tokio::time::sleep(Duration::from_millis(milliseconds)).await;
  131. }
  132. }
  133. }
  134. }
  135. pub struct RevisionDiskCacheMock {
  136. records: RwLock<Vec<SyncRecord>>,
  137. }
  138. impl RevisionDiskCacheMock {
  139. pub fn new(records: Vec<SyncRecord>) -> Self {
  140. Self {
  141. records: RwLock::new(records),
  142. }
  143. }
  144. }
  145. impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
  146. type Error = FlowyError;
  147. fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
  148. self.records.write().extend(revision_records);
  149. Ok(())
  150. }
  151. fn get_connection(&self) -> Result<RevisionConnectionMock, Self::Error> {
  152. todo!()
  153. }
  154. fn read_revision_records(
  155. &self,
  156. _object_id: &str,
  157. rev_ids: Option<Vec<i64>>,
  158. ) -> Result<Vec<SyncRecord>, Self::Error> {
  159. match rev_ids {
  160. None => Ok(self.records.read().clone()),
  161. Some(rev_ids) => Ok(self
  162. .records
  163. .read()
  164. .iter()
  165. .filter(|record| rev_ids.contains(&record.revision.rev_id))
  166. .cloned()
  167. .collect::<Vec<SyncRecord>>()),
  168. }
  169. }
  170. fn read_revision_records_with_range(
  171. &self,
  172. _object_id: &str,
  173. range: &RevisionRange,
  174. ) -> Result<Vec<SyncRecord>, Self::Error> {
  175. let read_guard = self.records.read();
  176. let records = range
  177. .iter()
  178. .flat_map(|rev_id| {
  179. read_guard
  180. .iter()
  181. .find(|record| record.revision.rev_id == rev_id)
  182. .cloned()
  183. })
  184. .collect::<Vec<SyncRecord>>();
  185. Ok(records)
  186. }
  187. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  188. for changeset in changesets {
  189. if let Some(record) = self
  190. .records
  191. .write()
  192. .iter_mut()
  193. .find(|record| record.revision.rev_id == changeset.rev_id)
  194. {
  195. record.state = changeset.state;
  196. }
  197. }
  198. Ok(())
  199. }
  200. fn delete_revision_records(&self, _object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
  201. match rev_ids {
  202. None => {}
  203. Some(rev_ids) => {
  204. for rev_id in rev_ids {
  205. if let Some(index) = self
  206. .records
  207. .read()
  208. .iter()
  209. .position(|record| record.revision.rev_id == rev_id)
  210. {
  211. self.records.write().remove(index);
  212. }
  213. }
  214. }
  215. }
  216. Ok(())
  217. }
  218. fn delete_and_insert_records(
  219. &self,
  220. _object_id: &str,
  221. _deleted_rev_ids: Option<Vec<i64>>,
  222. _inserted_records: Vec<SyncRecord>,
  223. ) -> Result<(), Self::Error> {
  224. todo!()
  225. }
  226. }
  227. pub struct RevisionConnectionMock {}
  228. pub struct RevisionSnapshotMock {}
  229. impl RevisionSnapshotPersistence for RevisionSnapshotMock {
  230. fn write_snapshot(&self, _rev_id: i64, _data: Vec<u8>) -> FlowyResult<()> {
  231. Ok(())
  232. }
  233. fn read_snapshot(&self, _rev_id: i64) -> FlowyResult<Option<RevisionSnapshotData>> {
  234. Ok(None)
  235. }
  236. fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshotData>> {
  237. Ok(None)
  238. }
  239. }
  240. pub struct RevisionMergeableMock {}
  241. impl RevisionMergeable for RevisionMergeableMock {
  242. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  243. let mut object = RevisionObjectMock::new("");
  244. for revision in revisions {
  245. if let Ok(other) = RevisionObjectMock::from_bytes(&revision.bytes) {
  246. object.compose(other)?;
  247. }
  248. }
  249. Ok(Bytes::from(object.to_bytes()))
  250. }
  251. }
  252. #[derive(Serialize, Deserialize)]
  253. pub struct InvalidRevisionObject {
  254. data: String,
  255. }
  256. impl InvalidRevisionObject {
  257. pub fn new() -> Self {
  258. InvalidRevisionObject { data: "".to_string() }
  259. }
  260. pub(crate) fn to_bytes(&self) -> Vec<u8> {
  261. serde_json::to_vec(self).unwrap()
  262. }
  263. // fn from_bytes(bytes: &[u8]) -> Self {
  264. // serde_json::from_slice(bytes).unwrap()
  265. // }
  266. }
  267. #[derive(Serialize, Deserialize)]
  268. pub struct RevisionObjectMock {
  269. content: String,
  270. }
  271. impl RevisionObjectMock {
  272. pub fn new(s: &str) -> Self {
  273. Self { content: s.to_owned() }
  274. }
  275. pub fn compose(&mut self, other: RevisionObjectMock) -> FlowyResult<()> {
  276. self.content.push_str(other.content.as_str());
  277. Ok(())
  278. }
  279. pub fn to_bytes(&self) -> Vec<u8> {
  280. serde_json::to_vec(self).unwrap()
  281. }
  282. pub fn from_bytes(bytes: &[u8]) -> FlowyResult<Self> {
  283. serde_json::from_slice(bytes).map_err(internal_error)
  284. }
  285. }
  286. pub struct RevisionObjectMockSerde();
  287. impl RevisionObjectDeserializer for RevisionObjectMockSerde {
  288. type Output = RevisionObjectMock;
  289. fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  290. let mut object = RevisionObjectMock::new("");
  291. if revisions.is_empty() {
  292. return Ok(object);
  293. }
  294. for revision in revisions {
  295. if let Ok(revision_object) = RevisionObjectMock::from_bytes(&revision.bytes) {
  296. object.compose(revision_object)?;
  297. }
  298. }
  299. Ok(object)
  300. }
  301. fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> {
  302. None
  303. }
  304. }