folder_editor.rs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. use crate::manager::FolderId;
  2. use bytes::Bytes;
  3. use flowy_client_sync::client_folder::{FolderChangeset, FolderOperations, FolderPad};
  4. use flowy_client_sync::make_operations_from_revisions;
  5. use flowy_client_sync::util::recover_operation_from_revisions;
  6. use flowy_error::{FlowyError, FlowyResult};
  7. use flowy_revision::{
  8. RevisionCloudService, RevisionManager, RevisionMergeable, RevisionObjectDeserializer,
  9. RevisionObjectSerializer, RevisionWebSocket,
  10. };
  11. use flowy_sqlite::ConnectionPool;
  12. use lib_infra::future::FutureResult;
  13. use lib_ot::core::EmptyAttributes;
  14. use parking_lot::RwLock;
  15. use revision_model::Revision;
  16. use std::sync::Arc;
  17. use ws_model::ws_revision::ServerRevisionWSData;
  18. pub struct FolderEditor {
  19. #[allow(dead_code)]
  20. user_id: String,
  21. #[allow(dead_code)]
  22. folder_id: FolderId,
  23. pub(crate) folder: Arc<RwLock<FolderPad>>,
  24. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  25. #[cfg(feature = "sync")]
  26. ws_manager: Arc<flowy_revision::RevisionWebSocketManager>,
  27. }
  28. impl FolderEditor {
  29. #[allow(unused_variables)]
  30. pub async fn new(
  31. user_id: &str,
  32. folder_id: &FolderId,
  33. token: &str,
  34. mut rev_manager: RevisionManager<Arc<ConnectionPool>>,
  35. web_socket: Arc<dyn RevisionWebSocket>,
  36. ) -> FlowyResult<Self> {
  37. let cloud = Arc::new(FolderRevisionCloudService {
  38. token: token.to_string(),
  39. });
  40. let folder = Arc::new(RwLock::new(
  41. rev_manager
  42. .initialize::<FolderRevisionSerde>(Some(cloud))
  43. .await?,
  44. ));
  45. let rev_manager = Arc::new(rev_manager);
  46. #[cfg(feature = "sync")]
  47. let ws_manager = crate::services::web_socket::make_folder_ws_manager(
  48. user_id,
  49. folder_id.as_ref(),
  50. rev_manager.clone(),
  51. web_socket,
  52. folder.clone(),
  53. )
  54. .await;
  55. let user_id = user_id.to_owned();
  56. let folder_id = folder_id.to_owned();
  57. Ok(Self {
  58. user_id,
  59. folder_id,
  60. folder,
  61. rev_manager,
  62. #[cfg(feature = "sync")]
  63. ws_manager,
  64. })
  65. }
  66. #[cfg(feature = "sync")]
  67. pub async fn receive_ws_data(&self, data: ServerRevisionWSData) -> FlowyResult<()> {
  68. let _ = self
  69. .ws_manager
  70. .ws_passthrough_tx
  71. .send(data)
  72. .await
  73. .map_err(|e| {
  74. let err_msg = format!("{} passthrough error: {}", self.folder_id, e);
  75. FlowyError::internal().context(err_msg)
  76. })?;
  77. Ok(())
  78. }
  79. #[cfg(not(feature = "sync"))]
  80. pub async fn receive_ws_data(&self, _data: ServerRevisionWSData) -> FlowyResult<()> {
  81. Ok(())
  82. }
  83. pub(crate) fn apply_change(&self, change: FolderChangeset) -> FlowyResult<()> {
  84. let FolderChangeset {
  85. operations: delta,
  86. md5,
  87. } = change;
  88. let delta_data = delta.json_bytes();
  89. let rev_manager = self.rev_manager.clone();
  90. tokio::spawn(async move {
  91. let _ = rev_manager.add_local_revision(delta_data, md5).await;
  92. });
  93. Ok(())
  94. }
  95. #[allow(dead_code)]
  96. pub fn folder_json(&self) -> FlowyResult<String> {
  97. let json = self.folder.read().to_json()?;
  98. Ok(json)
  99. }
  100. }
  101. struct FolderRevisionSerde();
  102. impl RevisionObjectDeserializer for FolderRevisionSerde {
  103. type Output = FolderPad;
  104. fn deserialize_revisions(
  105. _object_id: &str,
  106. revisions: Vec<Revision>,
  107. ) -> FlowyResult<Self::Output> {
  108. let operations: FolderOperations = make_operations_from_revisions(revisions)?;
  109. Ok(FolderPad::from_operations(operations)?)
  110. }
  111. fn recover_from_revisions(revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
  112. if let Some((operations, rev_id)) = recover_operation_from_revisions(revisions, |operations| {
  113. FolderPad::from_operations(operations.clone()).is_ok()
  114. }) {
  115. if let Ok(pad) = FolderPad::from_operations(operations) {
  116. return Some((pad, rev_id));
  117. }
  118. }
  119. None
  120. }
  121. }
  122. impl RevisionObjectSerializer for FolderRevisionSerde {
  123. fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  124. let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
  125. Ok(operations.json_bytes())
  126. }
  127. }
  128. pub struct FolderRevisionMergeable();
  129. impl RevisionMergeable for FolderRevisionMergeable {
  130. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  131. FolderRevisionSerde::combine_revisions(revisions)
  132. }
  133. }
  134. struct FolderRevisionCloudService {
  135. #[allow(dead_code)]
  136. token: String,
  137. }
  138. impl RevisionCloudService for FolderRevisionCloudService {
  139. #[tracing::instrument(level = "trace", skip(self))]
  140. fn fetch_object(
  141. &self,
  142. _user_id: &str,
  143. _object_id: &str,
  144. ) -> FutureResult<Vec<Revision>, FlowyError> {
  145. FutureResult::new(async move { Ok(vec![]) })
  146. }
  147. }
  148. #[cfg(feature = "flowy_unit_test")]
  149. impl FolderEditor {
  150. pub fn rev_manager(&self) -> Arc<RevisionManager<Arc<ConnectionPool>>> {
  151. self.rev_manager.clone()
  152. }
  153. }