document_impl.rs 8.7 KB


  1. use crate::cache::disk::RevisionDiskCache;
  2. use crate::disk::{RevisionChangeset, RevisionRecord};
  3. use bytes::Bytes;
  4. use diesel::{sql_types::Integer, update, SqliteConnection};
  5. use flowy_database::{
  6. impl_sql_integer_expression, insert_or_ignore_into,
  7. prelude::*,
  8. schema::{rev_table, rev_table::dsl},
  9. ConnectionPool,
  10. };
  11. use flowy_error::{internal_error, FlowyError, FlowyResult};
  12. use flowy_sync::{
  13. entities::revision::{RevType, Revision, RevisionRange},
  14. util::md5,
  15. };
  16. use std::sync::Arc;
  17. pub struct SQLiteTextBlockRevisionPersistence {
  18. user_id: String,
  19. pub(crate) pool: Arc<ConnectionPool>,
  20. }
  21. impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence {
  22. type Error = FlowyError;
  23. fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
  24. let conn = self.pool.get().map_err(internal_error)?;
  25. let _ = TextRevisionSql::create(revision_records, &*conn)?;
  26. Ok(())
  27. }
  28. fn read_revision_records(
  29. &self,
  30. object_id: &str,
  31. rev_ids: Option<Vec<i64>>,
  32. ) -> Result<Vec<RevisionRecord>, Self::Error> {
  33. let conn = self.pool.get().map_err(internal_error)?;
  34. let records = TextRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
  35. Ok(records)
  36. }
  37. fn read_revision_records_with_range(
  38. &self,
  39. object_id: &str,
  40. range: &RevisionRange,
  41. ) -> Result<Vec<RevisionRecord>, Self::Error> {
  42. let conn = &*self.pool.get().map_err(internal_error)?;
  43. let revisions = TextRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
  44. Ok(revisions)
  45. }
  46. fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
  47. let conn = &*self.pool.get().map_err(internal_error)?;
  48. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  49. for changeset in changesets {
  50. let _ = TextRevisionSql::update(changeset, conn)?;
  51. }
  52. Ok(())
  53. })?;
  54. Ok(())
  55. }
  56. fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
  57. let conn = &*self.pool.get().map_err(internal_error)?;
  58. let _ = TextRevisionSql::delete(object_id, rev_ids, conn)?;
  59. Ok(())
  60. }
  61. fn delete_and_insert_records(
  62. &self,
  63. object_id: &str,
  64. deleted_rev_ids: Option<Vec<i64>>,
  65. inserted_records: Vec<RevisionRecord>,
  66. ) -> Result<(), Self::Error> {
  67. let conn = self.pool.get().map_err(internal_error)?;
  68. conn.immediate_transaction::<_, FlowyError, _>(|| {
  69. let _ = TextRevisionSql::delete(object_id, deleted_rev_ids, &*conn)?;
  70. let _ = TextRevisionSql::create(inserted_records, &*conn)?;
  71. Ok(())
  72. })
  73. }
  74. }
  75. impl SQLiteTextBlockRevisionPersistence {
  76. pub fn new(user_id: &str, pool: Arc<ConnectionPool>) -> Self {
  77. Self {
  78. user_id: user_id.to_owned(),
  79. pool,
  80. }
  81. }
  82. }
  83. struct TextRevisionSql {}
  84. impl TextRevisionSql {
  85. fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
  86. // Batch insert: https://diesel.rs/guides/all-about-inserts.html
  87. let records = revision_records
  88. .into_iter()
  89. .map(|record| {
  90. tracing::trace!(
  91. "[TextRevisionSql] create revision: {}:{:?}",
  92. record.revision.object_id,
  93. record.revision.rev_id
  94. );
  95. let rev_state: TextRevisionState = record.state.into();
  96. (
  97. dsl::doc_id.eq(record.revision.object_id),
  98. dsl::base_rev_id.eq(record.revision.base_rev_id),
  99. dsl::rev_id.eq(record.revision.rev_id),
  100. dsl::data.eq(record.revision.delta_data),
  101. dsl::state.eq(rev_state),
  102. dsl::ty.eq(RevTableType::Local),
  103. )
  104. })
  105. .collect::<Vec<_>>();
  106. let _ = insert_or_ignore_into(dsl::rev_table).values(&records).execute(conn)?;
  107. Ok(())
  108. }
  109. fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
  110. let state: TextRevisionState = changeset.state.clone().into();
  111. let filter = dsl::rev_table
  112. .filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
  113. .filter(dsl::doc_id.eq(changeset.object_id));
  114. let _ = update(filter).set(dsl::state.eq(state)).execute(conn)?;
  115. tracing::debug!(
  116. "[TextRevisionSql] update revision:{} state:to {:?}",
  117. changeset.rev_id,
  118. changeset.state
  119. );
  120. Ok(())
  121. }
  122. fn read(
  123. user_id: &str,
  124. object_id: &str,
  125. rev_ids: Option<Vec<i64>>,
  126. conn: &SqliteConnection,
  127. ) -> Result<Vec<RevisionRecord>, FlowyError> {
  128. let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(object_id)).into_boxed();
  129. if let Some(rev_ids) = rev_ids {
  130. sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
  131. }
  132. let rows = sql.order(dsl::rev_id.asc()).load::<RevisionTable>(conn)?;
  133. let records = rows
  134. .into_iter()
  135. .map(|row| mk_revision_record_from_table(user_id, row))
  136. .collect::<Vec<_>>();
  137. Ok(records)
  138. }
  139. fn read_with_range(
  140. user_id: &str,
  141. object_id: &str,
  142. range: RevisionRange,
  143. conn: &SqliteConnection,
  144. ) -> Result<Vec<RevisionRecord>, FlowyError> {
  145. let rev_tables = dsl::rev_table
  146. .filter(dsl::rev_id.ge(range.start))
  147. .filter(dsl::rev_id.le(range.end))
  148. .filter(dsl::doc_id.eq(object_id))
  149. .order(dsl::rev_id.asc())
  150. .load::<RevisionTable>(conn)?;
  151. let revisions = rev_tables
  152. .into_iter()
  153. .map(|table| mk_revision_record_from_table(user_id, table))
  154. .collect::<Vec<_>>();
  155. Ok(revisions)
  156. }
  157. fn delete(object_id: &str, rev_ids: Option<Vec<i64>>, conn: &SqliteConnection) -> Result<(), FlowyError> {
  158. let mut sql = diesel::delete(dsl::rev_table).into_boxed();
  159. sql = sql.filter(dsl::doc_id.eq(object_id));
  160. if let Some(rev_ids) = rev_ids {
  161. tracing::trace!("[TextRevisionSql] Delete revision: {}:{:?}", object_id, rev_ids);
  162. sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
  163. }
  164. let affected_row = sql.execute(conn)?;
  165. tracing::trace!("[TextRevisionSql] Delete {} rows", affected_row);
  166. Ok(())
  167. }
  168. }
  169. #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
  170. #[table_name = "rev_table"]
  171. struct RevisionTable {
  172. id: i32,
  173. doc_id: String,
  174. base_rev_id: i64,
  175. rev_id: i64,
  176. data: Vec<u8>,
  177. state: TextRevisionState,
  178. ty: RevTableType, // Deprecated
  179. }
  180. #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
  181. #[repr(i32)]
  182. #[sql_type = "Integer"]
  183. enum TextRevisionState {
  184. Sync = 0,
  185. Ack = 1,
  186. }
  187. impl_sql_integer_expression!(TextRevisionState);
  188. impl_rev_state_map!(TextRevisionState);
  189. impl std::default::Default for TextRevisionState {
  190. fn default() -> Self {
  191. TextRevisionState::Sync
  192. }
  193. }
  194. fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord {
  195. let md5 = md5(&table.data);
  196. let revision = Revision::new(
  197. &table.doc_id,
  198. table.base_rev_id,
  199. table.rev_id,
  200. Bytes::from(table.data),
  201. user_id,
  202. md5,
  203. );
  204. RevisionRecord {
  205. revision,
  206. state: table.state.into(),
  207. write_to_disk: false,
  208. }
  209. }
  210. #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
  211. #[repr(i32)]
  212. #[sql_type = "Integer"]
  213. pub enum RevTableType {
  214. Local = 0,
  215. Remote = 1,
  216. }
  217. impl_sql_integer_expression!(RevTableType);
  218. impl std::default::Default for RevTableType {
  219. fn default() -> Self {
  220. RevTableType::Local
  221. }
  222. }
  223. impl std::convert::From<i32> for RevTableType {
  224. fn from(value: i32) -> Self {
  225. match value {
  226. 0 => RevTableType::Local,
  227. 1 => RevTableType::Remote,
  228. o => {
  229. tracing::error!("Unsupported rev type {}, fallback to RevTableType::Local", o);
  230. RevTableType::Local
  231. }
  232. }
  233. }
  234. }
  235. impl std::convert::From<RevType> for RevTableType {
  236. fn from(ty: RevType) -> Self {
  237. match ty {
  238. RevType::DeprecatedLocal => RevTableType::Local,
  239. RevType::DeprecatedRemote => RevTableType::Remote,
  240. }
  241. }
  242. }
  243. impl std::convert::From<RevTableType> for RevType {
  244. fn from(ty: RevTableType) -> Self {
  245. match ty {
  246. RevTableType::Local => RevType::DeprecatedLocal,
  247. RevTableType::Remote => RevType::DeprecatedRemote,
  248. }
  249. }
  250. }