folder_editor.rs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. use crate::services::web_socket::make_folder_ws_manager;
  2. use flowy_collaboration::{
  3. client_folder::{FolderChange, FolderPad},
  4. entities::{revision::Revision, ws_data::ServerRevisionWSData},
  5. };
  6. use crate::controller::FolderId;
  7. use flowy_collaboration::util::make_delta_from_revisions;
  8. use flowy_error::{FlowyError, FlowyResult};
  9. use flowy_sync::{
  10. RevisionCache, RevisionCloudService, RevisionCompact, RevisionManager, RevisionObjectBuilder, RevisionWebSocket,
  11. RevisionWebSocketManager,
  12. };
  13. use lib_infra::future::FutureResult;
  14. use lib_ot::core::PlainAttributes;
  15. use lib_sqlite::ConnectionPool;
  16. use parking_lot::RwLock;
  17. use std::sync::Arc;
  18. pub struct FolderEditor {
  19. user_id: String,
  20. pub(crate) folder_id: FolderId,
  21. pub(crate) folder: Arc<RwLock<FolderPad>>,
  22. rev_manager: Arc<RevisionManager>,
  23. ws_manager: Arc<RevisionWebSocketManager>,
  24. }
  25. impl FolderEditor {
  26. pub async fn new(
  27. user_id: &str,
  28. folder_id: &FolderId,
  29. token: &str,
  30. pool: Arc<ConnectionPool>,
  31. web_socket: Arc<dyn RevisionWebSocket>,
  32. ) -> FlowyResult<Self> {
  33. let cache = Arc::new(RevisionCache::new(user_id, folder_id.as_ref(), pool));
  34. let mut rev_manager = RevisionManager::new(user_id, folder_id.as_ref(), cache);
  35. let cloud = Arc::new(FolderRevisionCloudServiceImpl {
  36. token: token.to_string(),
  37. });
  38. let folder = Arc::new(RwLock::new(
  39. rev_manager
  40. .load::<FolderPadBuilder, FolderRevisionCompact>(cloud)
  41. .await?,
  42. ));
  43. let rev_manager = Arc::new(rev_manager);
  44. let ws_manager = make_folder_ws_manager(
  45. user_id,
  46. folder_id.as_ref(),
  47. rev_manager.clone(),
  48. web_socket,
  49. folder.clone(),
  50. )
  51. .await;
  52. let user_id = user_id.to_owned();
  53. let folder_id = folder_id.to_owned();
  54. Ok(Self {
  55. user_id,
  56. folder_id,
  57. folder,
  58. rev_manager,
  59. ws_manager,
  60. })
  61. }
  62. pub async fn receive_ws_data(&self, data: ServerRevisionWSData) -> FlowyResult<()> {
  63. let _ = self.ws_manager.ws_passthrough_tx.send(data).await.map_err(|e| {
  64. let err_msg = format!("{} passthrough error: {}", self.folder_id, e);
  65. FlowyError::internal().context(err_msg)
  66. })?;
  67. Ok(())
  68. }
  69. pub(crate) fn apply_change(&self, change: FolderChange) -> FlowyResult<()> {
  70. let FolderChange { delta, md5 } = change;
  71. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  72. let delta_data = delta.to_bytes();
  73. let revision = Revision::new(
  74. &self.rev_manager.object_id,
  75. base_rev_id,
  76. rev_id,
  77. delta_data,
  78. &self.user_id,
  79. md5,
  80. );
  81. let _ = futures::executor::block_on(async {
  82. self.rev_manager
  83. .add_local_revision::<FolderRevisionCompact>(&revision)
  84. .await
  85. })?;
  86. Ok(())
  87. }
  88. #[allow(dead_code)]
  89. pub fn folder_json(&self) -> FlowyResult<String> {
  90. let json = self.folder.read().to_json()?;
  91. Ok(json)
  92. }
  93. }
  94. struct FolderPadBuilder();
  95. impl RevisionObjectBuilder for FolderPadBuilder {
  96. type Output = FolderPad;
  97. fn build_with_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  98. let pad = FolderPad::from_revisions(revisions)?;
  99. Ok(pad)
  100. }
  101. }
  102. struct FolderRevisionCloudServiceImpl {
  103. #[allow(dead_code)]
  104. token: String,
  105. }
  106. impl RevisionCloudService for FolderRevisionCloudServiceImpl {
  107. #[tracing::instrument(level = "trace", skip(self))]
  108. fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  109. FutureResult::new(async move { Ok(vec![]) })
  110. }
  111. }
  112. #[cfg(feature = "flowy_unit_test")]
  113. impl FolderEditor {
  114. pub fn rev_manager(&self) -> Arc<RevisionManager> {
  115. self.rev_manager.clone()
  116. }
  117. }
  118. struct FolderRevisionCompact();
  119. impl RevisionCompact for FolderRevisionCompact {
  120. fn compact_revisions(user_id: &str, object_id: &str, mut revisions: Vec<Revision>) -> FlowyResult<Revision> {
  121. if revisions.is_empty() {
  122. return Err(FlowyError::internal().context("Can't compact the empty folder's revisions"));
  123. }
  124. if revisions.len() == 1 {
  125. return Ok(revisions.pop().unwrap());
  126. }
  127. let first_revision = revisions.first().unwrap();
  128. let last_revision = revisions.last().unwrap();
  129. let (base_rev_id, rev_id) = first_revision.pair_rev_id();
  130. let md5 = last_revision.md5.clone();
  131. let delta = make_delta_from_revisions::<PlainAttributes>(revisions)?;
  132. let delta_data = delta.to_bytes();
  133. Ok(Revision::new(object_id, base_rev_id, rev_id, delta_data, user_id, md5))
  134. }
  135. }