// ws_actor.rs (3.3 KB)
  1. use crate::service::{
  2. doc::doc::DocManager,
  3. util::{md5, parse_from_bytes},
  4. ws::{entities::Socket, WsClientData, WsUser},
  5. };
  6. use actix_rt::task::spawn_blocking;
  7. use actix_web::web::Data;
  8. use async_stream::stream;
  9. use flowy_document::protobuf::{Revision, WsDataType, WsDocumentData};
  10. use flowy_net::errors::{internal_error, Result as DocResult, ServerError};
  11. use futures::stream::StreamExt;
  12. use sqlx::PgPool;
  13. use std::sync::Arc;
  14. use tokio::sync::{mpsc, oneshot};
  15. pub enum DocWsMsg {
  16. ClientData {
  17. client_data: WsClientData,
  18. pool: Data<PgPool>,
  19. ret: oneshot::Sender<DocResult<()>>,
  20. },
  21. }
  22. pub struct DocWsActor {
  23. receiver: Option<mpsc::Receiver<DocWsMsg>>,
  24. doc_manager: Arc<DocManager>,
  25. }
  26. impl DocWsActor {
  27. pub fn new(receiver: mpsc::Receiver<DocWsMsg>, manager: Arc<DocManager>) -> Self {
  28. Self {
  29. receiver: Some(receiver),
  30. doc_manager: manager,
  31. }
  32. }
  33. pub async fn run(mut self) {
  34. let mut receiver = self
  35. .receiver
  36. .take()
  37. .expect("DocActor's receiver should only take one time");
  38. let stream = stream! {
  39. loop {
  40. match receiver.recv().await {
  41. Some(msg) => yield msg,
  42. None => break,
  43. }
  44. }
  45. };
  46. stream.for_each(|msg| self.handle_message(msg)).await;
  47. }
  48. async fn handle_message(&self, msg: DocWsMsg) {
  49. match msg {
  50. DocWsMsg::ClientData { client_data, pool, ret } => {
  51. let _ = ret.send(self.handle_client_data(client_data, pool).await);
  52. },
  53. }
  54. }
  55. async fn handle_client_data(&self, client_data: WsClientData, pool: Data<PgPool>) -> DocResult<()> {
  56. let WsClientData { user, socket, data } = client_data;
  57. let document_data = spawn_blocking(move || {
  58. let document_data: WsDocumentData = parse_from_bytes(&data)?;
  59. DocResult::Ok(document_data)
  60. })
  61. .await
  62. .map_err(internal_error)??;
  63. match document_data.ty {
  64. WsDataType::Acked => Ok(()),
  65. WsDataType::PushRev => self.handle_push_rev(user, socket, document_data.data, pool).await,
  66. WsDataType::PullRev => Ok(()),
  67. WsDataType::Conflict => Ok(()),
  68. }
  69. }
  70. async fn handle_push_rev(
  71. &self,
  72. user: Arc<WsUser>,
  73. socket: Socket,
  74. revision_data: Vec<u8>,
  75. pool: Data<PgPool>,
  76. ) -> DocResult<()> {
  77. let revision = spawn_blocking(move || {
  78. let revision: Revision = parse_from_bytes(&revision_data)?;
  79. let _ = verify_md5(&revision)?;
  80. DocResult::Ok(revision)
  81. })
  82. .await
  83. .map_err(internal_error)??;
  84. match self.doc_manager.get(&revision.doc_id, pool).await? {
  85. Some(edit_doc) => {
  86. edit_doc.apply_revision(user, socket, revision).await?;
  87. Ok(())
  88. },
  89. None => {
  90. log::error!("Document with id: {} not exist", &revision.doc_id);
  91. Ok(())
  92. },
  93. }
  94. }
  95. }
  96. fn verify_md5(revision: &Revision) -> DocResult<()> {
  97. if md5(&revision.delta_data) != revision.md5 {
  98. return Err(ServerError::internal().context("Revision md5 not match"));
  99. }
  100. Ok(())
  101. }