kv_persistence.rs 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. use ::diesel::{query_dsl::*, ExpressionMethods};
  2. use bytes::Bytes;
  3. use diesel::SqliteConnection;
  4. use flowy_database::{
  5. prelude::*,
  6. schema::{kv_table, kv_table::dsl},
  7. };
  8. use flowy_error::{FlowyError, FlowyResult};
  9. use flowy_grid_data_model::entities::{Field, GridIdentifiable, RawRow};
  10. use lib_infra::future::{BoxResultFuture, FutureResult};
  11. use lib_sqlite::{ConnectionManager, ConnectionPool};
  12. use std::sync::Arc;
  13. #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
  14. #[table_name = "kv_table"]
  15. #[primary_key(key)]
  16. pub struct KeyValue {
  17. key: String,
  18. value: Vec<u8>,
  19. }
  20. pub trait KVTransaction {
  21. fn get<T: TryFrom<Bytes, Error = ::protobuf::ProtobufError>>(&self, key: &str) -> FlowyResult<Option<T>>;
  22. fn set<T: Into<KeyValue>>(&self, value: T) -> FlowyResult<()>;
  23. fn remove(&self, key: &str) -> FlowyResult<()>;
  24. fn batch_get<T: TryFrom<Bytes, Error = ::protobuf::ProtobufError>>(&self, keys: Vec<String>)
  25. -> FlowyResult<Vec<T>>;
  26. fn batch_set<T: Into<KeyValue>>(&self, values: Vec<T>) -> FlowyResult<()>;
  27. fn batch_remove(&self, keys: Vec<String>) -> FlowyResult<()>;
  28. }
  29. pub struct GridKVPersistence {
  30. pool: Arc<ConnectionPool>,
  31. }
  32. impl GridKVPersistence {
  33. pub fn new(pool: Arc<ConnectionPool>) -> Self {
  34. Self { pool }
  35. }
  36. pub fn begin_transaction<F, O>(&self, f: F) -> FlowyResult<O>
  37. where
  38. F: for<'a> FnOnce(SqliteTransaction<'a>) -> FlowyResult<O>,
  39. {
  40. let conn = self.pool.get()?;
  41. conn.immediate_transaction::<_, FlowyError, _>(|| {
  42. let sql_transaction = SqliteTransaction { conn: &conn };
  43. f(sql_transaction)
  44. })
  45. }
  46. }
  47. impl KVTransaction for GridKVPersistence {
  48. fn get<T: TryFrom<Bytes, Error = ::protobuf::ProtobufError>>(&self, key: &str) -> FlowyResult<Option<T>> {
  49. self.begin_transaction(|transaction| transaction.get(key))
  50. }
  51. fn set<T: Into<KeyValue>>(&self, value: T) -> FlowyResult<()> {
  52. self.begin_transaction(|transaction| transaction.set(value))
  53. }
  54. fn remove(&self, key: &str) -> FlowyResult<()> {
  55. self.begin_transaction(|transaction| transaction.remove(key))
  56. }
  57. fn batch_get<T: TryFrom<Bytes, Error = ::protobuf::ProtobufError>>(
  58. &self,
  59. keys: Vec<String>,
  60. ) -> FlowyResult<Vec<T>> {
  61. self.begin_transaction(|transaction| transaction.batch_get(keys))
  62. }
  63. fn batch_set<T: Into<KeyValue>>(&self, values: Vec<T>) -> FlowyResult<()> {
  64. self.begin_transaction(|transaction| transaction.batch_set(values))
  65. }
  66. fn batch_remove(&self, keys: Vec<String>) -> FlowyResult<()> {
  67. self.begin_transaction(|transaction| transaction.batch_remove(keys))
  68. }
  69. }
  70. pub struct SqliteTransaction<'a> {
  71. conn: &'a SqliteConnection,
  72. }
  73. impl<'a> KVTransaction for SqliteTransaction<'a> {
  74. fn get<T: TryFrom<Bytes, Error = ::protobuf::ProtobufError>>(&self, key: &str) -> FlowyResult<Option<T>> {
  75. let item = dsl::kv_table
  76. .filter(kv_table::key.eq(key))
  77. .first::<KeyValue>(self.conn)?;
  78. let value = T::try_from(Bytes::from(item.value)).unwrap();
  79. Ok(Some(value))
  80. }
  81. fn set<T: Into<KeyValue>>(&self, value: T) -> FlowyResult<()> {
  82. let item: KeyValue = value.into();
  83. let _ = diesel::replace_into(kv_table::table).values(&item).execute(self.conn)?;
  84. Ok(())
  85. }
  86. fn remove(&self, key: &str) -> FlowyResult<()> {
  87. let sql = dsl::kv_table.filter(kv_table::key.eq(key));
  88. let _ = diesel::delete(sql).execute(self.conn)?;
  89. Ok(())
  90. }
  91. fn batch_get<T: TryFrom<Bytes, Error = ::protobuf::ProtobufError>>(
  92. &self,
  93. keys: Vec<String>,
  94. ) -> FlowyResult<Vec<T>> {
  95. let items = dsl::kv_table
  96. .filter(kv_table::key.eq_any(&keys))
  97. .load::<KeyValue>(self.conn)?;
  98. let mut values = vec![];
  99. for item in items {
  100. let value = T::try_from(Bytes::from(item.value)).unwrap();
  101. values.push(value);
  102. }
  103. Ok(values)
  104. }
  105. fn batch_set<T: Into<KeyValue>>(&self, values: Vec<T>) -> FlowyResult<()> {
  106. let items = values.into_iter().map(|value| value.into()).collect::<Vec<KeyValue>>();
  107. let _ = diesel::replace_into(kv_table::table)
  108. .values(&items)
  109. .execute(self.conn)?;
  110. Ok(())
  111. }
  112. fn batch_remove(&self, keys: Vec<String>) -> FlowyResult<()> {
  113. let sql = dsl::kv_table.filter(kv_table::key.eq_any(keys));
  114. let _ = diesel::delete(sql).execute(self.conn)?;
  115. Ok(())
  116. }
  117. }
  118. impl<T: TryInto<Bytes, Error = ::protobuf::ProtobufError> + GridIdentifiable> std::convert::From<T> for KeyValue {
  119. fn from(value: T) -> Self {
  120. let key = value.id().to_string();
  121. let bytes: Bytes = value.try_into().unwrap();
  122. let value = bytes.to_vec();
  123. KeyValue { key, value }
  124. }
  125. }
  126. //
  127. // impl std::convert::TryInto<RawRow> for KeyValue {
  128. // type Error = FlowyError;
  129. //
  130. // fn try_into(self) -> Result<RawRow, Self::Error> {
  131. // let bytes = Bytes::from(self.value);
  132. // RawRow::try_from(bytes)
  133. // .map_err(|e| FlowyError::internal().context(format!("Deserialize into raw row failed: {:?}", e)))
  134. // }
  135. // }
  136. //
  137. // impl std::convert::TryInto<Field> for KeyValue {
  138. // type Error = FlowyError;
  139. //
  140. // fn try_into(self) -> Result<Field, Self::Error> {
  141. // let bytes = Bytes::from(self.value);
  142. // Field::try_from(bytes)
  143. // .map_err(|e| FlowyError::internal().context(format!("Deserialize into field failed: {:?}", e)))
  144. // }
  145. // }