ws_receiver.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. use crate::{
  2. context::FlowyPersistence,
  3. services::{
  4. document::{
  5. persistence::{create_document, read_document, revisions_to_key_value_items, DocumentKVPersistence},
  6. ws_actor::{DocumentWebSocketActor, WSActorMessage},
  7. },
  8. web_socket::{WSClientData, WebSocketReceiver},
  9. },
  10. };
  11. use backend_service::errors::ServerError;
  12. use flowy_collaboration::{
  13. entities::doc::DocumentInfo,
  14. errors::CollaborateError,
  15. protobuf::{
  16. CreateDocParams as CreateDocParamsPB,
  17. DocumentId,
  18. RepeatedRevision as RepeatedRevisionPB,
  19. Revision as RevisionPB,
  20. },
  21. server_document::{DocumentCloudPersistence, ServerDocumentManager},
  22. util::repeated_revision_from_repeated_revision_pb,
  23. };
  24. use lib_infra::future::BoxResultFuture;
  25. use std::{
  26. convert::TryInto,
  27. fmt::{Debug, Formatter},
  28. sync::Arc,
  29. };
  30. use tokio::sync::{mpsc, oneshot};
  31. pub fn make_document_ws_receiver(
  32. persistence: Arc<FlowyPersistence>,
  33. document_manager: Arc<ServerDocumentManager>,
  34. ) -> Arc<DocumentWebSocketReceiver> {
  35. let (ws_sender, rx) = tokio::sync::mpsc::channel(100);
  36. let actor = DocumentWebSocketActor::new(rx, document_manager);
  37. tokio::task::spawn(actor.run());
  38. Arc::new(DocumentWebSocketReceiver::new(persistence, ws_sender))
  39. }
  40. pub struct DocumentWebSocketReceiver {
  41. ws_sender: mpsc::Sender<WSActorMessage>,
  42. persistence: Arc<FlowyPersistence>,
  43. }
  44. impl DocumentWebSocketReceiver {
  45. pub fn new(persistence: Arc<FlowyPersistence>, ws_sender: mpsc::Sender<WSActorMessage>) -> Self {
  46. Self { ws_sender, persistence }
  47. }
  48. }
  49. impl WebSocketReceiver for DocumentWebSocketReceiver {
  50. fn receive(&self, data: WSClientData) {
  51. let (ret, rx) = oneshot::channel();
  52. let sender = self.ws_sender.clone();
  53. let persistence = self.persistence.clone();
  54. actix_rt::spawn(async move {
  55. let msg = WSActorMessage::ClientData {
  56. client_data: data,
  57. persistence,
  58. ret,
  59. };
  60. match sender.send(msg).await {
  61. Ok(_) => {},
  62. Err(e) => log::error!("{}", e),
  63. }
  64. match rx.await {
  65. Ok(_) => {},
  66. Err(e) => log::error!("{:?}", e),
  67. };
  68. });
  69. }
  70. }
  71. pub struct HttpDocumentCloudPersistence(pub Arc<DocumentKVPersistence>);
  72. impl Debug for HttpDocumentCloudPersistence {
  73. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") }
  74. }
  75. impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
  76. fn enable_sync(&self) -> bool { true }
  77. fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
  78. let params = DocumentId {
  79. doc_id: doc_id.to_string(),
  80. ..Default::default()
  81. };
  82. let kv_store = self.0.clone();
  83. Box::pin(async move {
  84. let mut pb_doc = read_document(&kv_store, params)
  85. .await
  86. .map_err(server_error_to_collaborate_error)?;
  87. let doc = (&mut pb_doc)
  88. .try_into()
  89. .map_err(|e| CollaborateError::internal().context(e))?;
  90. Ok(doc)
  91. })
  92. }
  93. fn create_document(
  94. &self,
  95. doc_id: &str,
  96. repeated_revision: RepeatedRevisionPB,
  97. ) -> BoxResultFuture<DocumentInfo, CollaborateError> {
  98. let kv_store = self.0.clone();
  99. let doc_id = doc_id.to_owned();
  100. Box::pin(async move {
  101. let revisions = repeated_revision_from_repeated_revision_pb(repeated_revision.clone())?.into_inner();
  102. let doc = DocumentInfo::from_revisions(&doc_id, revisions)?;
  103. let doc_id = doc_id.to_owned();
  104. let mut params = CreateDocParamsPB::new();
  105. params.set_id(doc_id);
  106. params.set_revisions(repeated_revision);
  107. let _ = create_document(&kv_store, params)
  108. .await
  109. .map_err(server_error_to_collaborate_error)?;
  110. Ok(doc)
  111. })
  112. }
  113. fn read_revisions(
  114. &self,
  115. doc_id: &str,
  116. rev_ids: Option<Vec<i64>>,
  117. ) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
  118. let kv_store = self.0.clone();
  119. let doc_id = doc_id.to_owned();
  120. let f = || async move {
  121. let mut repeated_revision = kv_store.get_revisions(&doc_id, rev_ids).await?;
  122. Ok(repeated_revision.take_items().into())
  123. };
  124. Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
  125. }
  126. fn save_revisions(&self, mut repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
  127. let kv_store = self.0.clone();
  128. let f = || async move {
  129. let revisions = repeated_revision.take_items().into();
  130. let _ = kv_store.set_revision(revisions).await?;
  131. Ok(())
  132. };
  133. Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
  134. }
  135. fn reset_document(
  136. &self,
  137. doc_id: &str,
  138. mut repeated_revision: RepeatedRevisionPB,
  139. ) -> BoxResultFuture<(), CollaborateError> {
  140. let kv_store = self.0.clone();
  141. let doc_id = doc_id.to_owned();
  142. let f = || async move {
  143. kv_store
  144. .transaction(|mut transaction| {
  145. Box::pin(async move {
  146. let _ = transaction.batch_delete_key_start_with(&doc_id).await?;
  147. let items = revisions_to_key_value_items(repeated_revision.take_items().into())?;
  148. let _ = transaction.batch_set(items).await?;
  149. Ok(())
  150. })
  151. })
  152. .await
  153. };
  154. Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
  155. }
  156. }
  157. fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError {
  158. if error.is_record_not_found() {
  159. CollaborateError::record_not_found()
  160. } else {
  161. CollaborateError::internal().context(error)
  162. }
  163. }