script.rs 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. use bytes::Bytes;
  2. use flowy_error::{FlowyError, FlowyResult};
  3. use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, SyncRecord};
  4. use flowy_revision::{
  5. RevisionManager, RevisionMergeable, RevisionPersistence, RevisionPersistenceConfiguration,
  6. RevisionSnapshotDiskCache, RevisionSnapshotInfo,
  7. };
  8. use flowy_sync::entities::revision::{Revision, RevisionRange};
  9. use flowy_sync::util::md5;
  10. use nanoid::nanoid;
  11. use parking_lot::RwLock;
  12. use serde::{Deserialize, Serialize};
  13. use std::sync::Arc;
  14. use std::time::Duration;
  15. pub enum RevisionScript {
  16. AddLocalRevision {
  17. content: String,
  18. base_rev_id: i64,
  19. rev_id: i64,
  20. },
  21. AckRevision {
  22. rev_id: i64,
  23. },
  24. AssertNextSyncRevisionId {
  25. rev_id: Option<i64>,
  26. },
  27. AssertNumberOfSyncRevisions {
  28. num: usize,
  29. },
  30. AssertNextSyncRevisionContent {
  31. expected: String,
  32. },
  33. Wait {
  34. milliseconds: u64,
  35. },
  36. }
  37. pub struct RevisionTest {
  38. rev_manager: Arc<RevisionManager<RevisionConnectionMock>>,
  39. }
  40. impl RevisionTest {
  41. pub async fn new() -> Self {
  42. Self::new_with_configuration(2).await
  43. }
  44. pub async fn new_with_configuration(merge_when_excess_number_of_version: i64) -> Self {
  45. let user_id = nanoid!(10);
  46. let object_id = nanoid!(6);
  47. let configuration = RevisionPersistenceConfiguration::new(merge_when_excess_number_of_version as usize);
  48. let persistence = RevisionPersistence::new(&user_id, &object_id, RevisionDiskCacheMock::new(), configuration);
  49. let compress = RevisionCompressMock {};
  50. let snapshot = RevisionSnapshotMock {};
  51. let rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot);
  52. Self {
  53. rev_manager: Arc::new(rev_manager),
  54. }
  55. }
  56. pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
  57. for script in scripts {
  58. self.run_script(script).await;
  59. }
  60. }
  61. pub fn next_rev_id_pair(&self) -> (i64, i64) {
  62. self.rev_manager.next_rev_id_pair()
  63. }
  64. pub async fn run_script(&self, script: RevisionScript) {
  65. match script {
  66. RevisionScript::AddLocalRevision {
  67. content,
  68. base_rev_id,
  69. rev_id,
  70. } => {
  71. let object = RevisionObjectMock::new(&content);
  72. let bytes = object.to_bytes();
  73. let md5 = md5(&bytes);
  74. let revision = Revision::new(
  75. &self.rev_manager.object_id,
  76. base_rev_id,
  77. rev_id,
  78. Bytes::from(bytes),
  79. md5,
  80. );
  81. self.rev_manager.add_local_revision(&revision).await.unwrap();
  82. }
  83. RevisionScript::AckRevision { rev_id } => {
  84. //
  85. self.rev_manager.ack_revision(rev_id).await.unwrap()
  86. }
  87. RevisionScript::AssertNextSyncRevisionId { rev_id } => {
  88. assert_eq!(self.rev_manager.next_sync_rev_id().await, rev_id)
  89. }
  90. RevisionScript::AssertNumberOfSyncRevisions { num } => {
  91. assert_eq!(self.rev_manager.number_of_sync_revisions(), num)
  92. }
  93. RevisionScript::AssertNextSyncRevisionContent { expected } => {
  94. //
  95. let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
  96. let revision = self.rev_manager.get_revision(rev_id).await.unwrap();
  97. let object = RevisionObjectMock::from_bytes(&revision.bytes);
  98. assert_eq!(object.content, expected);
  99. }
  100. RevisionScript::Wait { milliseconds } => {
  101. tokio::time::sleep(Duration::from_millis(milliseconds)).await;
  102. }
  103. }
  104. }
  105. }
  106. pub struct RevisionDiskCacheMock {
  107. records: RwLock<Vec<SyncRecord>>,
  108. }
  109. impl RevisionDiskCacheMock {
  110. pub fn new() -> Self {
  111. Self {
  112. records: RwLock::new(vec![]),
  113. }
  114. }
  115. }
  116. impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
  117. type Error = FlowyError;
  118. fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
  119. self.records.write().extend(revision_records);
  120. Ok(())
  121. }
  122. fn get_connection(&self) -> Result<RevisionConnectionMock, Self::Error> {
  123. todo!()
  124. }
  125. fn read_revision_records(
  126. &self,
  127. _object_id: &str,
  128. _rev_ids: Option<Vec<i64>>,
  129. ) -> Result<Vec<SyncRecord>, Self::Error> {
  130. todo!()
  131. }
  132. fn read_revision_records_with_range(
  133. &self,
  134. _object_id: &str,
  135. _range: &RevisionRange,
  136. ) -> Result<Vec<SyncRecord>, Self::Error> {
  137. todo!()
  138. }
  139. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  140. for changeset in changesets {
  141. if let Some(record) = self
  142. .records
  143. .write()
  144. .iter_mut()
  145. .find(|record| record.revision.rev_id == *changeset.rev_id.as_ref())
  146. {
  147. record.state = changeset.state;
  148. }
  149. }
  150. Ok(())
  151. }
  152. fn delete_revision_records(&self, _object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
  153. match rev_ids {
  154. None => {}
  155. Some(rev_ids) => {
  156. for rev_id in rev_ids {
  157. if let Some(index) = self
  158. .records
  159. .read()
  160. .iter()
  161. .position(|record| record.revision.rev_id == rev_id)
  162. {
  163. self.records.write().remove(index);
  164. }
  165. }
  166. }
  167. }
  168. Ok(())
  169. }
  170. fn delete_and_insert_records(
  171. &self,
  172. _object_id: &str,
  173. _deleted_rev_ids: Option<Vec<i64>>,
  174. _inserted_records: Vec<SyncRecord>,
  175. ) -> Result<(), Self::Error> {
  176. todo!()
  177. }
  178. }
  179. pub struct RevisionConnectionMock {}
  180. pub struct RevisionSnapshotMock {}
  181. impl RevisionSnapshotDiskCache for RevisionSnapshotMock {
  182. fn write_snapshot(&self, _object_id: &str, _rev_id: i64, _data: Vec<u8>) -> FlowyResult<()> {
  183. todo!()
  184. }
  185. fn read_snapshot(&self, _object_id: &str, _rev_id: i64) -> FlowyResult<RevisionSnapshotInfo> {
  186. todo!()
  187. }
  188. }
  189. pub struct RevisionCompressMock {}
  190. impl RevisionMergeable for RevisionCompressMock {
  191. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  192. let mut object = RevisionObjectMock::new("");
  193. for revision in revisions {
  194. let other = RevisionObjectMock::from_bytes(&revision.bytes);
  195. object.compose(other);
  196. }
  197. Ok(Bytes::from(object.to_bytes()))
  198. }
  199. }
  200. #[derive(Serialize, Deserialize)]
  201. pub struct RevisionObjectMock {
  202. content: String,
  203. }
  204. impl RevisionObjectMock {
  205. pub fn new(s: &str) -> Self {
  206. Self { content: s.to_owned() }
  207. }
  208. pub fn compose(&mut self, other: RevisionObjectMock) {
  209. self.content.push_str(other.content.as_str());
  210. }
  211. pub fn to_bytes(&self) -> Vec<u8> {
  212. serde_json::to_vec(self).unwrap()
  213. }
  214. pub fn from_bytes(bytes: &[u8]) -> Self {
  215. serde_json::from_slice(bytes).unwrap()
  216. }
  217. }