folder_editor.rs 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. use flowy_sync::{
  2. client_folder::{FolderChange, FolderPad},
  3. entities::{revision::Revision, ws_data::ServerRevisionWSData},
  4. };
  5. use crate::manager::FolderId;
  6. use bytes::Bytes;
  7. use flowy_error::{FlowyError, FlowyResult};
  8. use flowy_sync::util::make_delta_from_revisions;
  9. use flowy_revision::{
  10. RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder, RevisionWebSocket,
  11. };
  12. use lib_infra::future::FutureResult;
  13. use lib_ot::core::PlainTextAttributes;
  14. use parking_lot::RwLock;
  15. use std::sync::Arc;
  16. pub struct ClientFolderEditor {
  17. user_id: String,
  18. #[allow(dead_code)]
  19. pub(crate) folder_id: FolderId,
  20. pub(crate) folder: Arc<RwLock<FolderPad>>,
  21. rev_manager: Arc<RevisionManager>,
  22. #[cfg(feature = "sync")]
  23. ws_manager: Arc<flowy_revision::RevisionWebSocketManager>,
  24. }
  25. impl ClientFolderEditor {
  26. #[allow(unused_variables)]
  27. pub async fn new(
  28. user_id: &str,
  29. folder_id: &FolderId,
  30. token: &str,
  31. mut rev_manager: RevisionManager,
  32. web_socket: Arc<dyn RevisionWebSocket>,
  33. ) -> FlowyResult<Self> {
  34. let cloud = Arc::new(FolderRevisionCloudService {
  35. token: token.to_string(),
  36. });
  37. let folder = Arc::new(RwLock::new(rev_manager.load::<FolderPadBuilder>(Some(cloud)).await?));
  38. let rev_manager = Arc::new(rev_manager);
  39. #[cfg(feature = "sync")]
  40. let ws_manager = crate::services::web_socket::make_folder_ws_manager(
  41. user_id,
  42. folder_id.as_ref(),
  43. rev_manager.clone(),
  44. web_socket,
  45. folder.clone(),
  46. )
  47. .await;
  48. let user_id = user_id.to_owned();
  49. let folder_id = folder_id.to_owned();
  50. Ok(Self {
  51. user_id,
  52. folder_id,
  53. folder,
  54. rev_manager,
  55. #[cfg(feature = "sync")]
  56. ws_manager,
  57. })
  58. }
  59. #[cfg(feature = "sync")]
  60. pub async fn receive_ws_data(&self, data: ServerRevisionWSData) -> FlowyResult<()> {
  61. let _ = self.ws_manager.ws_passthrough_tx.send(data).await.map_err(|e| {
  62. let err_msg = format!("{} passthrough error: {}", self.folder_id, e);
  63. FlowyError::internal().context(err_msg)
  64. })?;
  65. Ok(())
  66. }
  67. #[cfg(not(feature = "sync"))]
  68. pub async fn receive_ws_data(&self, _data: ServerRevisionWSData) -> FlowyResult<()> {
  69. Ok(())
  70. }
  71. pub(crate) fn apply_change(&self, change: FolderChange) -> FlowyResult<()> {
  72. let FolderChange { delta, md5 } = change;
  73. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  74. let delta_data = delta.to_delta_bytes();
  75. let revision = Revision::new(
  76. &self.rev_manager.object_id,
  77. base_rev_id,
  78. rev_id,
  79. delta_data,
  80. &self.user_id,
  81. md5,
  82. );
  83. let _ = futures::executor::block_on(async {
  84. self.rev_manager
  85. .add_local_revision(&revision, Box::new(FolderRevisionCompactor()))
  86. .await
  87. })?;
  88. Ok(())
  89. }
  90. #[allow(dead_code)]
  91. pub fn folder_json(&self) -> FlowyResult<String> {
  92. let json = self.folder.read().to_json()?;
  93. Ok(json)
  94. }
  95. }
  96. struct FolderPadBuilder();
  97. impl RevisionObjectBuilder for FolderPadBuilder {
  98. type Output = FolderPad;
  99. fn build_object(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  100. let pad = FolderPad::from_revisions(revisions)?;
  101. Ok(pad)
  102. }
  103. }
  104. struct FolderRevisionCloudService {
  105. #[allow(dead_code)]
  106. token: String,
  107. }
  108. impl RevisionCloudService for FolderRevisionCloudService {
  109. #[tracing::instrument(level = "trace", skip(self))]
  110. fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  111. FutureResult::new(async move { Ok(vec![]) })
  112. }
  113. }
  /// Accessors compiled only with the `flowy_unit_test` feature.
  #[cfg(feature = "flowy_unit_test")]
  impl ClientFolderEditor {
      /// Returns another shared handle to this editor's revision manager.
      pub fn rev_manager(&self) -> Arc<RevisionManager> {
          Arc::clone(&self.rev_manager)
      }
  }
  120. struct FolderRevisionCompactor();
  121. impl RevisionCompactor for FolderRevisionCompactor {
  122. fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  123. let delta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;
  124. Ok(delta.to_delta_bytes())
  125. }
  126. }