block_revision_pad.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. use crate::errors::{SyncError, SyncResult};
  2. use crate::util::cal_diff;
  3. use database_model::{
  4. gen_block_id, gen_row_id, CellRevision, DatabaseBlockRevision, RowChangeset, RowRevision,
  5. };
  6. use flowy_sync::util::make_operations_from_revisions;
  7. use lib_infra::util::md5;
  8. use lib_ot::core::{DeltaBuilder, DeltaOperations, EmptyAttributes, OperationTransform};
  9. use revision_model::Revision;
  10. use std::any::type_name;
  11. use std::borrow::Cow;
  12. use std::collections::HashMap;
  13. use std::sync::Arc;
  14. pub type DatabaseBlockOperations = DeltaOperations<EmptyAttributes>;
  15. pub type DatabaseBlockOperationsBuilder = DeltaBuilder;
  16. #[derive(Debug, Clone)]
  17. pub struct DatabaseBlockRevisionPad {
  18. block: DatabaseBlockRevision,
  19. operations: DatabaseBlockOperations,
  20. }
  21. impl std::ops::Deref for DatabaseBlockRevisionPad {
  22. type Target = DatabaseBlockRevision;
  23. fn deref(&self) -> &Self::Target {
  24. &self.block
  25. }
  26. }
  27. impl DatabaseBlockRevisionPad {
  28. pub fn duplicate_data(&self, duplicated_block_id: &str) -> DatabaseBlockRevision {
  29. let duplicated_rows = self
  30. .block
  31. .rows
  32. .iter()
  33. .map(|row| {
  34. let mut duplicated_row = row.as_ref().clone();
  35. duplicated_row.id = gen_row_id();
  36. duplicated_row.block_id = duplicated_block_id.to_string();
  37. Arc::new(duplicated_row)
  38. })
  39. .collect::<Vec<Arc<RowRevision>>>();
  40. DatabaseBlockRevision {
  41. block_id: duplicated_block_id.to_string(),
  42. rows: duplicated_rows,
  43. }
  44. }
  45. pub fn from_operations(operations: DatabaseBlockOperations) -> SyncResult<Self> {
  46. let s = operations.content()?;
  47. let revision: DatabaseBlockRevision = serde_json::from_str(&s).map_err(|e| {
  48. let msg = format!(
  49. "Deserialize operations to {} failed: {}",
  50. type_name::<DatabaseBlockRevision>(),
  51. e
  52. );
  53. tracing::error!("{}", s);
  54. SyncError::internal().context(msg)
  55. })?;
  56. Ok(Self {
  57. block: revision,
  58. operations,
  59. })
  60. }
  61. pub fn from_revisions(revisions: Vec<Revision>) -> SyncResult<Self> {
  62. let operations: DatabaseBlockOperations = make_operations_from_revisions(revisions)?;
  63. Self::from_operations(operations)
  64. }
  65. #[tracing::instrument(level = "trace", skip(self, row), err)]
  66. pub fn add_row_rev(
  67. &mut self,
  68. row: RowRevision,
  69. start_row_id: Option<String>,
  70. ) -> SyncResult<Option<DatabaseBlockRevisionChangeset>> {
  71. self.modify(|rows| {
  72. if let Some(start_row_id) = start_row_id {
  73. if !start_row_id.is_empty() {
  74. if let Some(index) = rows.iter().position(|row| row.id == start_row_id) {
  75. rows.insert(index + 1, Arc::new(row));
  76. return Ok(Some(()));
  77. }
  78. }
  79. }
  80. rows.push(Arc::new(row));
  81. Ok(Some(()))
  82. })
  83. }
  84. pub fn delete_rows(
  85. &mut self,
  86. row_ids: Vec<Cow<'_, String>>,
  87. ) -> SyncResult<Option<DatabaseBlockRevisionChangeset>> {
  88. self.modify(|rows| {
  89. rows.retain(|row| !row_ids.contains(&Cow::Borrowed(&row.id)));
  90. Ok(Some(()))
  91. })
  92. }
  93. pub fn get_row_rev(&self, row_id: &str) -> Option<(usize, Arc<RowRevision>)> {
  94. for (index, row) in self.block.rows.iter().enumerate() {
  95. if row.id == row_id {
  96. return Some((index, row.clone()));
  97. }
  98. }
  99. None
  100. }
  101. pub fn get_row_revs<T>(
  102. &self,
  103. row_ids: Option<Vec<Cow<'_, T>>>,
  104. ) -> SyncResult<Vec<Arc<RowRevision>>>
  105. where
  106. T: AsRef<str> + ToOwned + ?Sized,
  107. {
  108. match row_ids {
  109. None => Ok(self.block.rows.clone()),
  110. Some(row_ids) => {
  111. let row_map = self
  112. .block
  113. .rows
  114. .iter()
  115. .map(|row| (row.id.as_str(), row.clone()))
  116. .collect::<HashMap<&str, Arc<RowRevision>>>();
  117. Ok(
  118. row_ids
  119. .iter()
  120. .flat_map(|row_id| {
  121. let row_id = row_id.as_ref().as_ref();
  122. match row_map.get(row_id) {
  123. None => {
  124. tracing::error!("Can't find the row with id: {}", row_id);
  125. None
  126. },
  127. Some(row) => Some(row.clone()),
  128. }
  129. })
  130. .collect::<Vec<_>>(),
  131. )
  132. },
  133. }
  134. }
  135. pub fn get_cell_revs(
  136. &self,
  137. field_id: &str,
  138. row_ids: Option<Vec<Cow<'_, String>>>,
  139. ) -> SyncResult<Vec<CellRevision>> {
  140. let rows = self.get_row_revs(row_ids)?;
  141. let cell_revs = rows
  142. .iter()
  143. .flat_map(|row| {
  144. let cell_rev = row.cells.get(field_id)?;
  145. Some(cell_rev.clone())
  146. })
  147. .collect::<Vec<CellRevision>>();
  148. Ok(cell_revs)
  149. }
  150. pub fn number_of_rows(&self) -> i32 {
  151. self.block.rows.len() as i32
  152. }
  153. pub fn index_of_row(&self, row_id: &str) -> Option<usize> {
  154. self.block.rows.iter().position(|row| row.id == row_id)
  155. }
  156. pub fn update_row(
  157. &mut self,
  158. changeset: RowChangeset,
  159. ) -> SyncResult<Option<DatabaseBlockRevisionChangeset>> {
  160. let row_id = changeset.row_id.clone();
  161. self.modify_row(&row_id, |row| {
  162. let mut is_changed = None;
  163. if let Some(height) = changeset.height {
  164. row.height = height;
  165. is_changed = Some(());
  166. }
  167. if let Some(visibility) = changeset.visibility {
  168. row.visibility = visibility;
  169. is_changed = Some(());
  170. }
  171. if !changeset.cell_by_field_id.is_empty() {
  172. is_changed = Some(());
  173. changeset
  174. .cell_by_field_id
  175. .into_iter()
  176. .for_each(|(field_id, cell)| {
  177. row.cells.insert(field_id, cell);
  178. })
  179. }
  180. Ok(is_changed)
  181. })
  182. }
  183. pub fn move_row(
  184. &mut self,
  185. row_id: &str,
  186. from: usize,
  187. to: usize,
  188. ) -> SyncResult<Option<DatabaseBlockRevisionChangeset>> {
  189. self.modify(|row_revs| {
  190. if let Some(position) = row_revs.iter().position(|row_rev| row_rev.id == row_id) {
  191. debug_assert_eq!(from, position);
  192. let row_rev = row_revs.remove(position);
  193. if to > row_revs.len() {
  194. Err(SyncError::out_of_bound())
  195. } else {
  196. row_revs.insert(to, row_rev);
  197. Ok(Some(()))
  198. }
  199. } else {
  200. Ok(None)
  201. }
  202. })
  203. }
  204. pub fn modify<F>(&mut self, f: F) -> SyncResult<Option<DatabaseBlockRevisionChangeset>>
  205. where
  206. F: for<'a> FnOnce(&'a mut Vec<Arc<RowRevision>>) -> SyncResult<Option<()>>,
  207. {
  208. let cloned_self = self.clone();
  209. match f(&mut self.block.rows)? {
  210. None => Ok(None),
  211. Some(_) => {
  212. let old = cloned_self.revision_json()?;
  213. let new = self.revision_json()?;
  214. match cal_diff::<EmptyAttributes>(old, new) {
  215. None => Ok(None),
  216. Some(operations) => {
  217. tracing::trace!(
  218. "[{}] Composing operations {}",
  219. type_name::<DatabaseBlockRevision>(),
  220. operations.json_str()
  221. );
  222. self.operations = self.operations.compose(&operations)?;
  223. Ok(Some(DatabaseBlockRevisionChangeset {
  224. operations,
  225. md5: md5(&self.operations.json_bytes()),
  226. }))
  227. },
  228. }
  229. },
  230. }
  231. }
  232. fn modify_row<F>(
  233. &mut self,
  234. row_id: &str,
  235. f: F,
  236. ) -> SyncResult<Option<DatabaseBlockRevisionChangeset>>
  237. where
  238. F: FnOnce(&mut RowRevision) -> SyncResult<Option<()>>,
  239. {
  240. self.modify(|rows| {
  241. if let Some(row_rev) = rows.iter_mut().find(|row_rev| row_id == row_rev.id) {
  242. f(Arc::make_mut(row_rev))
  243. } else {
  244. tracing::warn!("[BlockMetaPad]: Can't find any row with id: {}", row_id);
  245. Ok(None)
  246. }
  247. })
  248. }
  249. pub fn revision_json(&self) -> SyncResult<String> {
  250. serde_json::to_string(&self.block)
  251. .map_err(|e| SyncError::internal().context(format!("serial block to json failed: {}", e)))
  252. }
  253. pub fn operations_json_str(&self) -> String {
  254. self.operations.json_str()
  255. }
  256. }
  257. pub struct DatabaseBlockRevisionChangeset {
  258. pub operations: DatabaseBlockOperations,
  259. /// md5: the md5 of the grid after applying the change.
  260. pub md5: String,
  261. }
  262. pub fn make_database_block_operations(
  263. block_rev: &DatabaseBlockRevision,
  264. ) -> DatabaseBlockOperations {
  265. let json = serde_json::to_string(&block_rev).unwrap();
  266. DatabaseBlockOperationsBuilder::new().insert(&json).build()
  267. }
  268. pub fn make_database_block_revisions(
  269. _user_id: &str,
  270. database_block_meta_data: &DatabaseBlockRevision,
  271. ) -> Vec<Revision> {
  272. let operations = make_database_block_operations(database_block_meta_data);
  273. let bytes = operations.json_bytes();
  274. let revision = Revision::initial_revision(&database_block_meta_data.block_id, bytes);
  275. vec![revision]
  276. }
  277. impl std::default::Default for DatabaseBlockRevisionPad {
  278. fn default() -> Self {
  279. let block_revision = DatabaseBlockRevision {
  280. block_id: gen_block_id(),
  281. rows: vec![],
  282. };
  283. let operations = make_database_block_operations(&block_revision);
  284. DatabaseBlockRevisionPad {
  285. block: block_revision,
  286. operations,
  287. }
  288. }
  289. }
  290. #[cfg(test)]
  291. mod tests {
  292. use crate::client_database::{DatabaseBlockOperations, DatabaseBlockRevisionPad};
  293. use database_model::{RowChangeset, RowRevision};
  294. use std::borrow::Cow;
  295. #[test]
  296. fn block_meta_add_row() {
  297. let mut pad = test_pad();
  298. let row = RowRevision {
  299. id: "1".to_string(),
  300. block_id: pad.block_id.clone(),
  301. cells: Default::default(),
  302. height: 0,
  303. visibility: false,
  304. };
  305. let change = pad.add_row_rev(row.clone(), None).unwrap().unwrap();
  306. assert_eq!(pad.rows.first().unwrap().as_ref(), &row);
  307. assert_eq!(
  308. change.operations.json_str(),
  309. r#"[{"retain":24},{"insert":"{\"id\":\"1\",\"block_id\":\"1\",\"cells\":[],\"height\":0,\"visibility\":false}"},{"retain":2}]"#
  310. );
  311. }
  312. #[test]
  313. fn block_meta_insert_row() {
  314. let mut pad = test_pad();
  315. let row_1 = test_row_rev("1", &pad);
  316. let row_2 = test_row_rev("2", &pad);
  317. let row_3 = test_row_rev("3", &pad);
  318. let change = pad.add_row_rev(row_1.clone(), None).unwrap().unwrap();
  319. assert_eq!(
  320. change.operations.json_str(),
  321. r#"[{"retain":24},{"insert":"{\"id\":\"1\",\"block_id\":\"1\",\"cells\":[],\"height\":0,\"visibility\":false}"},{"retain":2}]"#
  322. );
  323. let change = pad.add_row_rev(row_2.clone(), None).unwrap().unwrap();
  324. assert_eq!(
  325. change.operations.json_str(),
  326. r#"[{"retain":90},{"insert":",{\"id\":\"2\",\"block_id\":\"1\",\"cells\":[],\"height\":0,\"visibility\":false}"},{"retain":2}]"#
  327. );
  328. let change = pad
  329. .add_row_rev(row_3.clone(), Some("2".to_string()))
  330. .unwrap()
  331. .unwrap();
  332. assert_eq!(
  333. change.operations.json_str(),
  334. r#"[{"retain":157},{"insert":",{\"id\":\"3\",\"block_id\":\"1\",\"cells\":[],\"height\":0,\"visibility\":false}"},{"retain":2}]"#
  335. );
  336. assert_eq!(*pad.rows[0], row_1);
  337. assert_eq!(*pad.rows[1], row_2);
  338. assert_eq!(*pad.rows[2], row_3);
  339. }
  340. fn test_row_rev(id: &str, pad: &DatabaseBlockRevisionPad) -> RowRevision {
  341. RowRevision {
  342. id: id.to_string(),
  343. block_id: pad.block_id.clone(),
  344. cells: Default::default(),
  345. height: 0,
  346. visibility: false,
  347. }
  348. }
  349. #[test]
  350. fn block_meta_insert_row2() {
  351. let mut pad = test_pad();
  352. let row_1 = test_row_rev("1", &pad);
  353. let row_2 = test_row_rev("2", &pad);
  354. let row_3 = test_row_rev("3", &pad);
  355. let _ = pad.add_row_rev(row_1.clone(), None).unwrap().unwrap();
  356. let _ = pad.add_row_rev(row_2.clone(), None).unwrap().unwrap();
  357. let _ = pad
  358. .add_row_rev(row_3.clone(), Some("1".to_string()))
  359. .unwrap()
  360. .unwrap();
  361. assert_eq!(*pad.rows[0], row_1);
  362. assert_eq!(*pad.rows[1], row_3);
  363. assert_eq!(*pad.rows[2], row_2);
  364. }
  365. #[test]
  366. fn block_meta_insert_row3() {
  367. let mut pad = test_pad();
  368. let row_1 = test_row_rev("1", &pad);
  369. let row_2 = test_row_rev("2", &pad);
  370. let row_3 = test_row_rev("3", &pad);
  371. let _ = pad.add_row_rev(row_1.clone(), None).unwrap().unwrap();
  372. let _ = pad.add_row_rev(row_2.clone(), None).unwrap().unwrap();
  373. let _ = pad
  374. .add_row_rev(row_3.clone(), Some("".to_string()))
  375. .unwrap()
  376. .unwrap();
  377. assert_eq!(*pad.rows[0], row_1);
  378. assert_eq!(*pad.rows[1], row_2);
  379. assert_eq!(*pad.rows[2], row_3);
  380. }
  381. #[test]
  382. fn block_meta_delete_row() {
  383. let mut pad = test_pad();
  384. let pre_json_str = pad.operations_json_str();
  385. let row = RowRevision {
  386. id: "1".to_string(),
  387. block_id: pad.block_id.clone(),
  388. cells: Default::default(),
  389. height: 0,
  390. visibility: false,
  391. };
  392. let _ = pad.add_row_rev(row.clone(), None).unwrap().unwrap();
  393. let change = pad
  394. .delete_rows(vec![Cow::Borrowed(&row.id)])
  395. .unwrap()
  396. .unwrap();
  397. assert_eq!(
  398. change.operations.json_str(),
  399. r#"[{"retain":24},{"delete":66},{"retain":2}]"#
  400. );
  401. assert_eq!(pad.operations_json_str(), pre_json_str);
  402. }
  403. #[test]
  404. fn block_meta_update_row() {
  405. let mut pad = test_pad();
  406. let row = RowRevision {
  407. id: "1".to_string(),
  408. block_id: pad.block_id.clone(),
  409. cells: Default::default(),
  410. height: 0,
  411. visibility: false,
  412. };
  413. let changeset = RowChangeset {
  414. row_id: row.id.clone(),
  415. height: Some(100),
  416. visibility: Some(true),
  417. cell_by_field_id: Default::default(),
  418. };
  419. let _ = pad.add_row_rev(row, None).unwrap().unwrap();
  420. let change = pad.update_row(changeset).unwrap().unwrap();
  421. assert_eq!(
  422. change.operations.json_str(),
  423. r#"[{"retain":69},{"insert":"10"},{"retain":15},{"insert":"tru"},{"delete":4},{"retain":4}]"#
  424. );
  425. assert_eq!(
  426. pad.revision_json().unwrap(),
  427. r#"{"block_id":"1","rows":[{"id":"1","block_id":"1","cells":[],"height":100,"visibility":true}]}"#
  428. );
  429. }
  430. fn test_pad() -> DatabaseBlockRevisionPad {
  431. let operations =
  432. DatabaseBlockOperations::from_json(r#"[{"insert":"{\"block_id\":\"1\",\"rows\":[]}"}]"#)
  433. .unwrap();
  434. DatabaseBlockRevisionPad::from_operations(operations).unwrap()
  435. }
  436. }