manager.rs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. use crate::{
  2. services::doc::{
  3. read_doc,
  4. update_doc,
  5. ws_actor::{DocWsActor, DocWsMsg},
  6. },
  7. web_socket::{WsBizHandler, WsClientData},
  8. };
  9. use actix_web::web::Data;
  10. use crate::services::doc::create_doc;
  11. use backend_service::errors::ServerError;
  12. use flowy_collaboration::{
  13. core::sync::{ServerDocManager, ServerDocPersistence},
  14. entities::doc::Doc,
  15. errors::CollaborateError,
  16. protobuf::{CreateDocParams, DocIdentifier, UpdateDocParams},
  17. };
  18. use lib_infra::future::FutureResultSend;
  19. use lib_ot::{revision::Revision, rich_text::RichTextDelta};
  20. use sqlx::PgPool;
  21. use std::{convert::TryInto, sync::Arc};
  22. use tokio::sync::{mpsc, oneshot};
  23. pub struct DocumentCore {
  24. pub manager: Arc<ServerDocManager>,
  25. ws_sender: mpsc::Sender<DocWsMsg>,
  26. pg_pool: Data<PgPool>,
  27. }
  28. impl DocumentCore {
  29. pub fn new(pg_pool: Data<PgPool>) -> Self {
  30. let manager = Arc::new(ServerDocManager::new(Arc::new(DocPersistenceImpl(pg_pool.clone()))));
  31. let (ws_sender, rx) = mpsc::channel(100);
  32. let actor = DocWsActor::new(rx, manager.clone());
  33. tokio::task::spawn(actor.run());
  34. Self {
  35. manager,
  36. ws_sender,
  37. pg_pool,
  38. }
  39. }
  40. }
  41. impl WsBizHandler for DocumentCore {
  42. fn receive(&self, data: WsClientData) {
  43. let (ret, rx) = oneshot::channel();
  44. let sender = self.ws_sender.clone();
  45. let pool = self.pg_pool.clone();
  46. actix_rt::spawn(async move {
  47. let msg = DocWsMsg::ClientData {
  48. client_data: data,
  49. ret,
  50. pool,
  51. };
  52. match sender.send(msg).await {
  53. Ok(_) => {},
  54. Err(e) => log::error!("{}", e),
  55. }
  56. match rx.await {
  57. Ok(_) => {},
  58. Err(e) => log::error!("{:?}", e),
  59. };
  60. });
  61. }
  62. }
  63. struct DocPersistenceImpl(Data<PgPool>);
  64. impl ServerDocPersistence for DocPersistenceImpl {
  65. fn update_doc(&self, doc_id: &str, rev_id: i64, delta: RichTextDelta) -> FutureResultSend<(), CollaborateError> {
  66. let pg_pool = self.0.clone();
  67. let mut params = UpdateDocParams::new();
  68. let doc_json = delta.to_json();
  69. params.set_doc_id(doc_id.to_string());
  70. params.set_data(doc_json);
  71. params.set_rev_id(rev_id);
  72. FutureResultSend::new(async move {
  73. let _ = update_doc(pg_pool.get_ref(), params)
  74. .await
  75. .map_err(server_error_to_collaborate_error)?;
  76. Ok(())
  77. })
  78. }
  79. fn read_doc(&self, doc_id: &str) -> FutureResultSend<Doc, CollaborateError> {
  80. let params = DocIdentifier {
  81. doc_id: doc_id.to_string(),
  82. ..Default::default()
  83. };
  84. let pg_pool = self.0.clone();
  85. FutureResultSend::new(async move {
  86. let mut pb_doc = read_doc(pg_pool.get_ref(), params)
  87. .await
  88. .map_err(server_error_to_collaborate_error)?;
  89. let doc = (&mut pb_doc)
  90. .try_into()
  91. .map_err(|e| CollaborateError::internal().context(e))?;
  92. Ok(doc)
  93. })
  94. }
  95. fn create_doc(&self, revision: Revision) -> FutureResultSend<Doc, CollaborateError> {
  96. let pg_pool = self.0.clone();
  97. FutureResultSend::new(async move {
  98. let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
  99. let doc_json = delta.to_json();
  100. let params = CreateDocParams {
  101. id: revision.doc_id.clone(),
  102. data: doc_json.clone(),
  103. unknown_fields: Default::default(),
  104. cached_size: Default::default(),
  105. };
  106. let _ = create_doc(pg_pool.get_ref(), params)
  107. .await
  108. .map_err(server_error_to_collaborate_error)?;
  109. let doc: Doc = revision.try_into()?;
  110. Ok(doc)
  111. })
  112. }
  113. }
  114. fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError {
  115. if error.is_record_not_found() {
  116. CollaborateError::record_not_found()
  117. } else {
  118. CollaborateError::internal().context(error)
  119. }
  120. }