folder_editor.rs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. use crate::services::web_socket::make_folder_ws_manager;
  2. use flowy_collaboration::{
  3. client_folder::{FolderChange, FolderPad},
  4. entities::{revision::Revision, ws_data::ServerRevisionWSData},
  5. };
  6. use crate::manager::FolderId;
  7. use bytes::Bytes;
  8. use flowy_collaboration::util::make_delta_from_revisions;
  9. use flowy_error::{FlowyError, FlowyResult};
  10. use flowy_sync::{
  11. RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder, RevisionWebSocket,
  12. RevisionWebSocketManager,
  13. };
  14. use lib_infra::future::FutureResult;
  15. use lib_ot::core::PlainTextAttributes;
  16. use parking_lot::RwLock;
  17. use std::sync::Arc;
  18. pub struct ClientFolderEditor {
  19. user_id: String,
  20. pub(crate) folder_id: FolderId,
  21. pub(crate) folder: Arc<RwLock<FolderPad>>,
  22. rev_manager: Arc<RevisionManager>,
  23. ws_manager: Arc<RevisionWebSocketManager>,
  24. }
  25. impl ClientFolderEditor {
  26. pub async fn new(
  27. user_id: &str,
  28. folder_id: &FolderId,
  29. token: &str,
  30. mut rev_manager: RevisionManager,
  31. web_socket: Arc<dyn RevisionWebSocket>,
  32. ) -> FlowyResult<Self> {
  33. let cloud = Arc::new(FolderRevisionCloudService {
  34. token: token.to_string(),
  35. });
  36. let folder = Arc::new(RwLock::new(rev_manager.load::<FolderPadBuilder>(Some(cloud)).await?));
  37. let rev_manager = Arc::new(rev_manager);
  38. let ws_manager = make_folder_ws_manager(
  39. user_id,
  40. folder_id.as_ref(),
  41. rev_manager.clone(),
  42. web_socket,
  43. folder.clone(),
  44. )
  45. .await;
  46. let user_id = user_id.to_owned();
  47. let folder_id = folder_id.to_owned();
  48. Ok(Self {
  49. user_id,
  50. folder_id,
  51. folder,
  52. rev_manager,
  53. ws_manager,
  54. })
  55. }
  56. pub async fn receive_ws_data(&self, data: ServerRevisionWSData) -> FlowyResult<()> {
  57. let _ = self.ws_manager.ws_passthrough_tx.send(data).await.map_err(|e| {
  58. let err_msg = format!("{} passthrough error: {}", self.folder_id, e);
  59. FlowyError::internal().context(err_msg)
  60. })?;
  61. Ok(())
  62. }
  63. pub(crate) fn apply_change(&self, change: FolderChange) -> FlowyResult<()> {
  64. let FolderChange { delta, md5 } = change;
  65. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  66. let delta_data = delta.to_delta_bytes();
  67. let revision = Revision::new(
  68. &self.rev_manager.object_id,
  69. base_rev_id,
  70. rev_id,
  71. delta_data,
  72. &self.user_id,
  73. md5,
  74. );
  75. let _ = futures::executor::block_on(async {
  76. self.rev_manager
  77. .add_local_revision(&revision, Box::new(FolderRevisionCompactor()))
  78. .await
  79. })?;
  80. Ok(())
  81. }
  82. #[allow(dead_code)]
  83. pub fn folder_json(&self) -> FlowyResult<String> {
  84. let json = self.folder.read().to_json()?;
  85. Ok(json)
  86. }
  87. }
  88. struct FolderPadBuilder();
  89. impl RevisionObjectBuilder for FolderPadBuilder {
  90. type Output = FolderPad;
  91. fn build_object(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  92. let pad = FolderPad::from_revisions(revisions)?;
  93. Ok(pad)
  94. }
  95. }
  96. struct FolderRevisionCloudService {
  97. #[allow(dead_code)]
  98. token: String,
  99. }
  100. impl RevisionCloudService for FolderRevisionCloudService {
  101. #[tracing::instrument(level = "trace", skip(self))]
  102. fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  103. FutureResult::new(async move { Ok(vec![]) })
  104. }
  105. }
  106. #[cfg(feature = "flowy_unit_test")]
  107. impl ClientFolderEditor {
  108. pub fn rev_manager(&self) -> Arc<RevisionManager> {
  109. self.rev_manager.clone()
  110. }
  111. }
  112. struct FolderRevisionCompactor();
  113. impl RevisionCompactor for FolderRevisionCompactor {
  114. fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  115. let delta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;
  116. Ok(delta.to_delta_bytes())
  117. }
  118. }