block_impl.rs 7.3 KB


  1. use bytes::Bytes;
  2. use diesel::{sql_types::Integer, update, SqliteConnection};
  3. use flowy_error::{internal_error, FlowyError, FlowyResult};
  4. use flowy_revision_persistence::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord};
  5. use flowy_sqlite::{
  6. impl_sql_integer_expression, insert_or_ignore_into,
  7. prelude::*,
  8. schema::{grid_meta_rev_table, grid_meta_rev_table::dsl},
  9. ConnectionPool,
  10. };
  11. use lib_infra::util::md5;
  12. use revision_model::{Revision, RevisionRange};
  13. use std::sync::Arc;
  14. pub struct SQLiteDatabaseBlockRevisionPersistence {
  15. user_id: String,
  16. pub(crate) pool: Arc<ConnectionPool>,
  17. }
  18. impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteDatabaseBlockRevisionPersistence {
  19. type Error = FlowyError;
  20. fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
  21. let conn = self.pool.get().map_err(internal_error)?;
  22. DatabaseBlockMetaRevisionSql::create(revision_records, &conn)?;
  23. Ok(())
  24. }
  25. fn get_connection(&self) -> Result<Arc<ConnectionPool>, Self::Error> {
  26. Ok(self.pool.clone())
  27. }
  28. fn read_revision_records(
  29. &self,
  30. object_id: &str,
  31. rev_ids: Option<Vec<i64>>,
  32. ) -> Result<Vec<SyncRecord>, Self::Error> {
  33. let conn = self.pool.get().map_err(internal_error)?;
  34. let records = DatabaseBlockMetaRevisionSql::read(&self.user_id, object_id, rev_ids, &conn)?;
  35. Ok(records)
  36. }
  37. fn read_revision_records_with_range(
  38. &self,
  39. object_id: &str,
  40. range: &RevisionRange,
  41. ) -> Result<Vec<SyncRecord>, Self::Error> {
  42. let conn = &*self.pool.get().map_err(internal_error)?;
  43. let revisions =
  44. DatabaseBlockMetaRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
  45. Ok(revisions)
  46. }
  47. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  48. let conn = &*self.pool.get().map_err(internal_error)?;
  49. conn.immediate_transaction::<_, FlowyError, _>(|| {
  50. for changeset in changesets {
  51. DatabaseBlockMetaRevisionSql::update(changeset, conn)?;
  52. }
  53. Ok(())
  54. })?;
  55. Ok(())
  56. }
  57. fn delete_revision_records(
  58. &self,
  59. object_id: &str,
  60. rev_ids: Option<Vec<i64>>,
  61. ) -> Result<(), Self::Error> {
  62. let conn = &*self.pool.get().map_err(internal_error)?;
  63. DatabaseBlockMetaRevisionSql::delete(object_id, rev_ids, conn)?;
  64. Ok(())
  65. }
  66. fn delete_and_insert_records(
  67. &self,
  68. object_id: &str,
  69. deleted_rev_ids: Option<Vec<i64>>,
  70. inserted_records: Vec<SyncRecord>,
  71. ) -> Result<(), Self::Error> {
  72. let conn = self.pool.get().map_err(internal_error)?;
  73. conn.immediate_transaction::<_, FlowyError, _>(|| {
  74. DatabaseBlockMetaRevisionSql::delete(object_id, deleted_rev_ids, &conn)?;
  75. DatabaseBlockMetaRevisionSql::create(inserted_records, &conn)?;
  76. Ok(())
  77. })
  78. }
  79. }
  80. impl SQLiteDatabaseBlockRevisionPersistence {
  81. pub fn new(user_id: &str, pool: Arc<ConnectionPool>) -> Self {
  82. Self {
  83. user_id: user_id.to_owned(),
  84. pool,
  85. }
  86. }
  87. }
  88. struct DatabaseBlockMetaRevisionSql();
  89. impl DatabaseBlockMetaRevisionSql {
  90. fn create(revision_records: Vec<SyncRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
  91. // Batch insert: https://diesel.rs/guides/all-about-inserts.html
  92. let records = revision_records
  93. .into_iter()
  94. .map(|record| {
  95. tracing::trace!(
  96. "[{}] create revision: {}:{:?}",
  97. std::any::type_name::<Self>(),
  98. record.revision.object_id,
  99. record.revision.rev_id
  100. );
  101. let rev_state: GridBlockRevisionState = record.state.into();
  102. (
  103. dsl::object_id.eq(record.revision.object_id),
  104. dsl::base_rev_id.eq(record.revision.base_rev_id),
  105. dsl::rev_id.eq(record.revision.rev_id),
  106. dsl::data.eq(record.revision.bytes),
  107. dsl::state.eq(rev_state),
  108. )
  109. })
  110. .collect::<Vec<_>>();
  111. let _ = insert_or_ignore_into(dsl::grid_meta_rev_table)
  112. .values(&records)
  113. .execute(conn)?;
  114. Ok(())
  115. }
  116. fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
  117. let state: GridBlockRevisionState = changeset.state.clone().into();
  118. let filter = dsl::grid_meta_rev_table
  119. .filter(dsl::rev_id.eq(changeset.rev_id))
  120. .filter(dsl::object_id.eq(changeset.object_id));
  121. let _ = update(filter).set(dsl::state.eq(state)).execute(conn)?;
  122. tracing::debug!(
  123. "[{}] update revision:{} state:to {:?}",
  124. std::any::type_name::<Self>(),
  125. changeset.rev_id,
  126. changeset.state
  127. );
  128. Ok(())
  129. }
  130. fn read(
  131. user_id: &str,
  132. object_id: &str,
  133. rev_ids: Option<Vec<i64>>,
  134. conn: &SqliteConnection,
  135. ) -> Result<Vec<SyncRecord>, FlowyError> {
  136. let mut sql = dsl::grid_meta_rev_table
  137. .filter(dsl::object_id.eq(object_id))
  138. .into_boxed();
  139. if let Some(rev_ids) = rev_ids {
  140. sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
  141. }
  142. let rows = sql
  143. .order(dsl::rev_id.asc())
  144. .load::<GridBlockRevisionTable>(conn)?;
  145. let records = rows
  146. .into_iter()
  147. .map(|row| mk_revision_record_from_table(user_id, row))
  148. .collect::<Vec<_>>();
  149. Ok(records)
  150. }
  151. fn read_with_range(
  152. user_id: &str,
  153. object_id: &str,
  154. range: RevisionRange,
  155. conn: &SqliteConnection,
  156. ) -> Result<Vec<SyncRecord>, FlowyError> {
  157. let rev_tables = dsl::grid_meta_rev_table
  158. .filter(dsl::rev_id.ge(range.start))
  159. .filter(dsl::rev_id.le(range.end))
  160. .filter(dsl::object_id.eq(object_id))
  161. .order(dsl::rev_id.asc())
  162. .load::<GridBlockRevisionTable>(conn)?;
  163. let revisions = rev_tables
  164. .into_iter()
  165. .map(|table| mk_revision_record_from_table(user_id, table))
  166. .collect::<Vec<_>>();
  167. Ok(revisions)
  168. }
  169. fn delete(
  170. object_id: &str,
  171. rev_ids: Option<Vec<i64>>,
  172. conn: &SqliteConnection,
  173. ) -> Result<(), FlowyError> {
  174. let mut sql = diesel::delete(dsl::grid_meta_rev_table).into_boxed();
  175. sql = sql.filter(dsl::object_id.eq(object_id));
  176. if let Some(rev_ids) = rev_ids {
  177. tracing::trace!(
  178. "[{}] Delete revision: {}:{:?}",
  179. std::any::type_name::<Self>(),
  180. object_id,
  181. rev_ids
  182. );
  183. sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
  184. }
  185. let affected_row = sql.execute(conn)?;
  186. tracing::trace!(
  187. "[{}] Delete {} rows",
  188. std::any::type_name::<Self>(),
  189. affected_row
  190. );
  191. Ok(())
  192. }
  193. }
  194. #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
  195. #[table_name = "grid_meta_rev_table"]
  196. struct GridBlockRevisionTable {
  197. id: i32,
  198. object_id: String,
  199. base_rev_id: i64,
  200. rev_id: i64,
  201. data: Vec<u8>,
  202. state: GridBlockRevisionState,
  203. }
  204. #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
  205. #[repr(i32)]
  206. #[sql_type = "Integer"]
  207. pub enum GridBlockRevisionState {
  208. Sync = 0,
  209. Ack = 1,
  210. }
  211. impl_sql_integer_expression!(GridBlockRevisionState);
  212. impl_rev_state_map!(GridBlockRevisionState);
  213. impl std::default::Default for GridBlockRevisionState {
  214. fn default() -> Self {
  215. GridBlockRevisionState::Sync
  216. }
  217. }
  218. fn mk_revision_record_from_table(_user_id: &str, table: GridBlockRevisionTable) -> SyncRecord {
  219. let md5 = md5(&table.data);
  220. let revision = Revision::new(
  221. &table.object_id,
  222. table.base_rev_id,
  223. table.rev_id,
  224. Bytes::from(table.data),
  225. md5,
  226. );
  227. SyncRecord {
  228. revision,
  229. state: table.state.into(),
  230. write_to_disk: false,
  231. }
  232. }