ws_receiver.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. use crate::services::{
  2. document::{
  3. persistence::{create_document, read_document},
  4. ws_actor::{DocumentWebSocketActor, WSActorMessage},
  5. },
  6. web_socket::{WSClientData, WebSocketReceiver},
  7. };
  8. use crate::context::FlowyPersistence;
  9. use backend_service::errors::ServerError;
  10. use flowy_collaboration::{
  11. entities::{
  12. doc::{CreateDocParams, DocumentInfo},
  13. revision::{RepeatedRevision, Revision},
  14. },
  15. errors::CollaborateError,
  16. protobuf::DocIdentifier,
  17. };
  18. use lib_infra::future::BoxResultFuture;
  19. use flowy_collaboration::sync::{DocumentPersistence, ServerDocumentManager};
  20. use std::{
  21. convert::TryInto,
  22. fmt::{Debug, Formatter},
  23. sync::Arc,
  24. };
  25. use tokio::sync::{mpsc, oneshot};
  26. pub fn make_document_ws_receiver(persistence: Arc<FlowyPersistence>) -> Arc<DocumentWebSocketReceiver> {
  27. let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone()));
  28. let document_manager = Arc::new(ServerDocumentManager::new(document_persistence));
  29. let (ws_sender, rx) = tokio::sync::mpsc::channel(100);
  30. let actor = DocumentWebSocketActor::new(rx, document_manager);
  31. tokio::task::spawn(actor.run());
  32. Arc::new(DocumentWebSocketReceiver::new(persistence, ws_sender))
  33. }
  34. pub struct DocumentWebSocketReceiver {
  35. ws_sender: mpsc::Sender<WSActorMessage>,
  36. persistence: Arc<FlowyPersistence>,
  37. }
  38. impl DocumentWebSocketReceiver {
  39. pub fn new(persistence: Arc<FlowyPersistence>, ws_sender: mpsc::Sender<WSActorMessage>) -> Self {
  40. Self { ws_sender, persistence }
  41. }
  42. }
  43. impl WebSocketReceiver for DocumentWebSocketReceiver {
  44. fn receive(&self, data: WSClientData) {
  45. let (ret, rx) = oneshot::channel();
  46. let sender = self.ws_sender.clone();
  47. let persistence = self.persistence.clone();
  48. actix_rt::spawn(async move {
  49. let msg = WSActorMessage::ClientData {
  50. client_data: data,
  51. ret,
  52. persistence,
  53. };
  54. match sender.send(msg).await {
  55. Ok(_) => {},
  56. Err(e) => log::error!("{}", e),
  57. }
  58. match rx.await {
  59. Ok(_) => {},
  60. Err(e) => log::error!("{:?}", e),
  61. };
  62. });
  63. }
  64. }
  65. struct DocumentPersistenceImpl(Arc<FlowyPersistence>);
  66. impl Debug for DocumentPersistenceImpl {
  67. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") }
  68. }
  69. impl DocumentPersistence for DocumentPersistenceImpl {
  70. fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
  71. let params = DocIdentifier {
  72. doc_id: doc_id.to_string(),
  73. ..Default::default()
  74. };
  75. let persistence = self.0.kv_store();
  76. Box::pin(async move {
  77. let mut pb_doc = read_document(&persistence, params)
  78. .await
  79. .map_err(server_error_to_collaborate_error)?;
  80. let doc = (&mut pb_doc)
  81. .try_into()
  82. .map_err(|e| CollaborateError::internal().context(e))?;
  83. Ok(doc)
  84. })
  85. }
  86. fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<DocumentInfo, CollaborateError> {
  87. let kv_store = self.0.kv_store();
  88. let doc_id = doc_id.to_owned();
  89. Box::pin(async move {
  90. let doc = DocumentInfo::from_revisions(&doc_id, revisions.clone())?;
  91. let doc_id = doc_id.to_owned();
  92. let revisions = RepeatedRevision::new(revisions);
  93. let params = CreateDocParams { id: doc_id, revisions };
  94. let pb_params: flowy_collaboration::protobuf::CreateDocParams = params.try_into().unwrap();
  95. let _ = create_document(&kv_store, pb_params)
  96. .await
  97. .map_err(server_error_to_collaborate_error)?;
  98. Ok(doc)
  99. })
  100. }
  101. fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
  102. let kv_store = self.0.kv_store();
  103. let doc_id = doc_id.to_owned();
  104. let f = || async move {
  105. let mut pb = kv_store.batch_get_revisions(&doc_id, rev_ids).await?;
  106. let repeated_revision: RepeatedRevision = (&mut pb).try_into()?;
  107. let revisions = repeated_revision.into_inner();
  108. Ok(revisions)
  109. };
  110. Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
  111. }
  112. fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
  113. let kv_store = self.0.kv_store();
  114. let doc_id = doc_id.to_owned();
  115. let f = || async move {
  116. let mut pb = kv_store.get_doc_revisions(&doc_id).await?;
  117. let repeated_revision: RepeatedRevision = (&mut pb).try_into()?;
  118. let revisions = repeated_revision.into_inner();
  119. Ok(revisions)
  120. };
  121. Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
  122. }
  123. }
  124. fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError {
  125. if error.is_record_not_found() {
  126. CollaborateError::record_not_found()
  127. } else {
  128. CollaborateError::internal().context(error)
  129. }
  130. }