folder_editor.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. use crate::manager::FolderId;
  2. use bytes::Bytes;
  3. use flowy_error::{FlowyError, FlowyResult};
  4. use flowy_revision::{
  5. RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
  6. RevisionWebSocket,
  7. };
  8. use flowy_sync::util::make_operations_from_revisions;
  9. use flowy_sync::{
  10. client_folder::{FolderChangeset, FolderPad},
  11. entities::{revision::Revision, ws_data::ServerRevisionWSData},
  12. };
  13. use lib_infra::future::FutureResult;
  14. use flowy_database::ConnectionPool;
  15. use lib_ot::core::EmptyAttributes;
  16. use parking_lot::RwLock;
  17. use std::sync::Arc;
  18. pub struct FolderEditor {
  19. user_id: String,
  20. #[allow(dead_code)]
  21. pub(crate) folder_id: FolderId,
  22. pub(crate) folder: Arc<RwLock<FolderPad>>,
  23. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  24. #[cfg(feature = "sync")]
  25. ws_manager: Arc<flowy_revision::RevisionWebSocketManager>,
  26. }
  27. impl FolderEditor {
  28. #[allow(unused_variables)]
  29. pub async fn new(
  30. user_id: &str,
  31. folder_id: &FolderId,
  32. token: &str,
  33. mut rev_manager: RevisionManager<Arc<ConnectionPool>>,
  34. web_socket: Arc<dyn RevisionWebSocket>,
  35. ) -> FlowyResult<Self> {
  36. let cloud = Arc::new(FolderRevisionCloudService {
  37. token: token.to_string(),
  38. });
  39. let folder = Arc::new(RwLock::new(rev_manager.load::<FolderRevisionSerde>(Some(cloud)).await?));
  40. let rev_manager = Arc::new(rev_manager);
  41. #[cfg(feature = "sync")]
  42. let ws_manager = crate::services::web_socket::make_folder_ws_manager(
  43. user_id,
  44. folder_id.as_ref(),
  45. rev_manager.clone(),
  46. web_socket,
  47. folder.clone(),
  48. )
  49. .await;
  50. let user_id = user_id.to_owned();
  51. let folder_id = folder_id.to_owned();
  52. Ok(Self {
  53. user_id,
  54. folder_id,
  55. folder,
  56. rev_manager,
  57. #[cfg(feature = "sync")]
  58. ws_manager,
  59. })
  60. }
  61. #[cfg(feature = "sync")]
  62. pub async fn receive_ws_data(&self, data: ServerRevisionWSData) -> FlowyResult<()> {
  63. let _ = self.ws_manager.ws_passthrough_tx.send(data).await.map_err(|e| {
  64. let err_msg = format!("{} passthrough error: {}", self.folder_id, e);
  65. FlowyError::internal().context(err_msg)
  66. })?;
  67. Ok(())
  68. }
  69. #[cfg(not(feature = "sync"))]
  70. pub async fn receive_ws_data(&self, _data: ServerRevisionWSData) -> FlowyResult<()> {
  71. Ok(())
  72. }
  73. pub(crate) fn apply_change(&self, change: FolderChangeset) -> FlowyResult<()> {
  74. let FolderChangeset { operations: delta, md5 } = change;
  75. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  76. let delta_data = delta.json_bytes();
  77. let revision = Revision::new(
  78. &self.rev_manager.object_id,
  79. base_rev_id,
  80. rev_id,
  81. delta_data,
  82. &self.user_id,
  83. md5,
  84. );
  85. let _ = futures::executor::block_on(async { self.rev_manager.add_local_revision(&revision).await })?;
  86. Ok(())
  87. }
  88. #[allow(dead_code)]
  89. pub fn folder_json(&self) -> FlowyResult<String> {
  90. let json = self.folder.read().to_json()?;
  91. Ok(json)
  92. }
  93. }
  94. struct FolderRevisionSerde();
  95. impl RevisionObjectDeserializer for FolderRevisionSerde {
  96. type Output = FolderPad;
  97. fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  98. let pad = FolderPad::from_revisions(revisions)?;
  99. Ok(pad)
  100. }
  101. }
  102. impl RevisionObjectSerializer for FolderRevisionSerde {
  103. fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  104. let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
  105. Ok(operations.json_bytes())
  106. }
  107. }
  108. pub struct FolderRevisionCompress();
  109. impl RevisionCompress for FolderRevisionCompress {
  110. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  111. FolderRevisionSerde::combine_revisions(revisions)
  112. }
  113. }
  114. struct FolderRevisionCloudService {
  115. #[allow(dead_code)]
  116. token: String,
  117. }
  118. impl RevisionCloudService for FolderRevisionCloudService {
  119. #[tracing::instrument(level = "trace", skip(self))]
  120. fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  121. FutureResult::new(async move { Ok(vec![]) })
  122. }
  123. }
  124. #[cfg(feature = "flowy_unit_test")]
  125. impl FolderEditor {
  126. pub fn rev_manager(&self) -> Arc<RevisionManager<Arc<ConnectionPool>>> {
  127. self.rev_manager.clone()
  128. }
  129. }