script.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. use bytes::Bytes;
  2. use flowy_error::{internal_error, FlowyError, FlowyResult};
  3. use flowy_revision::{
  4. RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence,
  5. RevisionPersistenceConfiguration, RevisionSnapshot, RevisionSnapshotDiskCache, REVISION_WRITE_INTERVAL_IN_MILLIS,
  6. };
  7. use flowy_revision_persistence::{RevisionChangeset, RevisionDiskCache, SyncRecord};
  8. use flowy_http_model::revision::{Revision, RevisionRange};
  9. use flowy_http_model::util::md5;
  10. use nanoid::nanoid;
  11. use parking_lot::RwLock;
  12. use serde::{Deserialize, Serialize};
  13. use std::sync::Arc;
  14. use std::time::Duration;
  15. pub enum RevisionScript {
  16. AddLocalRevision { content: String },
  17. AddLocalRevision2 { content: String },
  18. AddInvalidLocalRevision { bytes: Vec<u8> },
  19. AckRevision { rev_id: i64 },
  20. AssertNextSyncRevisionId { rev_id: Option<i64> },
  21. AssertNumberOfSyncRevisions { num: usize },
  22. AssertNumberOfRevisionsInDisk { num: usize },
  23. AssertNextSyncRevisionContent { expected: String },
  24. WaitWhenWriteToDisk,
  25. }
  26. pub struct RevisionTest {
  27. user_id: String,
  28. object_id: String,
  29. configuration: RevisionPersistenceConfiguration,
  30. rev_manager: Arc<RevisionManager<RevisionConnectionMock>>,
  31. }
  32. impl RevisionTest {
  33. pub async fn new() -> Self {
  34. Self::new_with_configuration(2).await
  35. }
  36. pub async fn new_with_configuration(merge_threshold: i64) -> Self {
  37. let user_id = nanoid!(10);
  38. let object_id = nanoid!(6);
  39. let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize, false);
  40. let disk_cache = RevisionDiskCacheMock::new(vec![]);
  41. let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone());
  42. let compress = RevisionMergeableMock {};
  43. let snapshot = RevisionSnapshotMock {};
  44. let mut rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot);
  45. rev_manager.initialize::<RevisionObjectMockSerde>(None).await.unwrap();
  46. Self {
  47. user_id,
  48. object_id,
  49. configuration,
  50. rev_manager: Arc::new(rev_manager),
  51. }
  52. }
  53. pub async fn new_with_other(old_test: RevisionTest) -> Self {
  54. let records = old_test.rev_manager.get_all_revision_records().unwrap();
  55. let disk_cache = RevisionDiskCacheMock::new(records);
  56. let configuration = old_test.configuration;
  57. let persistence = RevisionPersistence::new(
  58. &old_test.user_id,
  59. &old_test.object_id,
  60. disk_cache,
  61. configuration.clone(),
  62. );
  63. let compress = RevisionMergeableMock {};
  64. let snapshot = RevisionSnapshotMock {};
  65. let mut rev_manager =
  66. RevisionManager::new(&old_test.user_id, &old_test.object_id, persistence, compress, snapshot);
  67. rev_manager.initialize::<RevisionObjectMockSerde>(None).await.unwrap();
  68. Self {
  69. user_id: old_test.user_id,
  70. object_id: old_test.object_id,
  71. configuration,
  72. rev_manager: Arc::new(rev_manager),
  73. }
  74. }
  75. pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
  76. for script in scripts {
  77. self.run_script(script).await;
  78. }
  79. }
  80. pub async fn run_script(&self, script: RevisionScript) {
  81. match script {
  82. RevisionScript::AddLocalRevision { content } => {
  83. let object = RevisionObjectMock::new(&content);
  84. let bytes = object.to_bytes();
  85. let md5 = md5(&bytes);
  86. self.rev_manager
  87. .add_local_revision(Bytes::from(bytes), md5)
  88. .await
  89. .unwrap();
  90. }
  91. RevisionScript::AddLocalRevision2 { content } => {
  92. let object = RevisionObjectMock::new(&content);
  93. let bytes = object.to_bytes();
  94. let md5 = md5(&bytes);
  95. self.rev_manager
  96. .add_local_revision(Bytes::from(bytes), md5)
  97. .await
  98. .unwrap();
  99. }
  100. RevisionScript::AddInvalidLocalRevision { bytes } => {
  101. let md5 = md5(&bytes);
  102. self.rev_manager
  103. .add_local_revision(Bytes::from(bytes), md5)
  104. .await
  105. .unwrap();
  106. }
  107. RevisionScript::AckRevision { rev_id } => {
  108. //
  109. self.rev_manager.ack_revision(rev_id).await.unwrap()
  110. }
  111. RevisionScript::AssertNextSyncRevisionId { rev_id } => {
  112. assert_eq!(self.rev_manager.next_sync_rev_id().await, rev_id)
  113. }
  114. RevisionScript::AssertNumberOfSyncRevisions { num } => {
  115. assert_eq!(self.rev_manager.number_of_sync_revisions(), num)
  116. }
  117. RevisionScript::AssertNumberOfRevisionsInDisk { num } => {
  118. assert_eq!(self.rev_manager.number_of_revisions_in_disk(), num)
  119. }
  120. RevisionScript::AssertNextSyncRevisionContent { expected } => {
  121. //
  122. let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
  123. let revision = self.rev_manager.get_revision(rev_id).await.unwrap();
  124. let object = RevisionObjectMock::from_bytes(&revision.bytes).unwrap();
  125. assert_eq!(object.content, expected);
  126. }
  127. RevisionScript::WaitWhenWriteToDisk => {
  128. let milliseconds = 2 * REVISION_WRITE_INTERVAL_IN_MILLIS;
  129. tokio::time::sleep(Duration::from_millis(milliseconds)).await;
  130. }
  131. }
  132. }
  133. }
  134. pub struct RevisionDiskCacheMock {
  135. records: RwLock<Vec<SyncRecord>>,
  136. }
  137. impl RevisionDiskCacheMock {
  138. pub fn new(records: Vec<SyncRecord>) -> Self {
  139. Self {
  140. records: RwLock::new(records),
  141. }
  142. }
  143. }
  144. impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
  145. type Error = FlowyError;
  146. fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
  147. self.records.write().extend(revision_records);
  148. Ok(())
  149. }
  150. fn get_connection(&self) -> Result<RevisionConnectionMock, Self::Error> {
  151. todo!()
  152. }
  153. fn read_revision_records(
  154. &self,
  155. _object_id: &str,
  156. rev_ids: Option<Vec<i64>>,
  157. ) -> Result<Vec<SyncRecord>, Self::Error> {
  158. match rev_ids {
  159. None => Ok(self.records.read().clone()),
  160. Some(rev_ids) => Ok(self
  161. .records
  162. .read()
  163. .iter()
  164. .filter(|record| rev_ids.contains(&record.revision.rev_id))
  165. .cloned()
  166. .collect::<Vec<SyncRecord>>()),
  167. }
  168. }
  169. fn read_revision_records_with_range(
  170. &self,
  171. _object_id: &str,
  172. range: &RevisionRange,
  173. ) -> Result<Vec<SyncRecord>, Self::Error> {
  174. let read_guard = self.records.read();
  175. let records = range
  176. .iter()
  177. .flat_map(|rev_id| {
  178. read_guard
  179. .iter()
  180. .find(|record| record.revision.rev_id == rev_id)
  181. .cloned()
  182. })
  183. .collect::<Vec<SyncRecord>>();
  184. Ok(records)
  185. }
  186. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  187. for changeset in changesets {
  188. if let Some(record) = self
  189. .records
  190. .write()
  191. .iter_mut()
  192. .find(|record| record.revision.rev_id == changeset.rev_id)
  193. {
  194. record.state = changeset.state;
  195. }
  196. }
  197. Ok(())
  198. }
  199. fn delete_revision_records(&self, _object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
  200. match rev_ids {
  201. None => {}
  202. Some(rev_ids) => {
  203. for rev_id in rev_ids {
  204. if let Some(index) = self
  205. .records
  206. .read()
  207. .iter()
  208. .position(|record| record.revision.rev_id == rev_id)
  209. {
  210. self.records.write().remove(index);
  211. }
  212. }
  213. }
  214. }
  215. Ok(())
  216. }
  217. fn delete_and_insert_records(
  218. &self,
  219. _object_id: &str,
  220. _deleted_rev_ids: Option<Vec<i64>>,
  221. _inserted_records: Vec<SyncRecord>,
  222. ) -> Result<(), Self::Error> {
  223. todo!()
  224. }
  225. }
  226. pub struct RevisionConnectionMock {}
  227. pub struct RevisionSnapshotMock {}
  228. impl RevisionSnapshotDiskCache for RevisionSnapshotMock {
  229. fn write_snapshot(&self, _rev_id: i64, _data: Vec<u8>) -> FlowyResult<()> {
  230. todo!()
  231. }
  232. fn read_snapshot(&self, _rev_id: i64) -> FlowyResult<Option<RevisionSnapshot>> {
  233. todo!()
  234. }
  235. fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshot>> {
  236. Ok(None)
  237. }
  238. }
  239. pub struct RevisionMergeableMock {}
  240. impl RevisionMergeable for RevisionMergeableMock {
  241. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  242. let mut object = RevisionObjectMock::new("");
  243. for revision in revisions {
  244. if let Ok(other) = RevisionObjectMock::from_bytes(&revision.bytes) {
  245. object.compose(other)?;
  246. }
  247. }
  248. Ok(Bytes::from(object.to_bytes()))
  249. }
  250. }
  251. #[derive(Serialize, Deserialize)]
  252. pub struct InvalidRevisionObject {
  253. data: String,
  254. }
  255. impl InvalidRevisionObject {
  256. pub fn new() -> Self {
  257. InvalidRevisionObject { data: "".to_string() }
  258. }
  259. pub(crate) fn to_bytes(&self) -> Vec<u8> {
  260. serde_json::to_vec(self).unwrap()
  261. }
  262. // fn from_bytes(bytes: &[u8]) -> Self {
  263. // serde_json::from_slice(bytes).unwrap()
  264. // }
  265. }
  266. #[derive(Serialize, Deserialize)]
  267. pub struct RevisionObjectMock {
  268. content: String,
  269. }
  270. impl RevisionObjectMock {
  271. pub fn new(s: &str) -> Self {
  272. Self { content: s.to_owned() }
  273. }
  274. pub fn compose(&mut self, other: RevisionObjectMock) -> FlowyResult<()> {
  275. self.content.push_str(other.content.as_str());
  276. Ok(())
  277. }
  278. pub fn to_bytes(&self) -> Vec<u8> {
  279. serde_json::to_vec(self).unwrap()
  280. }
  281. pub fn from_bytes(bytes: &[u8]) -> FlowyResult<Self> {
  282. serde_json::from_slice(bytes).map_err(internal_error)
  283. }
  284. }
  285. pub struct RevisionObjectMockSerde();
  286. impl RevisionObjectDeserializer for RevisionObjectMockSerde {
  287. type Output = RevisionObjectMock;
  288. fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  289. let mut object = RevisionObjectMock::new("");
  290. if revisions.is_empty() {
  291. return Ok(object);
  292. }
  293. for revision in revisions {
  294. if let Ok(revision_object) = RevisionObjectMock::from_bytes(&revision.bytes) {
  295. object.compose(revision_object)?;
  296. }
  297. }
  298. Ok(object)
  299. }
  300. }