folder_editor.rs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. use crate::manager::FolderId;
  2. use bytes::Bytes;
  3. use flowy_error::{FlowyError, FlowyResult};
  4. use flowy_revision::{
  5. RevisionCloudService, RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionObjectSerializer,
  6. RevisionWebSocket,
  7. };
  8. use flowy_sync::util::make_operations_from_revisions;
  9. use flowy_sync::{
  10. client_folder::{FolderChangeset, FolderPad},
  11. entities::{revision::Revision, ws_data::ServerRevisionWSData},
  12. };
  13. use lib_infra::future::FutureResult;
  14. use flowy_database::ConnectionPool;
  15. use lib_ot::core::EmptyAttributes;
  16. use parking_lot::RwLock;
  17. use std::sync::Arc;
  18. pub struct FolderEditor {
  19. #[allow(dead_code)]
  20. user_id: String,
  21. pub(crate) folder: Arc<RwLock<FolderPad>>,
  22. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  23. #[cfg(feature = "sync")]
  24. ws_manager: Arc<flowy_revision::RevisionWebSocketManager>,
  25. }
  26. impl FolderEditor {
  27. #[allow(unused_variables)]
  28. pub async fn new(
  29. user_id: &str,
  30. folder_id: &FolderId,
  31. token: &str,
  32. mut rev_manager: RevisionManager<Arc<ConnectionPool>>,
  33. web_socket: Arc<dyn RevisionWebSocket>,
  34. ) -> FlowyResult<Self> {
  35. let cloud = Arc::new(FolderRevisionCloudService {
  36. token: token.to_string(),
  37. });
  38. let folder = Arc::new(RwLock::new(rev_manager.load::<FolderRevisionSerde>(Some(cloud)).await?));
  39. let rev_manager = Arc::new(rev_manager);
  40. #[cfg(feature = "sync")]
  41. let ws_manager = crate::services::web_socket::make_folder_ws_manager(
  42. user_id,
  43. folder_id.as_ref(),
  44. rev_manager.clone(),
  45. web_socket,
  46. folder.clone(),
  47. )
  48. .await;
  49. let user_id = user_id.to_owned();
  50. let folder_id = folder_id.to_owned();
  51. Ok(Self {
  52. user_id,
  53. folder,
  54. rev_manager,
  55. #[cfg(feature = "sync")]
  56. ws_manager,
  57. })
  58. }
  59. #[cfg(feature = "sync")]
  60. pub async fn receive_ws_data(&self, data: ServerRevisionWSData) -> FlowyResult<()> {
  61. let _ = self.ws_manager.ws_passthrough_tx.send(data).await.map_err(|e| {
  62. let err_msg = format!("{} passthrough error: {}", self.folder_id, e);
  63. FlowyError::internal().context(err_msg)
  64. })?;
  65. Ok(())
  66. }
  67. #[cfg(not(feature = "sync"))]
  68. pub async fn receive_ws_data(&self, _data: ServerRevisionWSData) -> FlowyResult<()> {
  69. Ok(())
  70. }
  71. pub(crate) fn apply_change(&self, change: FolderChangeset) -> FlowyResult<()> {
  72. let FolderChangeset { operations: delta, md5 } = change;
  73. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  74. let delta_data = delta.json_bytes();
  75. let revision = Revision::new(&self.rev_manager.object_id, base_rev_id, rev_id, delta_data, md5);
  76. let _ = futures::executor::block_on(async { self.rev_manager.add_local_revision(&revision).await })?;
  77. Ok(())
  78. }
  79. #[allow(dead_code)]
  80. pub fn folder_json(&self) -> FlowyResult<String> {
  81. let json = self.folder.read().to_json()?;
  82. Ok(json)
  83. }
  84. }
  85. struct FolderRevisionSerde();
  86. impl RevisionObjectDeserializer for FolderRevisionSerde {
  87. type Output = FolderPad;
  88. fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  89. let pad = FolderPad::from_revisions(revisions)?;
  90. Ok(pad)
  91. }
  92. }
  93. impl RevisionObjectSerializer for FolderRevisionSerde {
  94. fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  95. let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
  96. Ok(operations.json_bytes())
  97. }
  98. }
  99. pub struct FolderRevisionCompress();
  100. impl RevisionMergeable for FolderRevisionCompress {
  101. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  102. FolderRevisionSerde::combine_revisions(revisions)
  103. }
  104. }
  105. struct FolderRevisionCloudService {
  106. #[allow(dead_code)]
  107. token: String,
  108. }
  109. impl RevisionCloudService for FolderRevisionCloudService {
  110. #[tracing::instrument(level = "trace", skip(self))]
  111. fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  112. FutureResult::new(async move { Ok(vec![]) })
  113. }
  114. }
  115. #[cfg(feature = "flowy_unit_test")]
  116. impl FolderEditor {
  117. pub fn rev_manager(&self) -> Arc<RevisionManager<Arc<ConnectionPool>>> {
  118. self.rev_manager.clone()
  119. }
  120. }