script.rs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. use bytes::Bytes;
  2. use flowy_error::{FlowyError, FlowyResult};
  3. use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, SyncRecord};
  4. use flowy_revision::{
  5. RevisionCompress, RevisionManager, RevisionPersistence, RevisionSnapshotDiskCache, RevisionSnapshotInfo,
  6. };
  7. use flowy_sync::entities::revision::{Revision, RevisionRange};
  8. use flowy_sync::util::md5;
  9. use nanoid::nanoid;
  10. use parking_lot::RwLock;
  11. use serde::{Deserialize, Serialize};
  12. use std::sync::Arc;
  13. use std::time::Duration;
  14. use tokio::time::interval;
  15. pub enum RevisionScript {
  16. AddLocalRevision {
  17. content: String,
  18. base_rev_id: i64,
  19. rev_id: i64,
  20. },
  21. AckRevision {
  22. rev_id: i64,
  23. },
  24. AssertNextSyncRevisionId {
  25. rev_id: Option<i64>,
  26. },
  27. AssertNextSyncRevisionContent {
  28. expected: String,
  29. },
  30. Wait {
  31. milliseconds: u64,
  32. },
  33. AssertNextSyncRevision(Option<Revision>),
  34. }
  35. pub struct RevisionTest {
  36. rev_manager: Arc<RevisionManager<RevisionConnectionMock>>,
  37. }
  38. impl RevisionTest {
  39. pub async fn new() -> Self {
  40. let user_id = nanoid!(10);
  41. let object_id = nanoid!(6);
  42. let persistence = RevisionPersistence::new(&user_id, &object_id, RevisionDiskCacheMock::new());
  43. let compress = RevisionCompressMock {};
  44. let snapshot = RevisionSnapshotMock {};
  45. let rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot);
  46. Self {
  47. rev_manager: Arc::new(rev_manager),
  48. }
  49. }
  50. pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
  51. for script in scripts {
  52. self.run_script(script).await;
  53. }
  54. }
  55. pub fn next_rev_id_pair(&self) -> (i64, i64) {
  56. self.rev_manager.next_rev_id_pair()
  57. }
  58. pub async fn run_script(&self, script: RevisionScript) {
  59. match script {
  60. RevisionScript::AddLocalRevision {
  61. content,
  62. base_rev_id,
  63. rev_id,
  64. } => {
  65. let object = RevisionObjectMock::new(&content);
  66. let bytes = object.to_bytes();
  67. let md5 = md5(&bytes);
  68. let revision = Revision::new(
  69. &self.rev_manager.object_id,
  70. base_rev_id,
  71. rev_id,
  72. Bytes::from(bytes),
  73. md5,
  74. );
  75. self.rev_manager.add_local_revision(&revision).await.unwrap();
  76. }
  77. RevisionScript::AckRevision { rev_id } => {
  78. //
  79. self.rev_manager.ack_revision(rev_id).await.unwrap()
  80. }
  81. RevisionScript::AssertNextSyncRevisionId { rev_id } => {
  82. assert_eq!(self.rev_manager.next_sync_rev_id().await, rev_id)
  83. }
  84. RevisionScript::AssertNextSyncRevisionContent { expected } => {
  85. //
  86. let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
  87. let revision = self.rev_manager.get_revision(rev_id).await.unwrap();
  88. let object = RevisionObjectMock::from_bytes(&revision.bytes);
  89. assert_eq!(object.content, expected);
  90. }
  91. RevisionScript::Wait { milliseconds } => {
  92. // let mut interval = interval(Duration::from_millis(milliseconds));
  93. // interval.tick().await;
  94. tokio::time::sleep(Duration::from_millis(milliseconds)).await;
  95. }
  96. RevisionScript::AssertNextSyncRevision(expected) => {
  97. let next_revision = self.rev_manager.next_sync_revision().await.unwrap();
  98. assert_eq!(next_revision, expected);
  99. }
  100. }
  101. }
  102. }
  103. pub struct RevisionDiskCacheMock {
  104. records: RwLock<Vec<SyncRecord>>,
  105. }
  106. impl RevisionDiskCacheMock {
  107. pub fn new() -> Self {
  108. Self {
  109. records: RwLock::new(vec![]),
  110. }
  111. }
  112. }
  113. impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
  114. type Error = FlowyError;
  115. fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
  116. self.records.write().extend(revision_records);
  117. Ok(())
  118. }
  119. fn get_connection(&self) -> Result<RevisionConnectionMock, Self::Error> {
  120. todo!()
  121. }
  122. fn read_revision_records(
  123. &self,
  124. object_id: &str,
  125. rev_ids: Option<Vec<i64>>,
  126. ) -> Result<Vec<SyncRecord>, Self::Error> {
  127. todo!()
  128. }
  129. fn read_revision_records_with_range(
  130. &self,
  131. object_id: &str,
  132. range: &RevisionRange,
  133. ) -> Result<Vec<SyncRecord>, Self::Error> {
  134. todo!()
  135. }
  136. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  137. for changeset in changesets {
  138. if let Some(record) = self
  139. .records
  140. .write()
  141. .iter_mut()
  142. .find(|record| record.revision.rev_id == *changeset.rev_id.as_ref())
  143. {
  144. record.state = changeset.state;
  145. }
  146. }
  147. Ok(())
  148. }
  149. fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
  150. match rev_ids {
  151. None => {}
  152. Some(rev_ids) => {
  153. for rev_id in rev_ids {
  154. if let Some(index) = self
  155. .records
  156. .read()
  157. .iter()
  158. .position(|record| record.revision.rev_id == rev_id)
  159. {
  160. self.records.write().remove(index);
  161. }
  162. }
  163. }
  164. }
  165. Ok(())
  166. }
  167. fn delete_and_insert_records(
  168. &self,
  169. object_id: &str,
  170. deleted_rev_ids: Option<Vec<i64>>,
  171. inserted_records: Vec<SyncRecord>,
  172. ) -> Result<(), Self::Error> {
  173. todo!()
  174. }
  175. }
  /// Placeholder connection type used as the type parameter of the mock disk cache and the
  /// revision manager in these tests. It carries no data.
  pub struct RevisionConnectionMock {}
  177. pub struct RevisionSnapshotMock {}
  178. impl RevisionSnapshotDiskCache for RevisionSnapshotMock {
  179. fn write_snapshot(&self, object_id: &str, rev_id: i64, data: Vec<u8>) -> FlowyResult<()> {
  180. todo!()
  181. }
  182. fn read_snapshot(&self, object_id: &str, rev_id: i64) -> FlowyResult<RevisionSnapshotInfo> {
  183. todo!()
  184. }
  185. }
  186. pub struct RevisionCompressMock {}
  187. impl RevisionCompress for RevisionCompressMock {
  188. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  189. let mut object = RevisionObjectMock::new("");
  190. for revision in revisions {
  191. let other = RevisionObjectMock::from_bytes(&revision.bytes);
  192. object.compose(other);
  193. }
  194. Ok(Bytes::from(object.to_bytes()))
  195. }
  196. }
  197. #[derive(Serialize, Deserialize)]
  198. pub struct RevisionObjectMock {
  199. content: String,
  200. }
  201. impl RevisionObjectMock {
  202. pub fn new(s: &str) -> Self {
  203. Self { content: s.to_owned() }
  204. }
  205. pub fn compose(&mut self, other: RevisionObjectMock) {
  206. self.content.push_str(other.content.as_str());
  207. }
  208. pub fn to_bytes(&self) -> Vec<u8> {
  209. serde_json::to_vec(self).unwrap()
  210. }
  211. pub fn from_bytes(bytes: &[u8]) -> Self {
  212. serde_json::from_slice(bytes).unwrap()
  213. }
  214. }