kv.rs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. use crate::{
  2. services::kv::{KVAction, KVStore, KeyValue},
  3. util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
  4. };
  5. use anyhow::Context;
  6. use async_trait::async_trait;
  7. use backend_service::errors::ServerError;
  8. use bytes::Bytes;
  9. use futures_core::future::BoxFuture;
  10. use lib_infra::future::{BoxResultFuture, FutureResultSend};
  11. use sql_builder::SqlBuilder as RawSqlBuilder;
  12. use sqlx::{
  13. postgres::{PgArguments, PgRow},
  14. Arguments,
  15. Error,
  16. PgPool,
  17. Postgres,
  18. Row,
  19. };
  20. use std::{future::Future, pin::Pin};
  21. const KV_TABLE: &str = "kv_table";
  22. pub(crate) struct PostgresKV {
  23. pub(crate) pg_pool: PgPool,
  24. }
  25. impl PostgresKV {
  26. async fn transaction<F, O>(&self, f: F) -> Result<O, ServerError>
  27. where
  28. F: for<'a> FnOnce(&'a mut DBTransaction<'_>) -> BoxFuture<'a, Result<O, ServerError>>,
  29. {
  30. let mut transaction = self
  31. .pg_pool
  32. .begin()
  33. .await
  34. .context("[KV]:Failed to acquire a Postgres connection")?;
  35. let result = f(&mut transaction).await;
  36. transaction
  37. .commit()
  38. .await
  39. .context("[KV]:Failed to commit SQL transaction.")?;
  40. result
  41. }
  42. }
  43. impl KVStore for PostgresKV {}
  44. pub(crate) struct PostgresTransaction<'a> {
  45. pub(crate) transaction: DBTransaction<'a>,
  46. }
  47. impl<'a> PostgresTransaction<'a> {}
  48. #[async_trait]
  49. impl KVAction for PostgresKV {
  50. async fn get(&self, key: &str) -> Result<Option<Bytes>, ServerError> {
  51. let id = key.to_string();
  52. self.transaction(|transaction| {
  53. Box::pin(async move {
  54. let (sql, args) = SqlBuilder::select(KV_TABLE)
  55. .add_field("*")
  56. .and_where_eq("id", &id)
  57. .build()?;
  58. let result = sqlx::query_as_with::<Postgres, KVTable, PgArguments>(&sql, args)
  59. .fetch_one(transaction)
  60. .await;
  61. let result = match result {
  62. Ok(val) => Ok(Some(Bytes::from(val.blob))),
  63. Err(error) => match error {
  64. Error::RowNotFound => Ok(None),
  65. _ => Err(map_sqlx_error(error)),
  66. },
  67. };
  68. result
  69. })
  70. })
  71. .await
  72. }
  73. async fn set(&self, key: &str, bytes: Bytes) -> Result<(), ServerError> {
  74. self.batch_set(vec![KeyValue {
  75. key: key.to_string(),
  76. value: bytes,
  77. }])
  78. .await
  79. }
  80. async fn remove(&self, key: &str) -> Result<(), ServerError> {
  81. let id = key.to_string();
  82. self.transaction(|transaction| {
  83. Box::pin(async move {
  84. let (sql, args) = SqlBuilder::delete(KV_TABLE).and_where_eq("id", &id).build()?;
  85. let _ = sqlx::query_with(&sql, args)
  86. .execute(transaction)
  87. .await
  88. .map_err(map_sqlx_error)?;
  89. Ok(())
  90. })
  91. })
  92. .await
  93. }
  94. async fn batch_set(&self, kvs: Vec<KeyValue>) -> Result<(), ServerError> {
  95. self.transaction(|transaction| {
  96. Box::pin(async move {
  97. let mut builder = RawSqlBuilder::insert_into(KV_TABLE);
  98. let m_builder = builder.field("id").field("blob");
  99. let mut args = PgArguments::default();
  100. kvs.iter().enumerate().for_each(|(index, _)| {
  101. let index = index * 2 + 1;
  102. m_builder.values(&[format!("${}", index), format!("${}", index + 1)]);
  103. });
  104. for kv in kvs {
  105. args.add(kv.key);
  106. args.add(kv.value.to_vec());
  107. }
  108. let sql = m_builder.sql()?;
  109. let _ = sqlx::query_with(&sql, args)
  110. .execute(transaction)
  111. .await
  112. .map_err(map_sqlx_error)?;
  113. Ok::<(), ServerError>(())
  114. })
  115. })
  116. .await
  117. }
  118. async fn batch_get(&self, keys: Vec<String>) -> Result<Vec<KeyValue>, ServerError> {
  119. self.transaction(|transaction| {
  120. Box::pin(async move {
  121. let sql = RawSqlBuilder::select_from(KV_TABLE)
  122. .field("id")
  123. .field("blob")
  124. .and_where_in_quoted("id", &keys)
  125. .sql()?;
  126. let rows = sqlx::query(&sql).fetch_all(transaction).await.map_err(map_sqlx_error)?;
  127. let kvs = rows_to_key_values(rows);
  128. Ok::<Vec<KeyValue>, ServerError>(kvs)
  129. })
  130. })
  131. .await
  132. }
  133. async fn batch_delete(&self, keys: Vec<String>) -> Result<(), ServerError> {
  134. self.transaction(|transaction| {
  135. Box::pin(async move {
  136. let sql = RawSqlBuilder::delete_from(KV_TABLE).and_where_in("id", &keys).sql()?;
  137. let _ = sqlx::query(&sql).execute(transaction).await.map_err(map_sqlx_error)?;
  138. Ok::<(), ServerError>(())
  139. })
  140. })
  141. .await
  142. }
  143. async fn batch_get_start_with(&self, key: &str) -> Result<Vec<KeyValue>, ServerError> {
  144. let prefix = key.to_owned();
  145. self.transaction(|transaction| {
  146. Box::pin(async move {
  147. let sql = RawSqlBuilder::select_from(KV_TABLE)
  148. .field("id")
  149. .field("blob")
  150. .and_where_like_left("id", &prefix)
  151. .sql()?;
  152. let rows = sqlx::query(&sql).fetch_all(transaction).await.map_err(map_sqlx_error)?;
  153. let kvs = rows_to_key_values(rows);
  154. Ok::<Vec<KeyValue>, ServerError>(kvs)
  155. })
  156. })
  157. .await
  158. }
  159. async fn batch_delete_key_start_with(&self, keyword: &str) -> Result<(), ServerError> {
  160. let keyword = keyword.to_owned();
  161. self.transaction(|transaction| {
  162. Box::pin(async move {
  163. let sql = RawSqlBuilder::delete_from(KV_TABLE)
  164. .and_where_like_left("id", &keyword)
  165. .sql()?;
  166. let _ = sqlx::query(&sql).execute(transaction).await.map_err(map_sqlx_error)?;
  167. Ok::<(), ServerError>(())
  168. })
  169. })
  170. .await
  171. }
  172. }
  173. fn rows_to_key_values(rows: Vec<PgRow>) -> Vec<KeyValue> {
  174. rows.into_iter()
  175. .map(|row| {
  176. let bytes: Vec<u8> = row.get("blob");
  177. KeyValue {
  178. key: row.get("id"),
  179. value: Bytes::from(bytes),
  180. }
  181. })
  182. .collect::<Vec<KeyValue>>()
  183. }
  184. #[derive(Debug, Clone, sqlx::FromRow)]
  185. struct KVTable {
  186. pub(crate) id: String,
  187. pub(crate) blob: Vec<u8>,
  188. }