database_revision_pad.rs 14 KB


  1. use crate::errors::{internal_sync_error, SyncError, SyncResult};
  2. use crate::util::cal_diff;
  3. use database_model::{
  4. gen_block_id, gen_database_id, DatabaseBlockMetaRevision, DatabaseBlockMetaRevisionChangeset,
  5. DatabaseRevision, FieldRevision, FieldTypeRevision,
  6. };
  7. use flowy_sync::util::make_operations_from_revisions;
  8. use lib_infra::util::md5;
  9. use lib_infra::util::move_vec_element;
  10. use lib_ot::core::{DeltaOperationBuilder, DeltaOperations, EmptyAttributes, OperationTransform};
  11. use revision_model::Revision;
  12. use std::collections::HashMap;
  13. use std::sync::Arc;
  /// Delta operations used to represent a database document; they carry no
  /// attributes (`EmptyAttributes`).
  14. pub type DatabaseOperations = DeltaOperations<EmptyAttributes>;
  /// Builder that produces [`DatabaseOperations`].
  15. pub type DatabaseOperationsBuilder = DeltaOperationBuilder<EmptyAttributes>;
  /// Holds a [`DatabaseRevision`] together with the delta operations it was
  /// built from.
  ///
  /// Mutating methods edit the revision and then compose the resulting JSON
  /// diff into `operations`, so both fields describe the same document.
  16. #[derive(Clone)]
  17. pub struct DatabaseRevisionPad {
  /// Current state of the database; cloned on write via `Arc::make_mut`.
  18. database_rev: Arc<DatabaseRevision>,
  /// Accumulated operations whose content is the JSON form of `database_rev`.
  19. operations: DatabaseOperations,
  20. }
  /// Converts raw type-option bytes into a string.
  ///
  /// NOTE(review): the trait name suggests the returned string is JSON, but no
  /// implementation is visible in this file — confirm against implementors.
  21. pub trait JsonDeserializer {
  /// Turns `type_option_data` into a `String`, or reports a sync error.
  22. fn deserialize(&self, type_option_data: Vec<u8>) -> SyncResult<String>;
  23. }
  24. impl DatabaseRevisionPad {
  25. pub fn database_id(&self) -> String {
  26. self.database_rev.database_id.clone()
  27. }
  28. pub async fn duplicate_database_block_meta(
  29. &self,
  30. ) -> (Vec<FieldRevision>, Vec<DatabaseBlockMetaRevision>) {
  31. let fields = self
  32. .database_rev
  33. .fields
  34. .iter()
  35. .map(|field_rev| field_rev.as_ref().clone())
  36. .collect();
  37. let blocks = self
  38. .database_rev
  39. .blocks
  40. .iter()
  41. .map(|block| {
  42. let mut duplicated_block = (**block).clone();
  43. duplicated_block.block_id = gen_block_id();
  44. duplicated_block
  45. })
  46. .collect::<Vec<DatabaseBlockMetaRevision>>();
  47. (fields, blocks)
  48. }
  49. pub fn from_operations(operations: DatabaseOperations) -> SyncResult<Self> {
  50. let content = operations.content()?;
  51. let database_rev: DatabaseRevision = serde_json::from_str(&content).map_err(|e| {
  52. let msg = format!("Deserialize operations to database failed: {}", e);
  53. SyncError::internal().context(msg)
  54. })?;
  55. Ok(Self {
  56. database_rev: Arc::new(database_rev),
  57. operations,
  58. })
  59. }
  60. pub fn from_revisions(revisions: Vec<Revision>) -> SyncResult<Self> {
  61. let operations: DatabaseOperations = make_operations_from_revisions(revisions)?;
  62. Self::from_operations(operations)
  63. }
  64. #[tracing::instrument(level = "debug", skip_all, err)]
  65. pub fn create_field_rev(
  66. &mut self,
  67. new_field_rev: FieldRevision,
  68. start_field_id: Option<String>,
  69. ) -> SyncResult<Option<DatabaseRevisionChangeset>> {
  70. self.modify_database(|grid_meta| {
  71. // Check if the field exists or not
  72. if grid_meta
  73. .fields
  74. .iter()
  75. .any(|field_rev| field_rev.id == new_field_rev.id)
  76. {
  77. tracing::error!("Duplicate grid field");
  78. return Ok(None);
  79. }
  80. let insert_index = match start_field_id {
  81. None => None,
  82. Some(start_field_id) => grid_meta
  83. .fields
  84. .iter()
  85. .position(|field| field.id == start_field_id),
  86. };
  87. let new_field_rev = Arc::new(new_field_rev);
  88. match insert_index {
  89. None => grid_meta.fields.push(new_field_rev),
  90. Some(index) => grid_meta.fields.insert(index, new_field_rev),
  91. }
  92. Ok(Some(()))
  93. })
  94. }
  95. pub fn delete_field_rev(
  96. &mut self,
  97. field_id: &str,
  98. ) -> SyncResult<Option<DatabaseRevisionChangeset>> {
  99. self.modify_database(|database| {
  100. match database
  101. .fields
  102. .iter()
  103. .position(|field| field.id == field_id)
  104. {
  105. None => Ok(None),
  106. Some(index) => {
  107. if database.fields[index].is_primary {
  108. Err(SyncError::can_not_delete_primary_field())
  109. } else {
  110. database.fields.remove(index);
  111. Ok(Some(()))
  112. }
  113. },
  114. }
  115. })
  116. }
  117. pub fn duplicate_field_rev(
  118. &mut self,
  119. field_id: &str,
  120. duplicated_field_id: &str,
  121. ) -> SyncResult<Option<DatabaseRevisionChangeset>> {
  122. self.modify_database(|grid_meta| {
  123. match grid_meta
  124. .fields
  125. .iter()
  126. .position(|field| field.id == field_id)
  127. {
  128. None => Ok(None),
  129. Some(index) => {
  130. let mut duplicate_field_rev = grid_meta.fields[index].as_ref().clone();
  131. duplicate_field_rev.id = duplicated_field_id.to_string();
  132. duplicate_field_rev.name = format!("{} (copy)", duplicate_field_rev.name);
  133. grid_meta
  134. .fields
  135. .insert(index + 1, Arc::new(duplicate_field_rev));
  136. Ok(Some(()))
  137. },
  138. }
  139. })
  140. }
  141. /// Modifies the current field type of the [FieldTypeRevision]
  142. ///
  143. /// # Arguments
  144. ///
  145. /// * `field_id`: the id of the field
  146. /// * `field_type`: the new field type of the field
  147. /// * `make_default_type_option`: create the field type's type-option data
  148. /// * `type_option_transform`: create the field type's type-option data
  149. ///
  150. ///
  151. pub fn switch_to_field<DT, TT, T>(
  152. &mut self,
  153. field_id: &str,
  154. new_field_type: T,
  155. make_default_type_option: DT,
  156. type_option_transform: TT,
  157. ) -> SyncResult<Option<DatabaseRevisionChangeset>>
  158. where
  159. DT: FnOnce() -> String,
  160. TT: FnOnce(FieldTypeRevision, Option<String>, String) -> String,
  161. T: Into<FieldTypeRevision>,
  162. {
  163. let new_field_type = new_field_type.into();
  164. self.modify_database(|database_rev| {
  165. match database_rev
  166. .fields
  167. .iter_mut()
  168. .find(|field_rev| field_rev.id == field_id)
  169. {
  170. None => {
  171. tracing::warn!("Can not find the field with id: {}", field_id);
  172. Ok(None)
  173. },
  174. Some(field_rev) => {
  175. let mut_field_rev = Arc::make_mut(field_rev);
  176. let old_field_type_rev = mut_field_rev.ty;
  177. let old_field_type_option = mut_field_rev
  178. .get_type_option_str(mut_field_rev.ty)
  179. .map(|value| value.to_owned());
  180. match mut_field_rev.get_type_option_str(new_field_type) {
  181. Some(new_field_type_option) => {
  182. let transformed_type_option = type_option_transform(
  183. old_field_type_rev,
  184. old_field_type_option,
  185. new_field_type_option.to_owned(),
  186. );
  187. mut_field_rev.insert_type_option_str(&new_field_type, transformed_type_option);
  188. },
  189. None => {
  190. // If the type-option data isn't exist before, creating the default type-option data.
  191. let new_field_type_option = make_default_type_option();
  192. let transformed_type_option = type_option_transform(
  193. old_field_type_rev,
  194. old_field_type_option,
  195. new_field_type_option,
  196. );
  197. mut_field_rev.insert_type_option_str(&new_field_type, transformed_type_option);
  198. },
  199. }
  200. mut_field_rev.ty = new_field_type;
  201. Ok(Some(()))
  202. },
  203. }
  204. })
  205. }
  206. pub fn replace_field_rev(
  207. &mut self,
  208. field_rev: Arc<FieldRevision>,
  209. ) -> SyncResult<Option<DatabaseRevisionChangeset>> {
  210. self.modify_database(|grid_meta| {
  211. match grid_meta
  212. .fields
  213. .iter()
  214. .position(|field| field.id == field_rev.id)
  215. {
  216. None => Ok(None),
  217. Some(index) => {
  218. grid_meta.fields.remove(index);
  219. grid_meta.fields.insert(index, field_rev);
  220. Ok(Some(()))
  221. },
  222. }
  223. })
  224. }
  225. pub fn move_field(
  226. &mut self,
  227. field_id: &str,
  228. from_index: usize,
  229. to_index: usize,
  230. ) -> SyncResult<Option<DatabaseRevisionChangeset>> {
  231. self.modify_database(|grid_meta| {
  232. match move_vec_element(
  233. &mut grid_meta.fields,
  234. |field| field.id == field_id,
  235. from_index,
  236. to_index,
  237. )
  238. .map_err(internal_sync_error)?
  239. {
  240. true => Ok(Some(())),
  241. false => Ok(None),
  242. }
  243. })
  244. }
  245. pub fn contain_field(&self, field_id: &str) -> bool {
  246. self
  247. .database_rev
  248. .fields
  249. .iter()
  250. .any(|field| field.id == field_id)
  251. }
  252. pub fn get_field_rev(&self, field_id: &str) -> Option<(usize, &Arc<FieldRevision>)> {
  253. self
  254. .database_rev
  255. .fields
  256. .iter()
  257. .enumerate()
  258. .find(|(_, field)| field.id == field_id)
  259. }
  260. pub fn get_field_revs(
  261. &self,
  262. field_ids: Option<Vec<String>>,
  263. ) -> SyncResult<Vec<Arc<FieldRevision>>> {
  264. match field_ids {
  265. None => Ok(self.database_rev.fields.clone()),
  266. Some(field_ids) => {
  267. let field_by_field_id = self
  268. .database_rev
  269. .fields
  270. .iter()
  271. .map(|field| (&field.id, field))
  272. .collect::<HashMap<&String, &Arc<FieldRevision>>>();
  273. let fields = field_ids
  274. .iter()
  275. .flat_map(|field_id| match field_by_field_id.get(&field_id) {
  276. None => {
  277. tracing::error!("Can't find the field with id: {}", field_id);
  278. None
  279. },
  280. Some(field) => Some((*field).clone()),
  281. })
  282. .collect::<Vec<Arc<FieldRevision>>>();
  283. Ok(fields)
  284. },
  285. }
  286. }
  287. pub fn create_block_meta_rev(
  288. &mut self,
  289. block: DatabaseBlockMetaRevision,
  290. ) -> SyncResult<Option<DatabaseRevisionChangeset>> {
  291. self.modify_database(|grid_meta| {
  292. if grid_meta.blocks.iter().any(|b| b.block_id == block.block_id) {
  293. tracing::warn!("Duplicate grid block");
  294. Ok(None)
  295. } else {
  296. match grid_meta.blocks.last() {
  297. None => grid_meta.blocks.push(Arc::new(block)),
  298. Some(last_block) => {
  299. if last_block.start_row_index > block.start_row_index
  300. && last_block.len() > block.start_row_index
  301. {
  302. let msg = "GridBlock's start_row_index should be greater than the last_block's start_row_index and its len".to_string();
  303. return Err(SyncError::internal().context(msg))
  304. }
  305. grid_meta.blocks.push(Arc::new(block));
  306. }
  307. }
  308. Ok(Some(()))
  309. }
  310. })
  311. }
  312. pub fn get_block_meta_revs(&self) -> Vec<Arc<DatabaseBlockMetaRevision>> {
  313. self.database_rev.blocks.clone()
  314. }
  315. pub fn update_block_rev(
  316. &mut self,
  317. changeset: DatabaseBlockMetaRevisionChangeset,
  318. ) -> SyncResult<Option<DatabaseRevisionChangeset>> {
  319. let block_id = changeset.block_id.clone();
  320. self.modify_block(&block_id, |block| {
  321. let mut is_changed = None;
  322. if let Some(row_count) = changeset.row_count {
  323. block.row_count = row_count;
  324. is_changed = Some(());
  325. }
  326. if let Some(start_row_index) = changeset.start_row_index {
  327. block.start_row_index = start_row_index;
  328. is_changed = Some(());
  329. }
  330. Ok(is_changed)
  331. })
  332. }
  333. pub fn database_md5(&self) -> String {
  334. md5(&self.operations.json_bytes())
  335. }
  336. pub fn operations_json_str(&self) -> String {
  337. self.operations.json_str()
  338. }
  339. pub fn get_fields(&self) -> &[Arc<FieldRevision>] {
  340. &self.database_rev.fields
  341. }
  342. fn modify_database<F>(&mut self, f: F) -> SyncResult<Option<DatabaseRevisionChangeset>>
  343. where
  344. F: FnOnce(&mut DatabaseRevision) -> SyncResult<Option<()>>,
  345. {
  346. let cloned_database = self.database_rev.clone();
  347. match f(Arc::make_mut(&mut self.database_rev))? {
  348. None => Ok(None),
  349. Some(_) => {
  350. let old = make_database_rev_json_str(&cloned_database)?;
  351. let new = self.json_str()?;
  352. match cal_diff::<EmptyAttributes>(old, new) {
  353. None => Ok(None),
  354. Some(operations) => {
  355. self.operations = self.operations.compose(&operations)?;
  356. Ok(Some(DatabaseRevisionChangeset {
  357. operations,
  358. md5: self.database_md5(),
  359. }))
  360. },
  361. }
  362. },
  363. }
  364. }
  365. fn modify_block<F>(
  366. &mut self,
  367. block_id: &str,
  368. f: F,
  369. ) -> SyncResult<Option<DatabaseRevisionChangeset>>
  370. where
  371. F: FnOnce(&mut DatabaseBlockMetaRevision) -> SyncResult<Option<()>>,
  372. {
  373. self.modify_database(|grid_rev| {
  374. match grid_rev
  375. .blocks
  376. .iter()
  377. .position(|block| block.block_id == block_id)
  378. {
  379. None => {
  380. tracing::warn!("[GridMetaPad]: Can't find any block with id: {}", block_id);
  381. Ok(None)
  382. },
  383. Some(index) => {
  384. let block_rev = Arc::make_mut(&mut grid_rev.blocks[index]);
  385. f(block_rev)
  386. },
  387. }
  388. })
  389. }
  390. pub fn modify_field<F>(
  391. &mut self,
  392. field_id: &str,
  393. f: F,
  394. ) -> SyncResult<Option<DatabaseRevisionChangeset>>
  395. where
  396. F: FnOnce(&mut FieldRevision) -> SyncResult<Option<()>>,
  397. {
  398. self.modify_database(|grid_rev| {
  399. match grid_rev
  400. .fields
  401. .iter()
  402. .position(|field| field.id == field_id)
  403. {
  404. None => {
  405. tracing::warn!("[GridMetaPad]: Can't find any field with id: {}", field_id);
  406. Ok(None)
  407. },
  408. Some(index) => {
  409. let mut_field_rev = Arc::make_mut(&mut grid_rev.fields[index]);
  410. f(mut_field_rev)
  411. },
  412. }
  413. })
  414. }
  415. pub fn json_str(&self) -> SyncResult<String> {
  416. make_database_rev_json_str(&self.database_rev)
  417. }
  418. }
  419. pub fn make_database_rev_json_str(grid_revision: &DatabaseRevision) -> SyncResult<String> {
  420. let json = serde_json::to_string(grid_revision)
  421. .map_err(|err| internal_sync_error(format!("Serialize grid to json str failed. {:?}", err)))?;
  422. Ok(json)
  423. }
  /// Result of a successful modification of a `DatabaseRevisionPad`.
  424. pub struct DatabaseRevisionChangeset {
  /// The diff between the database JSON before and after the modification.
  425. pub operations: DatabaseOperations,
  426. /// md5: the md5 of the grid after applying the change.
  427. pub md5: String,
  428. }
  429. pub fn make_database_operations(grid_rev: &DatabaseRevision) -> DatabaseOperations {
  430. let json = serde_json::to_string(&grid_rev).unwrap();
  431. DatabaseOperationsBuilder::new().insert(&json).build()
  432. }
  433. pub fn make_database_revisions(_user_id: &str, grid_rev: &DatabaseRevision) -> Vec<Revision> {
  434. let operations = make_database_operations(grid_rev);
  435. let bytes = operations.json_bytes();
  436. let revision = Revision::initial_revision(&grid_rev.database_id, bytes);
  437. vec![revision]
  438. }
  439. impl std::default::Default for DatabaseRevisionPad {
  440. fn default() -> Self {
  441. let database = DatabaseRevision::new(&gen_database_id());
  442. let operations = make_database_operations(&database);
  443. DatabaseRevisionPad {
  444. database_rev: Arc::new(database),
  445. operations,
  446. }
  447. }
  448. }