persistence.rs 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. use crate::{
  2. context::FlowyPersistence,
  3. services::kv::{KVStore, KVTransaction, KeyValue},
  4. util::serde_ext::parse_from_bytes,
  5. };
  6. use anyhow::Context;
  7. use backend_service::errors::{internal_error, ServerError};
  8. use bytes::Bytes;
  9. use flowy_collaboration::protobuf::{
  10. CreateDocParams,
  11. DocIdentifier,
  12. DocumentInfo,
  13. RepeatedRevision,
  14. ResetDocumentParams,
  15. Revision,
  16. };
  17. use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
  18. use protobuf::Message;
  19. use sqlx::PgPool;
  20. use std::sync::Arc;
  21. use uuid::Uuid;
  22. #[tracing::instrument(level = "debug", skip(kv_store), err)]
  23. pub(crate) async fn create_document(
  24. kv_store: &Arc<DocumentKVPersistence>,
  25. mut params: CreateDocParams,
  26. ) -> Result<(), ServerError> {
  27. let revisions = params.take_revisions().take_items();
  28. let _ = kv_store.batch_set_revision(revisions.into()).await?;
  29. Ok(())
  30. }
  31. #[tracing::instrument(level = "debug", skip(persistence), err)]
  32. pub(crate) async fn read_document(
  33. persistence: &Arc<FlowyPersistence>,
  34. params: DocIdentifier,
  35. ) -> Result<DocumentInfo, ServerError> {
  36. let _ = Uuid::parse_str(&params.doc_id).context("Parse document id to uuid failed")?;
  37. let kv_store = persistence.kv_store();
  38. let revisions = kv_store.batch_get_revisions(&params.doc_id, None).await?;
  39. make_doc_from_revisions(&params.doc_id, revisions)
  40. }
  41. #[tracing::instrument(level = "debug", skip(kv_store, params), fields(delta), err)]
  42. pub async fn reset_document(
  43. kv_store: &Arc<DocumentKVPersistence>,
  44. mut params: ResetDocumentParams,
  45. ) -> Result<(), ServerError> {
  46. let revisions = params.take_revisions().take_items();
  47. let doc_id = params.take_doc_id();
  48. kv_store
  49. .transaction(|mut transaction| {
  50. Box::pin(async move {
  51. let _ = transaction.batch_delete_key_start_with(&doc_id).await?;
  52. let items = revisions_to_key_value_items(revisions.into());
  53. let _ = transaction.batch_set(items).await?;
  54. Ok(())
  55. })
  56. })
  57. .await
  58. }
  59. #[tracing::instrument(level = "debug", skip(kv_store), err)]
  60. pub(crate) async fn delete_document(kv_store: &Arc<DocumentKVPersistence>, doc_id: Uuid) -> Result<(), ServerError> {
  61. let _ = kv_store.batch_delete_revisions(&doc_id.to_string(), None).await?;
  62. Ok(())
  63. }
  64. pub struct DocumentKVPersistence {
  65. inner: Arc<KVStore>,
  66. }
  67. impl std::ops::Deref for DocumentKVPersistence {
  68. type Target = Arc<KVStore>;
  69. fn deref(&self) -> &Self::Target { &self.inner }
  70. }
  71. impl std::ops::DerefMut for DocumentKVPersistence {
  72. fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner }
  73. }
  74. impl DocumentKVPersistence {
  75. pub(crate) fn new(kv_store: Arc<KVStore>) -> Self { DocumentKVPersistence { inner: kv_store } }
  76. pub(crate) async fn batch_set_revision(&self, revisions: Vec<Revision>) -> Result<(), ServerError> {
  77. let items = revisions_to_key_value_items(revisions);
  78. self.inner
  79. .transaction(|mut t| Box::pin(async move { t.batch_set(items).await }))
  80. .await
  81. }
  82. pub(crate) async fn get_doc_revisions(&self, doc_id: &str) -> Result<RepeatedRevision, ServerError> {
  83. let doc_id = doc_id.to_owned();
  84. let items = self
  85. .inner
  86. .transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&doc_id).await }))
  87. .await?;
  88. Ok(key_value_items_to_revisions(items))
  89. }
  90. pub(crate) async fn batch_get_revisions<T: Into<Option<Vec<i64>>>>(
  91. &self,
  92. doc_id: &str,
  93. rev_ids: T,
  94. ) -> Result<RepeatedRevision, ServerError> {
  95. let rev_ids = rev_ids.into();
  96. let items = match rev_ids {
  97. None => {
  98. let doc_id = doc_id.to_owned();
  99. self.inner
  100. .transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&doc_id).await }))
  101. .await?
  102. },
  103. Some(rev_ids) => {
  104. let keys = rev_ids
  105. .into_iter()
  106. .map(|rev_id| make_revision_key(doc_id, rev_id))
  107. .collect::<Vec<String>>();
  108. self.inner
  109. .transaction(|mut t| Box::pin(async move { t.batch_get(keys).await }))
  110. .await?
  111. },
  112. };
  113. Ok(key_value_items_to_revisions(items))
  114. }
  115. pub(crate) async fn batch_delete_revisions<T: Into<Option<Vec<i64>>>>(
  116. &self,
  117. doc_id: &str,
  118. rev_ids: T,
  119. ) -> Result<(), ServerError> {
  120. match rev_ids.into() {
  121. None => {
  122. let doc_id = doc_id.to_owned();
  123. self.inner
  124. .transaction(|mut t| Box::pin(async move { t.batch_delete_key_start_with(&doc_id).await }))
  125. .await
  126. },
  127. Some(rev_ids) => {
  128. let keys = rev_ids
  129. .into_iter()
  130. .map(|rev_id| make_revision_key(doc_id, rev_id))
  131. .collect::<Vec<String>>();
  132. self.inner
  133. .transaction(|mut t| Box::pin(async move { t.batch_delete(keys).await }))
  134. .await
  135. },
  136. }
  137. }
  138. }
  139. #[inline]
  140. fn revisions_to_key_value_items(revisions: Vec<Revision>) -> Vec<KeyValue> {
  141. revisions
  142. .into_iter()
  143. .map(|revision| {
  144. let key = make_revision_key(&revision.doc_id, revision.rev_id);
  145. let value = Bytes::from(revision.write_to_bytes().unwrap());
  146. KeyValue { key, value }
  147. })
  148. .collect::<Vec<KeyValue>>()
  149. }
  150. #[inline]
  151. fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevision {
  152. let mut revisions = items
  153. .into_iter()
  154. .filter_map(|kv| parse_from_bytes::<Revision>(&kv.value).ok())
  155. .collect::<Vec<Revision>>();
  156. revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
  157. let mut repeated_revision = RepeatedRevision::new();
  158. repeated_revision.set_items(revisions.into());
  159. repeated_revision
  160. }
  161. #[inline]
  162. fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) }
  163. #[inline]
  164. fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Result<DocumentInfo, ServerError> {
  165. let revisions = revisions.take_items();
  166. if revisions.is_empty() {
  167. return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id)));
  168. }
  169. let mut document_delta = RichTextDelta::new();
  170. let mut base_rev_id = 0;
  171. let mut rev_id = 0;
  172. // TODO: generate delta from revision should be wrapped into function.
  173. for revision in revisions {
  174. base_rev_id = revision.base_rev_id;
  175. rev_id = revision.rev_id;
  176. let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(internal_error)?;
  177. document_delta = document_delta.compose(&delta).map_err(internal_error)?;
  178. }
  179. let text = document_delta.to_json();
  180. let mut document_info = DocumentInfo::new();
  181. document_info.set_doc_id(doc_id.to_owned());
  182. document_info.set_text(text);
  183. document_info.set_base_rev_id(base_rev_id);
  184. document_info.set_rev_id(rev_id);
  185. Ok(document_info)
  186. }