kv.rs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. use crate::kv::schema::{kv_table, kv_table::dsl, KV_SQL};
  2. use ::diesel::{query_dsl::*, ExpressionMethods};
  3. use diesel::{Connection, SqliteConnection};
  4. use flowy_derive::ProtoBuf;
  5. use flowy_sqlite::{DBConnection, Database, PoolConfig};
  6. use lazy_static::lazy_static;
  7. use std::{collections::HashMap, path::Path, sync::RwLock};
  8. const DB_NAME: &str = "kv.db";
  9. lazy_static! {
  10. static ref KV_HOLDER: RwLock<KV> = RwLock::new(KV::new());
  11. }
  12. pub struct KV {
  13. database: Option<Database>,
  14. cache: HashMap<String, KeyValue>,
  15. }
  16. impl KV {
  17. fn new() -> Self {
  18. KV {
  19. database: None,
  20. cache: HashMap::new(),
  21. }
  22. }
  23. fn set(value: KeyValue) -> Result<(), String> {
  24. log::debug!("set value: {:?}", value);
  25. update_cache(value.clone());
  26. let _ = diesel::replace_into(kv_table::table)
  27. .values(&value)
  28. .execute(&*(get_connection()?))
  29. .map_err(|e| format!("KV set error: {:?}", e))?;
  30. Ok(())
  31. }
  32. fn get(key: &str) -> Result<KeyValue, String> {
  33. if let Some(value) = read_cache(key) {
  34. return Ok(value);
  35. }
  36. let conn = get_connection()?;
  37. let value = dsl::kv_table
  38. .filter(kv_table::key.eq(key))
  39. .first::<KeyValue>(&*conn)
  40. .map_err(|e| format!("KV get error: {:?}", e))?;
  41. update_cache(value.clone());
  42. Ok(value)
  43. }
  44. #[allow(dead_code)]
  45. pub fn remove(key: &str) -> Result<(), String> {
  46. log::debug!("remove key: {}", key);
  47. match KV_HOLDER.write() {
  48. Ok(mut guard) => {
  49. guard.cache.remove(key);
  50. },
  51. Err(e) => log::error!("Require write lock failed: {:?}", e),
  52. };
  53. let conn = get_connection()?;
  54. let sql = dsl::kv_table.filter(kv_table::key.eq(key));
  55. let _ = diesel::delete(sql)
  56. .execute(&*conn)
  57. .map_err(|e| format!("KV remove error: {:?}", e))?;
  58. Ok(())
  59. }
  60. pub fn init(root: &str) -> Result<(), String> {
  61. if !Path::new(root).exists() {
  62. return Err(format!("Init KVStore failed. {} not exists", root));
  63. }
  64. let pool_config = PoolConfig::default();
  65. let database = Database::new(root, DB_NAME, pool_config).unwrap();
  66. let conn = database.get_connection().unwrap();
  67. SqliteConnection::execute(&*conn, KV_SQL).unwrap();
  68. let mut store = KV_HOLDER
  69. .write()
  70. .map_err(|e| format!("KVStore write failed: {:?}", e))?;
  71. store.database = Some(database);
  72. Ok(())
  73. }
  74. }
  75. fn read_cache(key: &str) -> Option<KeyValue> {
  76. match KV_HOLDER.read() {
  77. Ok(guard) => guard.cache.get(key).cloned(),
  78. Err(e) => {
  79. log::error!("Require read lock failed: {:?}", e);
  80. None
  81. },
  82. }
  83. }
  84. fn update_cache(value: KeyValue) {
  85. match KV_HOLDER.write() {
  86. Ok(mut guard) => {
  87. guard.cache.insert(value.key.clone(), value);
  88. },
  89. Err(e) => log::error!("Require write lock failed: {:?}", e),
  90. };
  91. }
  92. macro_rules! impl_get_func {
  93. (
  94. $func_name:ident,
  95. $get_method:ident=>$target:ident
  96. ) => {
  97. impl KV {
  98. #[allow(dead_code)]
  99. pub fn $func_name(k: &str) -> Option<$target> {
  100. match KV::get(k) {
  101. Ok(item) => item.$get_method,
  102. Err(_) => None,
  103. }
  104. }
  105. }
  106. };
  107. }
  108. macro_rules! impl_set_func {
  109. ($func_name:ident,$set_method:ident,$key_type:ident) => {
  110. impl KV {
  111. #[allow(dead_code)]
  112. pub fn $func_name(key: &str, value: $key_type) {
  113. let mut item = KeyValue::new(key);
  114. item.$set_method = Some(value);
  115. match KV::set(item) {
  116. Ok(_) => {},
  117. Err(e) => {
  118. log::error!("{:?}", e)
  119. },
  120. };
  121. }
  122. }
  123. };
  124. }
  125. impl_set_func!(set_str, str_value, String);
  126. impl_set_func!(set_bool, bool_value, bool);
  127. impl_set_func!(set_int, int_value, i64);
  128. impl_set_func!(set_float, float_value, f64);
  129. impl_get_func!(get_str,str_value=>String);
  130. impl_get_func!(get_int,int_value=>i64);
  131. impl_get_func!(get_float,float_value=>f64);
  132. impl_get_func!(get_bool,bool_value=>bool);
  133. fn get_connection() -> Result<DBConnection, String> {
  134. match KV_HOLDER.read() {
  135. Ok(store) => {
  136. let conn = store
  137. .database
  138. .as_ref()
  139. .expect("KVStore is not init")
  140. .get_connection()
  141. .map_err(|e| format!("KVStore error: {:?}", e))?;
  142. Ok(conn)
  143. },
  144. Err(e) => {
  145. let msg = format!("KVStore get connection failed: {:?}", e);
  146. log::error!("{:?}", msg);
  147. Err(msg)
  148. },
  149. }
  150. }
  151. #[derive(Clone, Debug, ProtoBuf, Default, Queryable, Identifiable, Insertable, AsChangeset)]
  152. #[table_name = "kv_table"]
  153. #[primary_key(key)]
  154. pub struct KeyValue {
  155. #[pb(index = 1)]
  156. pub key: String,
  157. #[pb(index = 2, one_of)]
  158. pub str_value: Option<String>,
  159. #[pb(index = 3, one_of)]
  160. pub int_value: Option<i64>,
  161. #[pb(index = 4, one_of)]
  162. pub float_value: Option<f64>,
  163. #[pb(index = 5, one_of)]
  164. pub bool_value: Option<bool>,
  165. }
  166. impl KeyValue {
  167. pub fn new(key: &str) -> Self {
  168. KeyValue {
  169. key: key.to_string(),
  170. ..Default::default()
  171. }
  172. }
  173. }
  174. #[cfg(test)]
  175. mod tests {
  176. use crate::kv::KV;
  177. #[test]
  178. fn kv_store_test() {
  179. let dir = "./temp/";
  180. if !std::path::Path::new(dir).exists() {
  181. std::fs::create_dir_all(dir).unwrap();
  182. }
  183. KV::init(dir).unwrap();
  184. KV::set_str("1", "hello".to_string());
  185. assert_eq!(KV::get_str("1").unwrap(), "hello");
  186. assert_eq!(KV::get_str("2"), None);
  187. KV::set_bool("1", true);
  188. assert_eq!(KV::get_bool("1").unwrap(), true);
  189. assert_eq!(KV::get_bool("2"), None);
  190. }
  191. }