crud.rs 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. use crate::{
  2. entities::doc::{DocTable, DOC_TABLE},
  3. sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
  4. };
  5. use anyhow::Context;
  6. use backend_service::errors::ServerError;
  7. use flowy_document_infra::protobuf::{CreateDocParams, Doc, DocIdentifier, UpdateDocParams};
  8. use sqlx::{postgres::PgArguments, PgPool, Postgres};
  9. use uuid::Uuid;
  10. #[tracing::instrument(level = "debug", skip(transaction), err)]
  11. pub(crate) async fn create_doc(
  12. transaction: &mut DBTransaction<'_>,
  13. params: CreateDocParams,
  14. ) -> Result<(), ServerError> {
  15. let uuid = Uuid::parse_str(&params.id)?;
  16. let (sql, args) = NewDocSqlBuilder::new(uuid).data(params.data).build()?;
  17. let _ = sqlx::query_with(&sql, args)
  18. .execute(transaction)
  19. .await
  20. .map_err(map_sqlx_error)?;
  21. Ok(())
  22. }
  23. #[tracing::instrument(level = "debug", skip(pool), err)]
  24. pub(crate) async fn read_doc(pool: &PgPool, params: DocIdentifier) -> Result<Doc, ServerError> {
  25. let doc_id = Uuid::parse_str(&params.doc_id)?;
  26. let mut transaction = pool
  27. .begin()
  28. .await
  29. .context("Failed to acquire a Postgres connection to read doc")?;
  30. let builder = SqlBuilder::select(DOC_TABLE).add_field("*").and_where_eq("id", &doc_id);
  31. let (sql, args) = builder.build()?;
  32. // TODO: benchmark the speed of different documents with different size
  33. let doc: Doc = sqlx::query_as_with::<Postgres, DocTable, PgArguments>(&sql, args)
  34. .fetch_one(&mut transaction)
  35. .await
  36. .map_err(map_sqlx_error)?
  37. .into();
  38. transaction
  39. .commit()
  40. .await
  41. .context("Failed to commit SQL transaction to read doc.")?;
  42. Ok(doc)
  43. }
  44. #[tracing::instrument(level = "debug", skip(pool, params), fields(delta), err)]
  45. pub async fn update_doc(pool: &PgPool, mut params: UpdateDocParams) -> Result<(), ServerError> {
  46. let doc_id = Uuid::parse_str(&params.doc_id)?;
  47. let mut transaction = pool
  48. .begin()
  49. .await
  50. .context("Failed to acquire a Postgres connection to update doc")?;
  51. let data = Some(params.take_data());
  52. tracing::Span::current().record("result", &data.as_ref().unwrap_or(&"".to_owned()).as_str());
  53. let (sql, args) = SqlBuilder::update(DOC_TABLE)
  54. .add_some_arg("data", data)
  55. .add_arg("rev_id", params.rev_id)
  56. .and_where_eq("id", doc_id)
  57. .build()?;
  58. sqlx::query_with(&sql, args)
  59. .execute(&mut transaction)
  60. .await
  61. .map_err(map_sqlx_error)?;
  62. transaction
  63. .commit()
  64. .await
  65. .context("Failed to commit SQL transaction to update doc.")?;
  66. Ok(())
  67. }
  68. #[tracing::instrument(level = "debug", skip(transaction), err)]
  69. pub(crate) async fn delete_doc(transaction: &mut DBTransaction<'_>, doc_id: Uuid) -> Result<(), ServerError> {
  70. let (sql, args) = SqlBuilder::delete(DOC_TABLE).and_where_eq("id", doc_id).build()?;
  71. let _ = sqlx::query_with(&sql, args)
  72. .execute(transaction)
  73. .await
  74. .map_err(map_sqlx_error)?;
  75. Ok(())
  76. }
  77. pub struct NewDocSqlBuilder {
  78. table: DocTable,
  79. }
  80. impl NewDocSqlBuilder {
  81. pub fn new(id: Uuid) -> Self {
  82. let table = DocTable {
  83. id,
  84. data: "".to_owned(),
  85. rev_id: 0,
  86. };
  87. Self { table }
  88. }
  89. pub fn data(mut self, data: String) -> Self {
  90. self.table.data = data;
  91. self
  92. }
  93. pub fn build(self) -> Result<(String, PgArguments), ServerError> {
  94. let (sql, args) = SqlBuilder::create(DOC_TABLE)
  95. .add_arg("id", self.table.id)
  96. .add_arg("data", self.table.data)
  97. .add_arg("rev_id", self.table.rev_id)
  98. .build()?;
  99. Ok((sql, args))
  100. }
  101. }