ws_handler.rs 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. use super::edit_doc::EditDoc;
  2. use crate::service::{doc::read_doc, util::parse_from_bytes, ws::WsBizHandler};
  3. use actix_web::web::Data;
  4. use bytes::Bytes;
  5. use flowy_document::{
  6. protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData},
  7. services::doc::Document,
  8. };
  9. use flowy_net::errors::ServerError;
  10. use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
  11. use protobuf::Message;
  12. use sqlx::PgPool;
  13. use std::{collections::HashMap, sync::Arc};
  14. pub struct DocWsBizHandler {
  15. inner: Arc<Inner>,
  16. }
  17. impl DocWsBizHandler {
  18. pub fn new(pg_pool: Data<PgPool>) -> Self {
  19. Self {
  20. inner: Arc::new(Inner::new(pg_pool)),
  21. }
  22. }
  23. }
  24. impl WsBizHandler for DocWsBizHandler {
  25. fn receive_data(&self, data: Bytes) {
  26. let inner = self.inner.clone();
  27. actix_rt::spawn(async move {
  28. let result = inner.handle(data).await;
  29. match result {
  30. Ok(_) => {},
  31. Err(e) => log::error!("WsBizHandler handle data error: {:?}", e),
  32. }
  33. });
  34. }
  35. }
  36. struct Inner {
  37. pg_pool: Data<PgPool>,
  38. edit_docs: RwLock<HashMap<String, Arc<EditDoc>>>,
  39. }
  40. impl Inner {
  41. fn new(pg_pool: Data<PgPool>) -> Self {
  42. Self {
  43. pg_pool,
  44. edit_docs: RwLock::new(HashMap::new()),
  45. }
  46. }
  47. async fn handle(&self, data: Bytes) -> Result<(), ServerError> {
  48. let document_data: WsDocumentData = parse_from_bytes(&data)?;
  49. match document_data.ty {
  50. WsDataType::Command => {},
  51. WsDataType::Delta => {
  52. let revision: Revision = parse_from_bytes(&document_data.data)?;
  53. let edited_doc = self.get_edit_doc(&revision.doc_id).await?;
  54. tokio::spawn(async move {
  55. edited_doc.apply_revision(revision).await.unwrap();
  56. });
  57. },
  58. }
  59. Ok(())
  60. }
  61. async fn get_edit_doc(&self, doc_id: &str) -> Result<Arc<EditDoc>, ServerError> {
  62. // Opti: using lock free map instead?
  63. let edit_docs = self.edit_docs.upgradable_read();
  64. if let Some(doc) = edit_docs.get(doc_id) {
  65. return Ok(doc.clone());
  66. } else {
  67. let mut edit_docs = RwLockUpgradableReadGuard::upgrade(edit_docs);
  68. let pg_pool = self.pg_pool.clone();
  69. let params = QueryDocParams {
  70. doc_id: doc_id.to_string(),
  71. ..Default::default()
  72. };
  73. let doc = read_doc(pg_pool.get_ref(), params).await?;
  74. let edit_doc = Arc::new(EditDoc::new(doc, self.pg_pool.clone())?);
  75. edit_docs.insert(doc_id.to_string(), edit_doc.clone());
  76. Ok(edit_doc)
  77. }
  78. }
  79. }