controller.rs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. use crate::services::{
  2. document::{
  3. persistence::{create_doc, read_doc, update_doc},
  4. ws_actor::{DocumentWebSocketActor, WSActorMessage},
  5. },
  6. web_socket::{WSClientData, WebSocketReceiver},
  7. };
  8. use backend_service::errors::ServerError;
  9. use crate::context::FlowyPersistence;
  10. use flowy_collaboration::{
  11. core::sync::{DocumentPersistence, ServerDocumentManager},
  12. entities::doc::Doc,
  13. errors::CollaborateError,
  14. protobuf::{CreateDocParams, DocIdentifier, UpdateDocParams},
  15. };
  16. use lib_infra::future::FutureResultSend;
  17. use lib_ot::{revision::Revision, rich_text::RichTextDelta};
  18. use std::{convert::TryInto, sync::Arc};
  19. use tokio::sync::{mpsc, oneshot};
  20. pub fn make_document_ws_receiver(persistence: Arc<FlowyPersistence>) -> Arc<DocumentWebSocketReceiver> {
  21. let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone()));
  22. let document_manager = Arc::new(ServerDocumentManager::new(document_persistence));
  23. let (ws_sender, rx) = tokio::sync::mpsc::channel(100);
  24. let actor = DocumentWebSocketActor::new(rx, document_manager);
  25. tokio::task::spawn(actor.run());
  26. Arc::new(DocumentWebSocketReceiver::new(persistence, ws_sender))
  27. }
  28. pub struct DocumentWebSocketReceiver {
  29. ws_sender: mpsc::Sender<WSActorMessage>,
  30. persistence: Arc<FlowyPersistence>,
  31. }
  32. impl DocumentWebSocketReceiver {
  33. pub fn new(persistence: Arc<FlowyPersistence>, ws_sender: mpsc::Sender<WSActorMessage>) -> Self {
  34. Self { ws_sender, persistence }
  35. }
  36. }
  37. impl WebSocketReceiver for DocumentWebSocketReceiver {
  38. fn receive(&self, data: WSClientData) {
  39. let (ret, rx) = oneshot::channel();
  40. let sender = self.ws_sender.clone();
  41. let pool = self.persistence.pg_pool();
  42. actix_rt::spawn(async move {
  43. let msg = WSActorMessage::ClientData {
  44. client_data: data,
  45. ret,
  46. pool,
  47. };
  48. match sender.send(msg).await {
  49. Ok(_) => {},
  50. Err(e) => log::error!("{}", e),
  51. }
  52. match rx.await {
  53. Ok(_) => {},
  54. Err(e) => log::error!("{:?}", e),
  55. };
  56. });
  57. }
  58. }
  59. struct DocumentPersistenceImpl(Arc<FlowyPersistence>);
  60. impl DocumentPersistence for DocumentPersistenceImpl {
  61. fn update_doc(&self, doc_id: &str, rev_id: i64, delta: RichTextDelta) -> FutureResultSend<(), CollaborateError> {
  62. let pg_pool = self.0.pg_pool();
  63. let mut params = UpdateDocParams::new();
  64. let doc_json = delta.to_json();
  65. params.set_doc_id(doc_id.to_string());
  66. params.set_data(doc_json);
  67. params.set_rev_id(rev_id);
  68. FutureResultSend::new(async move {
  69. let _ = update_doc(&pg_pool, params)
  70. .await
  71. .map_err(server_error_to_collaborate_error)?;
  72. Ok(())
  73. })
  74. }
  75. fn read_doc(&self, doc_id: &str) -> FutureResultSend<Doc, CollaborateError> {
  76. let params = DocIdentifier {
  77. doc_id: doc_id.to_string(),
  78. ..Default::default()
  79. };
  80. let pg_pool = self.0.pg_pool();
  81. FutureResultSend::new(async move {
  82. let mut pb_doc = read_doc(&pg_pool, params)
  83. .await
  84. .map_err(server_error_to_collaborate_error)?;
  85. let doc = (&mut pb_doc)
  86. .try_into()
  87. .map_err(|e| CollaborateError::internal().context(e))?;
  88. Ok(doc)
  89. })
  90. }
  91. fn create_doc(&self, revision: Revision) -> FutureResultSend<Doc, CollaborateError> {
  92. let pg_pool = self.0.pg_pool();
  93. FutureResultSend::new(async move {
  94. let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
  95. let doc_json = delta.to_json();
  96. let params = CreateDocParams {
  97. id: revision.doc_id.clone(),
  98. data: doc_json.clone(),
  99. unknown_fields: Default::default(),
  100. cached_size: Default::default(),
  101. };
  102. let _ = create_doc(&pg_pool, params)
  103. .await
  104. .map_err(server_error_to_collaborate_error)?;
  105. let doc: Doc = revision.try_into()?;
  106. Ok(doc)
  107. })
  108. }
  109. }
  110. fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError {
  111. if error.is_record_not_found() {
  112. CollaborateError::record_not_found()
  113. } else {
  114. CollaborateError::internal().context(error)
  115. }
  116. }