ws_receiver.rs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. use crate::{
  2. context::FlowyPersistence,
  3. services::{
  4. folder::ws_actor::{FolderWSActorMessage, FolderWebSocketActor},
  5. web_socket::{WSClientData, WebSocketReceiver},
  6. },
  7. };
  8. use std::fmt::{Debug, Formatter};
  9. use crate::{context::FolderRevisionKV, services::kv::revision_kv::revisions_to_key_value_items};
  10. use flowy_collaboration::{
  11. entities::folder_info::FolderInfo,
  12. errors::CollaborateError,
  13. protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
  14. server_folder::{FolderCloudPersistence, ServerFolderManager},
  15. util::make_folder_from_revisions_pb,
  16. };
  17. use lib_infra::future::BoxResultFuture;
  18. use std::sync::Arc;
  19. use tokio::sync::{mpsc, oneshot};
  20. pub fn make_folder_ws_receiver(
  21. persistence: Arc<FlowyPersistence>,
  22. folder_manager: Arc<ServerFolderManager>,
  23. ) -> Arc<FolderWebSocketReceiver> {
  24. let (actor_msg_sender, rx) = tokio::sync::mpsc::channel(1000);
  25. let actor = FolderWebSocketActor::new(rx, folder_manager);
  26. tokio::task::spawn(actor.run());
  27. Arc::new(FolderWebSocketReceiver::new(persistence, actor_msg_sender))
  28. }
  29. pub struct FolderWebSocketReceiver {
  30. actor_msg_sender: mpsc::Sender<FolderWSActorMessage>,
  31. persistence: Arc<FlowyPersistence>,
  32. }
  33. impl FolderWebSocketReceiver {
  34. pub fn new(persistence: Arc<FlowyPersistence>, actor_msg_sender: mpsc::Sender<FolderWSActorMessage>) -> Self {
  35. Self {
  36. actor_msg_sender,
  37. persistence,
  38. }
  39. }
  40. }
  41. impl WebSocketReceiver for FolderWebSocketReceiver {
  42. fn receive(&self, data: WSClientData) {
  43. let (ret, rx) = oneshot::channel();
  44. let actor_msg_sender = self.actor_msg_sender.clone();
  45. let persistence = self.persistence.clone();
  46. actix_rt::spawn(async move {
  47. let msg = FolderWSActorMessage::ClientData {
  48. client_data: data,
  49. persistence,
  50. ret,
  51. };
  52. match actor_msg_sender.send(msg).await {
  53. Ok(_) => {}
  54. Err(e) => {
  55. log::error!("[FolderWebSocketReceiver]: send message to actor failed: {}", e);
  56. }
  57. }
  58. match rx.await {
  59. Ok(_) => {}
  60. Err(e) => log::error!("[FolderWebSocketReceiver]: message ret failed {:?}", e),
  61. };
  62. });
  63. }
  64. }
  65. pub struct HttpFolderCloudPersistence(pub Arc<FolderRevisionKV>);
  66. impl Debug for HttpFolderCloudPersistence {
  67. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  68. f.write_str("HttpFolderCloudPersistence")
  69. }
  70. }
  71. impl FolderCloudPersistence for HttpFolderCloudPersistence {
  72. fn read_folder(&self, _user_id: &str, folder_id: &str) -> BoxResultFuture<FolderInfo, CollaborateError> {
  73. let folder_store = self.0.clone();
  74. let folder_id = folder_id.to_owned();
  75. Box::pin(async move {
  76. let revisions = folder_store
  77. .get_revisions(&folder_id, None)
  78. .await
  79. .map_err(|e| e.to_collaborate_error())?;
  80. match make_folder_from_revisions_pb(&folder_id, revisions)? {
  81. Some(folder_info) => Ok(folder_info),
  82. None => Err(CollaborateError::record_not_found().context(format!("{} not exist", folder_id))),
  83. }
  84. })
  85. }
  86. fn create_folder(
  87. &self,
  88. _user_id: &str,
  89. folder_id: &str,
  90. mut repeated_revision: RepeatedRevisionPB,
  91. ) -> BoxResultFuture<Option<FolderInfo>, CollaborateError> {
  92. let folder_store = self.0.clone();
  93. let folder_id = folder_id.to_owned();
  94. Box::pin(async move {
  95. let folder_info = make_folder_from_revisions_pb(&folder_id, repeated_revision.clone())?;
  96. let revisions: Vec<RevisionPB> = repeated_revision.take_items().into();
  97. let _ = folder_store
  98. .set_revision(revisions)
  99. .await
  100. .map_err(|e| e.to_collaborate_error())?;
  101. Ok(folder_info)
  102. })
  103. }
  104. fn save_folder_revisions(
  105. &self,
  106. mut repeated_revision: RepeatedRevisionPB,
  107. ) -> BoxResultFuture<(), CollaborateError> {
  108. let folder_store = self.0.clone();
  109. Box::pin(async move {
  110. let revisions: Vec<RevisionPB> = repeated_revision.take_items().into();
  111. let _ = folder_store
  112. .set_revision(revisions)
  113. .await
  114. .map_err(|e| e.to_collaborate_error())?;
  115. Ok(())
  116. })
  117. }
  118. fn read_folder_revisions(
  119. &self,
  120. folder_id: &str,
  121. rev_ids: Option<Vec<i64>>,
  122. ) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
  123. let folder_store = self.0.clone();
  124. let folder_id = folder_id.to_owned();
  125. Box::pin(async move {
  126. let mut repeated_revision = folder_store
  127. .get_revisions(&folder_id, rev_ids)
  128. .await
  129. .map_err(|e| e.to_collaborate_error())?;
  130. let revisions: Vec<RevisionPB> = repeated_revision.take_items().into();
  131. Ok(revisions)
  132. })
  133. }
  134. fn reset_folder(
  135. &self,
  136. folder_id: &str,
  137. mut repeated_revision: RepeatedRevisionPB,
  138. ) -> BoxResultFuture<(), CollaborateError> {
  139. let folder_store = self.0.clone();
  140. let folder_id = folder_id.to_owned();
  141. Box::pin(async move {
  142. let _ = folder_store
  143. .transaction(|mut transaction| {
  144. Box::pin(async move {
  145. let _ = transaction.batch_delete_key_start_with(&folder_id).await?;
  146. let items = revisions_to_key_value_items(repeated_revision.take_items().into())?;
  147. let _ = transaction.batch_set(items).await?;
  148. Ok(())
  149. })
  150. })
  151. .await
  152. .map_err(|e| e.to_collaborate_error())?;
  153. Ok(())
  154. })
  155. }
  156. }