folder_editor.rs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. use crate::manager::FolderId;
  2. use bytes::Bytes;
  3. use flowy_database::ConnectionPool;
  4. use flowy_error::{FlowyError, FlowyResult};
  5. use flowy_http_model::revision::Revision;
  6. use flowy_http_model::ws_data::ServerRevisionWSData;
  7. use flowy_revision::{
  8. RevisionCloudService, RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionObjectSerializer,
  9. RevisionWebSocket,
  10. };
  11. use flowy_sync::client_folder::{FolderChangeset, FolderPad};
  12. use flowy_sync::util::make_operations_from_revisions;
  13. use lib_infra::future::FutureResult;
  14. use lib_ot::core::EmptyAttributes;
  15. use parking_lot::RwLock;
  16. use std::sync::Arc;
  17. pub struct FolderEditor {
  18. #[allow(dead_code)]
  19. user_id: String,
  20. #[allow(dead_code)]
  21. folder_id: FolderId,
  22. pub(crate) folder: Arc<RwLock<FolderPad>>,
  23. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  24. #[cfg(feature = "sync")]
  25. ws_manager: Arc<flowy_revision::RevisionWebSocketManager>,
  26. }
  27. impl FolderEditor {
  28. #[allow(unused_variables)]
  29. pub async fn new(
  30. user_id: &str,
  31. folder_id: &FolderId,
  32. token: &str,
  33. mut rev_manager: RevisionManager<Arc<ConnectionPool>>,
  34. web_socket: Arc<dyn RevisionWebSocket>,
  35. ) -> FlowyResult<Self> {
  36. let cloud = Arc::new(FolderRevisionCloudService {
  37. token: token.to_string(),
  38. });
  39. let folder = Arc::new(RwLock::new(
  40. rev_manager.initialize::<FolderRevisionSerde>(Some(cloud)).await?,
  41. ));
  42. let rev_manager = Arc::new(rev_manager);
  43. #[cfg(feature = "sync")]
  44. let ws_manager = crate::services::web_socket::make_folder_ws_manager(
  45. user_id,
  46. folder_id.as_ref(),
  47. rev_manager.clone(),
  48. web_socket,
  49. folder.clone(),
  50. )
  51. .await;
  52. let user_id = user_id.to_owned();
  53. let folder_id = folder_id.to_owned();
  54. Ok(Self {
  55. user_id,
  56. folder_id,
  57. folder,
  58. rev_manager,
  59. #[cfg(feature = "sync")]
  60. ws_manager,
  61. })
  62. }
  63. #[cfg(feature = "sync")]
  64. pub async fn receive_ws_data(&self, data: ServerRevisionWSData) -> FlowyResult<()> {
  65. let _ = self.ws_manager.ws_passthrough_tx.send(data).await.map_err(|e| {
  66. let err_msg = format!("{} passthrough error: {}", self.folder_id, e);
  67. FlowyError::internal().context(err_msg)
  68. })?;
  69. Ok(())
  70. }
  71. #[cfg(not(feature = "sync"))]
  72. pub async fn receive_ws_data(&self, _data: ServerRevisionWSData) -> FlowyResult<()> {
  73. Ok(())
  74. }
  75. pub(crate) fn apply_change(&self, change: FolderChangeset) -> FlowyResult<()> {
  76. let FolderChangeset { operations: delta, md5 } = change;
  77. let delta_data = delta.json_bytes();
  78. let rev_manager = self.rev_manager.clone();
  79. tokio::spawn(async move {
  80. let _ = rev_manager.add_local_revision(delta_data, md5).await;
  81. });
  82. Ok(())
  83. }
  84. #[allow(dead_code)]
  85. pub fn folder_json(&self) -> FlowyResult<String> {
  86. let json = self.folder.read().to_json()?;
  87. Ok(json)
  88. }
  89. }
  90. struct FolderRevisionSerde();
  91. impl RevisionObjectDeserializer for FolderRevisionSerde {
  92. type Output = FolderPad;
  93. fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  94. let pad = FolderPad::from_revisions(revisions)?;
  95. Ok(pad)
  96. }
  97. }
  98. impl RevisionObjectSerializer for FolderRevisionSerde {
  99. fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  100. let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
  101. Ok(operations.json_bytes())
  102. }
  103. }
  104. pub struct FolderRevisionMergeable();
  105. impl RevisionMergeable for FolderRevisionMergeable {
  106. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  107. FolderRevisionSerde::combine_revisions(revisions)
  108. }
  109. }
  110. struct FolderRevisionCloudService {
  111. #[allow(dead_code)]
  112. token: String,
  113. }
  114. impl RevisionCloudService for FolderRevisionCloudService {
  115. #[tracing::instrument(level = "trace", skip(self))]
  116. fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  117. FutureResult::new(async move { Ok(vec![]) })
  118. }
  119. }
  #[cfg(feature = "flowy_unit_test")]
  impl FolderEditor {
      /// Returns another handle to the editor's revision manager.
      ///
      /// Compiled only with the `flowy_unit_test` feature.
      pub fn rev_manager(&self) -> Arc<RevisionManager<Arc<ConnectionPool>>> {
          Arc::clone(&self.rev_manager)
      }
  }
  125. }