grid_impl.rs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. use bytes::Bytes;
  2. use diesel::{sql_types::Integer, update, SqliteConnection};
  3. use flowy_database::{
  4. impl_sql_integer_expression, insert_or_ignore_into,
  5. prelude::*,
  6. schema::{grid_rev_table, grid_rev_table::dsl},
  7. ConnectionPool,
  8. };
  9. use flowy_error::{internal_error, FlowyError, FlowyResult};
  10. use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord};
  11. use flowy_sync::{
  12. entities::revision::{Revision, RevisionRange},
  13. util::md5,
  14. };
  15. use std::sync::Arc;
  16. pub struct SQLiteGridRevisionPersistence {
  17. user_id: String,
  18. pub(crate) pool: Arc<ConnectionPool>,
  19. }
  20. impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridRevisionPersistence {
  21. type Error = FlowyError;
  22. fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
  23. let conn = self.pool.get().map_err(internal_error)?;
  24. let _ = GridRevisionSql::create(revision_records, &*conn)?;
  25. Ok(())
  26. }
  27. fn get_connection(&self) -> Result<Arc<ConnectionPool>, Self::Error> {
  28. Ok(self.pool.clone())
  29. }
  30. fn read_revision_records(
  31. &self,
  32. object_id: &str,
  33. rev_ids: Option<Vec<i64>>,
  34. ) -> Result<Vec<SyncRecord>, Self::Error> {
  35. let conn = self.pool.get().map_err(internal_error)?;
  36. let records = GridRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
  37. Ok(records)
  38. }
  39. fn read_revision_records_with_range(
  40. &self,
  41. object_id: &str,
  42. range: &RevisionRange,
  43. ) -> Result<Vec<SyncRecord>, Self::Error> {
  44. let conn = &*self.pool.get().map_err(internal_error)?;
  45. let revisions = GridRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
  46. Ok(revisions)
  47. }
  48. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  49. let conn = &*self.pool.get().map_err(internal_error)?;
  50. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  51. for changeset in changesets {
  52. let _ = GridRevisionSql::update(changeset, conn)?;
  53. }
  54. Ok(())
  55. })?;
  56. Ok(())
  57. }
  58. fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
  59. let conn = &*self.pool.get().map_err(internal_error)?;
  60. let _ = GridRevisionSql::delete(object_id, rev_ids, conn)?;
  61. Ok(())
  62. }
  63. fn delete_and_insert_records(
  64. &self,
  65. object_id: &str,
  66. deleted_rev_ids: Option<Vec<i64>>,
  67. inserted_records: Vec<SyncRecord>,
  68. ) -> Result<(), Self::Error> {
  69. let conn = self.pool.get().map_err(internal_error)?;
  70. conn.immediate_transaction::<_, FlowyError, _>(|| {
  71. let _ = GridRevisionSql::delete(object_id, deleted_rev_ids, &*conn)?;
  72. let _ = GridRevisionSql::create(inserted_records, &*conn)?;
  73. Ok(())
  74. })
  75. }
  76. }
  77. impl SQLiteGridRevisionPersistence {
  78. pub fn new(user_id: &str, pool: Arc<ConnectionPool>) -> Self {
  79. Self {
  80. user_id: user_id.to_owned(),
  81. pool,
  82. }
  83. }
  84. }
  85. struct GridRevisionSql();
  86. impl GridRevisionSql {
  87. fn create(revision_records: Vec<SyncRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
  88. // Batch insert: https://diesel.rs/guides/all-about-inserts.html
  89. let records = revision_records
  90. .into_iter()
  91. .map(|record| {
  92. tracing::trace!(
  93. "[GridRevisionSql] create revision: {}:{:?}",
  94. record.revision.object_id,
  95. record.revision.rev_id
  96. );
  97. let rev_state: GridRevisionState = record.state.into();
  98. (
  99. dsl::object_id.eq(record.revision.object_id),
  100. dsl::base_rev_id.eq(record.revision.base_rev_id),
  101. dsl::rev_id.eq(record.revision.rev_id),
  102. dsl::data.eq(record.revision.bytes),
  103. dsl::state.eq(rev_state),
  104. )
  105. })
  106. .collect::<Vec<_>>();
  107. let _ = insert_or_ignore_into(dsl::grid_rev_table)
  108. .values(&records)
  109. .execute(conn)?;
  110. Ok(())
  111. }
  112. fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
  113. let state: GridRevisionState = changeset.state.clone().into();
  114. let filter = dsl::grid_rev_table
  115. .filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
  116. .filter(dsl::object_id.eq(changeset.object_id));
  117. let _ = update(filter).set(dsl::state.eq(state)).execute(conn)?;
  118. tracing::debug!(
  119. "[GridRevisionSql] update revision:{} state:to {:?}",
  120. changeset.rev_id,
  121. changeset.state
  122. );
  123. Ok(())
  124. }
  125. fn read(
  126. user_id: &str,
  127. object_id: &str,
  128. rev_ids: Option<Vec<i64>>,
  129. conn: &SqliteConnection,
  130. ) -> Result<Vec<SyncRecord>, FlowyError> {
  131. let mut sql = dsl::grid_rev_table.filter(dsl::object_id.eq(object_id)).into_boxed();
  132. if let Some(rev_ids) = rev_ids {
  133. sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
  134. }
  135. let rows = sql.order(dsl::rev_id.asc()).load::<GridRevisionTable>(conn)?;
  136. let records = rows
  137. .into_iter()
  138. .map(|row| mk_revision_record_from_table(user_id, row))
  139. .collect::<Vec<_>>();
  140. Ok(records)
  141. }
  142. fn read_with_range(
  143. user_id: &str,
  144. object_id: &str,
  145. range: RevisionRange,
  146. conn: &SqliteConnection,
  147. ) -> Result<Vec<SyncRecord>, FlowyError> {
  148. let rev_tables = dsl::grid_rev_table
  149. .filter(dsl::rev_id.ge(range.start))
  150. .filter(dsl::rev_id.le(range.end))
  151. .filter(dsl::object_id.eq(object_id))
  152. .order(dsl::rev_id.asc())
  153. .load::<GridRevisionTable>(conn)?;
  154. let revisions = rev_tables
  155. .into_iter()
  156. .map(|table| mk_revision_record_from_table(user_id, table))
  157. .collect::<Vec<_>>();
  158. Ok(revisions)
  159. }
  160. fn delete(object_id: &str, rev_ids: Option<Vec<i64>>, conn: &SqliteConnection) -> Result<(), FlowyError> {
  161. let mut sql = diesel::delete(dsl::grid_rev_table).into_boxed();
  162. sql = sql.filter(dsl::object_id.eq(object_id));
  163. if let Some(rev_ids) = rev_ids {
  164. tracing::trace!("[GridRevisionSql] Delete revision: {}:{:?}", object_id, rev_ids);
  165. sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
  166. }
  167. let affected_row = sql.execute(conn)?;
  168. tracing::trace!("[GridRevisionSql] Delete {} rows", affected_row);
  169. Ok(())
  170. }
  171. }
  172. #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
  173. #[table_name = "grid_rev_table"]
  174. struct GridRevisionTable {
  175. id: i32,
  176. object_id: String,
  177. base_rev_id: i64,
  178. rev_id: i64,
  179. data: Vec<u8>,
  180. state: GridRevisionState,
  181. }
  182. #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
  183. #[repr(i32)]
  184. #[sql_type = "Integer"]
  185. pub enum GridRevisionState {
  186. Sync = 0,
  187. Ack = 1,
  188. }
  189. impl_sql_integer_expression!(GridRevisionState);
  190. impl_rev_state_map!(GridRevisionState);
  191. impl std::default::Default for GridRevisionState {
  192. fn default() -> Self {
  193. GridRevisionState::Sync
  194. }
  195. }
  196. fn mk_revision_record_from_table(_user_id: &str, table: GridRevisionTable) -> SyncRecord {
  197. let md5 = md5(&table.data);
  198. let revision = Revision::new(
  199. &table.object_id,
  200. table.base_rev_id,
  201. table.rev_id,
  202. Bytes::from(table.data),
  203. md5,
  204. );
  205. SyncRecord {
  206. revision,
  207. state: table.state.into(),
  208. write_to_disk: false,
  209. }
  210. }