use ::diesel::{query_dsl::*, ExpressionMethods}; use bytes::Bytes; use diesel::SqliteConnection; use flowy_database::{ prelude::*, schema::{kv_table, kv_table::dsl}, }; use flowy_error::{FlowyError, FlowyResult}; use flowy_grid_data_model::entities::{Field, GridIdentifiable, RawRow}; use lib_infra::future::{BoxResultFuture, FutureResult}; use lib_sqlite::{ConnectionManager, ConnectionPool}; use std::sync::Arc; #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] #[table_name = "kv_table"] #[primary_key(key)] pub struct KeyValue { key: String, value: Vec, } pub trait KVTransaction { fn get>(&self, key: &str) -> FlowyResult>; fn set>(&self, value: T) -> FlowyResult<()>; fn remove(&self, key: &str) -> FlowyResult<()>; fn batch_get>(&self, keys: Vec) -> FlowyResult>; fn batch_set>(&self, values: Vec) -> FlowyResult<()>; fn batch_remove(&self, keys: Vec) -> FlowyResult<()>; } pub struct GridKVPersistence { pool: Arc, } impl GridKVPersistence { pub fn new(pool: Arc) -> Self { Self { pool } } pub fn begin_transaction(&self, f: F) -> FlowyResult where F: for<'a> FnOnce(SqliteTransaction<'a>) -> FlowyResult, { let conn = self.pool.get()?; conn.immediate_transaction::<_, FlowyError, _>(|| { let sql_transaction = SqliteTransaction { conn: &conn }; f(sql_transaction) }) } } impl KVTransaction for GridKVPersistence { fn get>(&self, key: &str) -> FlowyResult> { self.begin_transaction(|transaction| transaction.get(key)) } fn set>(&self, value: T) -> FlowyResult<()> { self.begin_transaction(|transaction| transaction.set(value)) } fn remove(&self, key: &str) -> FlowyResult<()> { self.begin_transaction(|transaction| transaction.remove(key)) } fn batch_get>( &self, keys: Vec, ) -> FlowyResult> { self.begin_transaction(|transaction| transaction.batch_get(keys)) } fn batch_set>(&self, values: Vec) -> FlowyResult<()> { self.begin_transaction(|transaction| transaction.batch_set(values)) } fn batch_remove(&self, keys: Vec) -> FlowyResult<()> { self.begin_transaction(|transaction| transaction.batch_remove(keys)) } } pub struct SqliteTransaction<'a> { conn: &'a SqliteConnection, } impl<'a> KVTransaction for SqliteTransaction<'a> { fn get>(&self, key: &str) -> FlowyResult> { let item = dsl::kv_table .filter(kv_table::key.eq(key)) .first::(self.conn)?; let value = T::try_from(Bytes::from(item.value)).unwrap(); Ok(Some(value)) } fn set>(&self, value: T) -> FlowyResult<()> { let item: KeyValue = value.into(); let _ = diesel::replace_into(kv_table::table).values(&item).execute(self.conn)?; Ok(()) } fn remove(&self, key: &str) -> FlowyResult<()> { let sql = dsl::kv_table.filter(kv_table::key.eq(key)); let _ = diesel::delete(sql).execute(self.conn)?; Ok(()) } fn batch_get>( &self, keys: Vec, ) -> FlowyResult> { let items = dsl::kv_table .filter(kv_table::key.eq_any(&keys)) .load::(self.conn)?; let mut values = vec![]; for item in items { let value = T::try_from(Bytes::from(item.value)).unwrap(); values.push(value); } Ok(values) } fn batch_set>(&self, values: Vec) -> FlowyResult<()> { let items = values.into_iter().map(|value| value.into()).collect::>(); let _ = diesel::replace_into(kv_table::table) .values(&items) .execute(self.conn)?; Ok(()) } fn batch_remove(&self, keys: Vec) -> FlowyResult<()> { let sql = dsl::kv_table.filter(kv_table::key.eq_any(keys)); let _ = diesel::delete(sql).execute(self.conn)?; Ok(()) } } impl + GridIdentifiable> std::convert::From for KeyValue { fn from(value: T) -> Self { let key = value.id().to_string(); let bytes: Bytes = value.try_into().unwrap(); let value = bytes.to_vec(); KeyValue { key, value } } } // // impl std::convert::TryInto for KeyValue { // type Error = FlowyError; // // fn try_into(self) -> Result { // let bytes = Bytes::from(self.value); // RawRow::try_from(bytes) // .map_err(|e| FlowyError::internal().context(format!("Deserialize into raw row failed: {:?}", e))) // } // } // // impl std::convert::TryInto for KeyValue { // type Error = FlowyError; // // fn try_into(self) -> Result { // let bytes = Bytes::from(self.value); // Field::try_from(bytes) // .map_err(|e| FlowyError::internal().context(format!("Deserialize into field failed: {:?}", e))) // } // }