grid_impl.rs 7.6 KB


  1. use crate::cache::disk::RevisionDiskCache;
  2. use crate::disk::{RevisionChangeset, RevisionRecord, RevisionState};
  3. use bytes::Bytes;
  4. use diesel::{sql_types::Integer, update, SqliteConnection};
  5. use flowy_database::{
  6. impl_sql_integer_expression, insert_or_ignore_into,
  7. prelude::*,
  8. schema::{grid_rev_table, grid_rev_table::dsl},
  9. ConnectionPool,
  10. };
  11. use flowy_error::{internal_error, FlowyError, FlowyResult};
  12. use flowy_sync::{
  13. entities::revision::{Revision, RevisionRange},
  14. util::md5,
  15. };
  16. use std::sync::Arc;
  17. pub struct SQLiteGridRevisionPersistence {
  18. user_id: String,
  19. pub(crate) pool: Arc<ConnectionPool>,
  20. }
  21. impl RevisionDiskCache for SQLiteGridRevisionPersistence {
  22. type Error = FlowyError;
  23. fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
  24. let conn = self.pool.get().map_err(internal_error)?;
  25. let _ = GridRevisionSql::create(revision_records, &*conn)?;
  26. Ok(())
  27. }
  28. fn read_revision_records(
  29. &self,
  30. object_id: &str,
  31. rev_ids: Option<Vec<i64>>,
  32. ) -> Result<Vec<RevisionRecord>, Self::Error> {
  33. let conn = self.pool.get().map_err(internal_error)?;
  34. let records = GridRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
  35. Ok(records)
  36. }
  37. fn read_revision_records_with_range(
  38. &self,
  39. object_id: &str,
  40. range: &RevisionRange,
  41. ) -> Result<Vec<RevisionRecord>, Self::Error> {
  42. let conn = &*self.pool.get().map_err(internal_error)?;
  43. let revisions = GridRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
  44. Ok(revisions)
  45. }
  46. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  47. let conn = &*self.pool.get().map_err(internal_error)?;
  48. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  49. for changeset in changesets {
  50. let _ = GridRevisionSql::update(changeset, conn)?;
  51. }
  52. Ok(())
  53. })?;
  54. Ok(())
  55. }
  56. fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
  57. let conn = &*self.pool.get().map_err(internal_error)?;
  58. let _ = GridRevisionSql::delete(object_id, rev_ids, conn)?;
  59. Ok(())
  60. }
  61. fn delete_and_insert_records(
  62. &self,
  63. object_id: &str,
  64. deleted_rev_ids: Option<Vec<i64>>,
  65. inserted_records: Vec<RevisionRecord>,
  66. ) -> Result<(), Self::Error> {
  67. let conn = self.pool.get().map_err(internal_error)?;
  68. conn.immediate_transaction::<_, FlowyError, _>(|| {
  69. let _ = GridRevisionSql::delete(object_id, deleted_rev_ids, &*conn)?;
  70. let _ = GridRevisionSql::create(inserted_records, &*conn)?;
  71. Ok(())
  72. })
  73. }
  74. }
  75. impl SQLiteGridRevisionPersistence {
  76. pub fn new(user_id: &str, pool: Arc<ConnectionPool>) -> Self {
  77. Self {
  78. user_id: user_id.to_owned(),
  79. pool,
  80. }
  81. }
  82. }
  83. struct GridRevisionSql();
  84. impl GridRevisionSql {
  85. fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
  86. // Batch insert: https://diesel.rs/guides/all-about-inserts.html
  87. let records = revision_records
  88. .into_iter()
  89. .map(|record| {
  90. tracing::trace!(
  91. "[GridRevisionSql] create revision: {}:{:?}",
  92. record.revision.object_id,
  93. record.revision.rev_id
  94. );
  95. let rev_state: GridRevisionState = record.state.into();
  96. (
  97. dsl::object_id.eq(record.revision.object_id),
  98. dsl::base_rev_id.eq(record.revision.base_rev_id),
  99. dsl::rev_id.eq(record.revision.rev_id),
  100. dsl::data.eq(record.revision.delta_data),
  101. dsl::state.eq(rev_state),
  102. )
  103. })
  104. .collect::<Vec<_>>();
  105. let _ = insert_or_ignore_into(dsl::grid_rev_table)
  106. .values(&records)
  107. .execute(conn)?;
  108. Ok(())
  109. }
  110. fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
  111. let state: GridRevisionState = changeset.state.clone().into();
  112. let filter = dsl::grid_rev_table
  113. .filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
  114. .filter(dsl::object_id.eq(changeset.object_id));
  115. let _ = update(filter).set(dsl::state.eq(state)).execute(conn)?;
  116. tracing::debug!(
  117. "[GridRevisionSql] update revision:{} state:to {:?}",
  118. changeset.rev_id,
  119. changeset.state
  120. );
  121. Ok(())
  122. }
  123. fn read(
  124. user_id: &str,
  125. object_id: &str,
  126. rev_ids: Option<Vec<i64>>,
  127. conn: &SqliteConnection,
  128. ) -> Result<Vec<RevisionRecord>, FlowyError> {
  129. let mut sql = dsl::grid_rev_table.filter(dsl::object_id.eq(object_id)).into_boxed();
  130. if let Some(rev_ids) = rev_ids {
  131. sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
  132. }
  133. let rows = sql.order(dsl::rev_id.asc()).load::<GridRevisionTable>(conn)?;
  134. let records = rows
  135. .into_iter()
  136. .map(|row| mk_revision_record_from_table(user_id, row))
  137. .collect::<Vec<_>>();
  138. Ok(records)
  139. }
  140. fn read_with_range(
  141. user_id: &str,
  142. object_id: &str,
  143. range: RevisionRange,
  144. conn: &SqliteConnection,
  145. ) -> Result<Vec<RevisionRecord>, FlowyError> {
  146. let rev_tables = dsl::grid_rev_table
  147. .filter(dsl::rev_id.ge(range.start))
  148. .filter(dsl::rev_id.le(range.end))
  149. .filter(dsl::object_id.eq(object_id))
  150. .order(dsl::rev_id.asc())
  151. .load::<GridRevisionTable>(conn)?;
  152. let revisions = rev_tables
  153. .into_iter()
  154. .map(|table| mk_revision_record_from_table(user_id, table))
  155. .collect::<Vec<_>>();
  156. Ok(revisions)
  157. }
  158. fn delete(object_id: &str, rev_ids: Option<Vec<i64>>, conn: &SqliteConnection) -> Result<(), FlowyError> {
  159. let mut sql = diesel::delete(dsl::grid_rev_table).into_boxed();
  160. sql = sql.filter(dsl::object_id.eq(object_id));
  161. if let Some(rev_ids) = rev_ids {
  162. tracing::trace!("[GridRevisionSql] Delete revision: {}:{:?}", object_id, rev_ids);
  163. sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
  164. }
  165. let affected_row = sql.execute(conn)?;
  166. tracing::trace!("[GridRevisionSql] Delete {} rows", affected_row);
  167. Ok(())
  168. }
  169. }
  170. #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
  171. #[table_name = "grid_rev_table"]
  172. struct GridRevisionTable {
  173. id: i32,
  174. object_id: String,
  175. base_rev_id: i64,
  176. rev_id: i64,
  177. data: Vec<u8>,
  178. state: GridRevisionState,
  179. }
  180. #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
  181. #[repr(i32)]
  182. #[sql_type = "Integer"]
  183. pub enum GridRevisionState {
  184. Sync = 0,
  185. Ack = 1,
  186. }
  187. impl_sql_integer_expression!(GridRevisionState);
  188. impl_rev_state_map!(GridRevisionState);
  189. impl std::default::Default for GridRevisionState {
  190. fn default() -> Self {
  191. GridRevisionState::Sync
  192. }
  193. }
  194. fn mk_revision_record_from_table(user_id: &str, table: GridRevisionTable) -> RevisionRecord {
  195. let md5 = md5(&table.data);
  196. let revision = Revision::new(
  197. &table.object_id,
  198. table.base_rev_id,
  199. table.rev_id,
  200. Bytes::from(table.data),
  201. user_id,
  202. md5,
  203. );
  204. RevisionRecord {
  205. revision,
  206. state: table.state.into(),
  207. write_to_disk: false,
  208. }
  209. }