edit_actor.rs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. use crate::service::{
  2. doc::edit::ServerDocEditor,
  3. ws::{entities::Socket, WsUser},
  4. };
  5. use actix_web::web::Data;
  6. use async_stream::stream;
  7. use backend_service::errors::{internal_error, Result as DocResult, ServerError};
  8. use flowy_document_infra::protobuf::{Doc, Revision};
  9. use futures::stream::StreamExt;
  10. use sqlx::PgPool;
  11. use std::sync::{atomic::Ordering::SeqCst, Arc};
  12. use tokio::{
  13. sync::{mpsc, oneshot},
  14. task::spawn_blocking,
  15. };
  16. #[derive(Clone)]
  17. pub struct EditUser {
  18. user: Arc<WsUser>,
  19. pub(crate) socket: Socket,
  20. }
  21. impl EditUser {
  22. pub fn id(&self) -> String { self.user.id().to_string() }
  23. }
  24. #[derive(Debug)]
  25. pub enum EditMsg {
  26. Revision {
  27. user: Arc<WsUser>,
  28. socket: Socket,
  29. revision: Revision,
  30. ret: oneshot::Sender<DocResult<()>>,
  31. },
  32. DocumentJson {
  33. ret: oneshot::Sender<DocResult<String>>,
  34. },
  35. DocumentRevId {
  36. ret: oneshot::Sender<DocResult<i64>>,
  37. },
  38. NewDocUser {
  39. user: Arc<WsUser>,
  40. socket: Socket,
  41. rev_id: i64,
  42. ret: oneshot::Sender<DocResult<()>>,
  43. },
  44. }
  45. pub struct EditDocActor {
  46. receiver: Option<mpsc::Receiver<EditMsg>>,
  47. edit_doc: Arc<ServerDocEditor>,
  48. pg_pool: Data<PgPool>,
  49. }
  50. impl EditDocActor {
  51. pub fn new(receiver: mpsc::Receiver<EditMsg>, doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
  52. let edit_doc = Arc::new(ServerDocEditor::new(doc)?);
  53. Ok(Self {
  54. receiver: Some(receiver),
  55. edit_doc,
  56. pg_pool,
  57. })
  58. }
  59. pub async fn run(mut self) {
  60. let mut receiver = self
  61. .receiver
  62. .take()
  63. .expect("DocActor's receiver should only take one time");
  64. let stream = stream! {
  65. loop {
  66. match receiver.recv().await {
  67. Some(msg) => yield msg,
  68. None => break,
  69. }
  70. }
  71. };
  72. stream.for_each(|msg| self.handle_message(msg)).await;
  73. }
  74. async fn handle_message(&self, msg: EditMsg) {
  75. match msg {
  76. EditMsg::Revision {
  77. user,
  78. socket,
  79. revision,
  80. ret,
  81. } => {
  82. let user = EditUser {
  83. user: user.clone(),
  84. socket: socket.clone(),
  85. };
  86. let _ = ret.send(self.edit_doc.apply_revision(user, revision, self.pg_pool.clone()).await);
  87. },
  88. EditMsg::DocumentJson { ret } => {
  89. let edit_context = self.edit_doc.clone();
  90. let json = spawn_blocking(move || edit_context.document_json())
  91. .await
  92. .map_err(internal_error);
  93. let _ = ret.send(json);
  94. },
  95. EditMsg::DocumentRevId { ret } => {
  96. let edit_context = self.edit_doc.clone();
  97. let _ = ret.send(Ok(edit_context.rev_id.load(SeqCst)));
  98. },
  99. EditMsg::NewDocUser {
  100. user,
  101. socket,
  102. rev_id,
  103. ret,
  104. } => {
  105. log::debug!("Receive new doc user: {:?}, rev_id: {}", user, rev_id);
  106. let user = EditUser {
  107. user: user.clone(),
  108. socket: socket.clone(),
  109. };
  110. let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await);
  111. },
  112. }
  113. }
  114. }