row_kv.rs 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. use async_trait::async_trait;
  2. use diesel::SqliteConnection;
  3. use flowy_error::{FlowyError, FlowyResult};
  4. use flowy_grid_data_model::entities::RawRow;
  5. use lib_infra::future::{BoxResultFuture, FutureResult};
  6. use lib_sqlite::{ConnectionManager, ConnectionPool};
  7. use std::sync::Arc;
  8. pub trait RowKVTransaction {
  9. fn get(&self, row_id: &str) -> FlowyResult<Option<RawRow>>;
  10. fn set(&self, row: RawRow) -> FlowyResult<()>;
  11. fn remove(&self, row_id: &str) -> FlowyResult<()>;
  12. fn batch_get(&self, ids: Vec<String>) -> FlowyResult<()>;
  13. fn batch_set(&self, rows: Vec<RawRow>) -> FlowyResult<()>;
  14. fn batch_delete(&self, ids: Vec<String>) -> FlowyResult<()>;
  15. }
  16. pub struct RowKVPersistence {
  17. pool: Arc<ConnectionPool>,
  18. }
  19. impl RowKVPersistence {
  20. pub fn new(pool: Arc<ConnectionPool>) -> Self {
  21. Self { pool }
  22. }
  23. pub fn begin_transaction<F, O>(&self, f: F) -> FlowyResult<O>
  24. where
  25. F: for<'a> FnOnce(Box<dyn RowKVTransaction + 'a>) -> FlowyResult<O>,
  26. {
  27. let conn = self.pool.get()?;
  28. conn.immediate_transaction::<_, FlowyError, _>(|| {
  29. let sql_transaction = SqliteTransaction { conn: &conn };
  30. f(Box::new(sql_transaction))
  31. })
  32. }
  33. }
  34. impl RowKVTransaction for RowKVPersistence {
  35. fn get(&self, row_id: &str) -> FlowyResult<Option<RawRow>> {
  36. self.begin_transaction(|transaction| transaction.get(row_id))
  37. }
  38. fn set(&self, row: RawRow) -> FlowyResult<()> {
  39. self.begin_transaction(|transaction| transaction.set(row))
  40. }
  41. fn remove(&self, row_id: &str) -> FlowyResult<()> {
  42. self.begin_transaction(|transaction| transaction.remove(row_id))
  43. }
  44. fn batch_get(&self, ids: Vec<String>) -> FlowyResult<()> {
  45. self.begin_transaction(|transaction| transaction.batch_get(ids))
  46. }
  47. fn batch_set(&self, rows: Vec<RawRow>) -> FlowyResult<()> {
  48. self.begin_transaction(|transaction| transaction.batch_set(rows))
  49. }
  50. fn batch_delete(&self, ids: Vec<String>) -> FlowyResult<()> {
  51. self.begin_transaction(|transaction| transaction.batch_delete(ids))
  52. }
  53. }
  54. pub struct SqliteTransaction<'a> {
  55. conn: &'a SqliteConnection,
  56. }
  57. #[async_trait]
  58. impl<'a> RowKVTransaction for SqliteTransaction<'a> {
  59. fn get(&self, row_id: &str) -> FlowyResult<Option<RawRow>> {
  60. todo!()
  61. }
  62. fn set(&self, row: RawRow) -> FlowyResult<()> {
  63. todo!()
  64. }
  65. fn remove(&self, row_id: &str) -> FlowyResult<()> {
  66. todo!()
  67. }
  68. fn batch_get(&self, ids: Vec<String>) -> FlowyResult<()> {
  69. todo!()
  70. }
  71. fn batch_set(&self, rows: Vec<RawRow>) -> FlowyResult<()> {
  72. todo!()
  73. }
  74. fn batch_delete(&self, ids: Vec<String>) -> FlowyResult<()> {
  75. todo!()
  76. }
  77. }