ws_receiver.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. use crate::{
  2. context::{DocumentRevisionKV, FlowyPersistence},
  3. services::{
  4. document::{
  5. persistence::{create_document, read_document},
  6. ws_actor::{DocumentWSActorMessage, DocumentWebSocketActor},
  7. },
  8. kv::revision_kv::revisions_to_key_value_items,
  9. web_socket::{WSClientData, WebSocketReceiver},
  10. },
  11. };
  12. use backend_service::errors::ServerError;
  13. use flowy_collaboration::{
  14. entities::document_info::DocumentInfo,
  15. errors::CollaborateError,
  16. protobuf::{
  17. CreateDocParams as CreateDocParamsPB, DocumentId, RepeatedRevision as RepeatedRevisionPB,
  18. Revision as RevisionPB,
  19. },
  20. server_document::{DocumentCloudPersistence, ServerDocumentManager},
  21. util::make_document_info_from_revisions_pb,
  22. };
  23. use lib_infra::future::BoxResultFuture;
  24. use std::{
  25. convert::TryInto,
  26. fmt::{Debug, Formatter},
  27. sync::Arc,
  28. };
  29. use tokio::sync::{mpsc, oneshot};
  30. pub fn make_document_ws_receiver(
  31. persistence: Arc<FlowyPersistence>,
  32. document_manager: Arc<ServerDocumentManager>,
  33. ) -> Arc<DocumentWebSocketReceiver> {
  34. let (actor_msg_sender, rx) = tokio::sync::mpsc::channel(1000);
  35. let actor = DocumentWebSocketActor::new(rx, document_manager);
  36. tokio::task::spawn(actor.run());
  37. Arc::new(DocumentWebSocketReceiver::new(persistence, actor_msg_sender))
  38. }
  39. pub struct DocumentWebSocketReceiver {
  40. actor_msg_sender: mpsc::Sender<DocumentWSActorMessage>,
  41. persistence: Arc<FlowyPersistence>,
  42. }
  43. impl DocumentWebSocketReceiver {
  44. pub fn new(persistence: Arc<FlowyPersistence>, actor_msg_sender: mpsc::Sender<DocumentWSActorMessage>) -> Self {
  45. Self {
  46. actor_msg_sender,
  47. persistence,
  48. }
  49. }
  50. }
  51. impl WebSocketReceiver for DocumentWebSocketReceiver {
  52. fn receive(&self, data: WSClientData) {
  53. let (ret, rx) = oneshot::channel();
  54. let actor_msg_sender = self.actor_msg_sender.clone();
  55. let persistence = self.persistence.clone();
  56. actix_rt::spawn(async move {
  57. let msg = DocumentWSActorMessage::ClientData {
  58. client_data: data,
  59. persistence,
  60. ret,
  61. };
  62. match actor_msg_sender.send(msg).await {
  63. Ok(_) => {}
  64. Err(e) => tracing::error!("[DocumentWebSocketReceiver]: send message to actor failed: {}", e),
  65. }
  66. match rx.await {
  67. Ok(_) => {}
  68. Err(e) => tracing::error!("[DocumentWebSocketReceiver]: message ret failed {:?}", e),
  69. };
  70. });
  71. }
  72. }
  73. pub struct HttpDocumentCloudPersistence(pub Arc<DocumentRevisionKV>);
  74. impl Debug for HttpDocumentCloudPersistence {
  75. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  76. f.write_str("HttpDocumentCloudPersistence")
  77. }
  78. }
  79. impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
  80. fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
  81. let params = DocumentId {
  82. doc_id: doc_id.to_string(),
  83. ..Default::default()
  84. };
  85. let document_store = self.0.clone();
  86. Box::pin(async move {
  87. let mut pb_doc = read_document(&document_store, params)
  88. .await
  89. .map_err(|e| e.to_collaborate_error())?;
  90. let doc = (&mut pb_doc)
  91. .try_into()
  92. .map_err(|e| CollaborateError::internal().context(e))?;
  93. Ok(doc)
  94. })
  95. }
  96. fn create_document(
  97. &self,
  98. doc_id: &str,
  99. repeated_revision: RepeatedRevisionPB,
  100. ) -> BoxResultFuture<Option<DocumentInfo>, CollaborateError> {
  101. let document_store = self.0.clone();
  102. let doc_id = doc_id.to_owned();
  103. Box::pin(async move {
  104. let document_info = make_document_info_from_revisions_pb(&doc_id, repeated_revision.clone())?;
  105. let doc_id = doc_id.to_owned();
  106. let mut params = CreateDocParamsPB::new();
  107. params.set_id(doc_id);
  108. params.set_revisions(repeated_revision);
  109. let _ = create_document(&document_store, params)
  110. .await
  111. .map_err(|e| e.to_collaborate_error())?;
  112. Ok(document_info)
  113. })
  114. }
  115. fn read_document_revisions(
  116. &self,
  117. doc_id: &str,
  118. rev_ids: Option<Vec<i64>>,
  119. ) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
  120. let document_store = self.0.clone();
  121. let doc_id = doc_id.to_owned();
  122. let f = || async move {
  123. let mut repeated_revision = document_store.get_revisions(&doc_id, rev_ids).await?;
  124. Ok::<Vec<RevisionPB>, ServerError>(repeated_revision.take_items().into())
  125. };
  126. Box::pin(async move { f().await.map_err(|e| e.to_collaborate_error()) })
  127. }
  128. fn save_document_revisions(
  129. &self,
  130. mut repeated_revision: RepeatedRevisionPB,
  131. ) -> BoxResultFuture<(), CollaborateError> {
  132. let document_store = self.0.clone();
  133. let f = || async move {
  134. let revisions = repeated_revision.take_items().into();
  135. let _ = document_store.set_revision(revisions).await?;
  136. Ok::<(), ServerError>(())
  137. };
  138. Box::pin(async move { f().await.map_err(|e| e.to_collaborate_error()) })
  139. }
  140. fn reset_document(
  141. &self,
  142. doc_id: &str,
  143. mut repeated_revision: RepeatedRevisionPB,
  144. ) -> BoxResultFuture<(), CollaborateError> {
  145. let document_store = self.0.clone();
  146. let doc_id = doc_id.to_owned();
  147. let f = || async move {
  148. document_store
  149. .transaction(|mut transaction| {
  150. Box::pin(async move {
  151. let _ = transaction.batch_delete_key_start_with(&doc_id).await?;
  152. let items = revisions_to_key_value_items(repeated_revision.take_items().into())?;
  153. let _ = transaction.batch_set(items).await?;
  154. Ok(())
  155. })
  156. })
  157. .await
  158. };
  159. Box::pin(async move { f().await.map_err(|e| e.to_collaborate_error()) })
  160. }
  161. }