ws_actor.rs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. use crate::{
  2. services::{
  3. doc::editor::ServerDocUser,
  4. util::{md5, parse_from_bytes},
  5. },
  6. web_socket::{entities::Socket, WsClientData, WsUser},
  7. };
  8. use actix_rt::task::spawn_blocking;
  9. use actix_web::web::Data;
  10. use async_stream::stream;
  11. use backend_service::errors::{internal_error, Result, ServerError};
  12. use flowy_collaboration::{
  13. core::sync::ServerDocManager,
  14. protobuf::{DocumentWSData, DocumentWSDataType},
  15. };
  16. use futures::stream::StreamExt;
  17. use lib_ot::protobuf::Revision;
  18. use sqlx::PgPool;
  19. use std::{convert::TryInto, sync::Arc};
  20. use tokio::sync::{mpsc, oneshot};
  21. pub enum DocWsMsg {
  22. ClientData {
  23. client_data: WsClientData,
  24. pool: Data<PgPool>,
  25. ret: oneshot::Sender<Result<()>>,
  26. },
  27. }
  28. pub struct DocWsActor {
  29. receiver: Option<mpsc::Receiver<DocWsMsg>>,
  30. doc_manager: Arc<ServerDocManager>,
  31. }
  32. impl DocWsActor {
  33. pub fn new(receiver: mpsc::Receiver<DocWsMsg>, manager: Arc<ServerDocManager>) -> Self {
  34. Self {
  35. receiver: Some(receiver),
  36. doc_manager: manager,
  37. }
  38. }
  39. pub async fn run(mut self) {
  40. let mut receiver = self
  41. .receiver
  42. .take()
  43. .expect("DocActor's receiver should only take one time");
  44. let stream = stream! {
  45. loop {
  46. match receiver.recv().await {
  47. Some(msg) => yield msg,
  48. None => break,
  49. }
  50. }
  51. };
  52. stream.for_each(|msg| self.handle_message(msg)).await;
  53. }
  54. async fn handle_message(&self, msg: DocWsMsg) {
  55. match msg {
  56. DocWsMsg::ClientData { client_data, pool, ret } => {
  57. let _ = ret.send(self.handle_client_data(client_data, pool).await);
  58. },
  59. }
  60. }
  61. async fn handle_client_data(&self, client_data: WsClientData, pool: Data<PgPool>) -> Result<()> {
  62. let WsClientData { user, socket, data } = client_data;
  63. let document_data = spawn_blocking(move || {
  64. let document_data: DocumentWSData = parse_from_bytes(&data)?;
  65. Result::Ok(document_data)
  66. })
  67. .await
  68. .map_err(internal_error)??;
  69. let data = document_data.data;
  70. match document_data.ty {
  71. DocumentWSDataType::Acked => Ok(()),
  72. DocumentWSDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await,
  73. DocumentWSDataType::PullRev => Ok(()),
  74. DocumentWSDataType::UserConnect => Ok(()),
  75. }
  76. }
  77. async fn apply_pushed_rev(
  78. &self,
  79. user: Arc<WsUser>,
  80. socket: Socket,
  81. data: Vec<u8>,
  82. pg_pool: Data<PgPool>,
  83. ) -> Result<()> {
  84. let mut revision_pb = spawn_blocking(move || {
  85. let revision: Revision = parse_from_bytes(&data)?;
  86. let _ = verify_md5(&revision)?;
  87. Result::Ok(revision)
  88. })
  89. .await
  90. .map_err(internal_error)??;
  91. let revision: lib_ot::revision::Revision = (&mut revision_pb).try_into().map_err(internal_error)?;
  92. // Create the doc if it doesn't exist
  93. let handler = match self.doc_manager.get(&revision.doc_id).await {
  94. None => self
  95. .doc_manager
  96. .create_doc(revision.clone())
  97. .await
  98. .map_err(internal_error)?,
  99. Some(handler) => handler,
  100. };
  101. let user = Arc::new(ServerDocUser { user, socket, pg_pool });
  102. handler.apply_revision(user, revision).await.map_err(internal_error)?;
  103. Ok(())
  104. }
  105. }
  106. fn verify_md5(revision: &Revision) -> Result<()> {
  107. if md5(&revision.delta_data) != revision.md5 {
  108. return Err(ServerError::internal().context("Revision md5 not match"));
  109. }
  110. Ok(())
  111. }