doc_actor.rs 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. use crate::{
  2. entities::doc::{RevId, Revision},
  3. errors::DocResult,
  4. services::doc::{
  5. edit::message::{DocumentMsg, TransformDeltas},
  6. Document,
  7. },
  8. };
  9. use async_stream::stream;
  10. use flowy_ot::core::{Delta, OperationTransformable};
  11. use futures::stream::StreamExt;
  12. use std::{convert::TryFrom, sync::Arc};
  13. use tokio::sync::{mpsc, RwLock};
  14. pub struct DocumentActor {
  15. document: Arc<RwLock<Document>>,
  16. receiver: Option<mpsc::UnboundedReceiver<DocumentMsg>>,
  17. }
  18. impl DocumentActor {
  19. pub fn new(delta: Delta, receiver: mpsc::UnboundedReceiver<DocumentMsg>) -> Self {
  20. let document = Arc::new(RwLock::new(Document::from_delta(delta)));
  21. Self {
  22. document,
  23. receiver: Some(receiver),
  24. }
  25. }
  26. pub async fn run(mut self) {
  27. let mut receiver = self.receiver.take().expect("Should only call once");
  28. let stream = stream! {
  29. loop {
  30. match receiver.recv().await {
  31. Some(msg) => yield msg,
  32. None => break,
  33. }
  34. }
  35. };
  36. stream
  37. .for_each(|msg| async {
  38. match self.handle_message(msg).await {
  39. Ok(_) => {},
  40. Err(e) => log::error!("{:?}", e),
  41. }
  42. })
  43. .await;
  44. }
  45. async fn handle_message(&self, msg: DocumentMsg) -> DocResult<()> {
  46. match msg {
  47. DocumentMsg::Delta { delta, ret } => {
  48. let result = self.compose_delta(delta).await;
  49. let _ = ret.send(result);
  50. },
  51. DocumentMsg::RemoteRevision { bytes, ret } => {
  52. let revision = Revision::try_from(bytes)?;
  53. let delta = Delta::from_bytes(&revision.delta_data)?;
  54. let rev_id: RevId = revision.rev_id.into();
  55. let (server_prime, client_prime) = self.document.read().await.delta().transform(&delta)?;
  56. let transform_delta = TransformDeltas {
  57. client_prime,
  58. server_prime,
  59. server_rev_id: rev_id,
  60. };
  61. let _ = ret.send(Ok(transform_delta));
  62. },
  63. DocumentMsg::Insert { index, data, ret } => {
  64. let delta = self.document.write().await.insert(index, data);
  65. let _ = ret.send(delta);
  66. },
  67. DocumentMsg::Delete { interval, ret } => {
  68. let result = self.document.write().await.delete(interval);
  69. let _ = ret.send(result);
  70. },
  71. DocumentMsg::Format {
  72. interval,
  73. attribute,
  74. ret,
  75. } => {
  76. let result = self.document.write().await.format(interval, attribute);
  77. let _ = ret.send(result);
  78. },
  79. DocumentMsg::Replace { interval, data, ret } => {
  80. let result = self.document.write().await.replace(interval, data);
  81. let _ = ret.send(result);
  82. },
  83. DocumentMsg::CanUndo { ret } => {
  84. let _ = ret.send(self.document.read().await.can_undo());
  85. },
  86. DocumentMsg::CanRedo { ret } => {
  87. let _ = ret.send(self.document.read().await.can_redo());
  88. },
  89. DocumentMsg::Undo { ret } => {
  90. let result = self.document.write().await.undo();
  91. let _ = ret.send(result);
  92. },
  93. DocumentMsg::Redo { ret } => {
  94. let result = self.document.write().await.redo();
  95. let _ = ret.send(result);
  96. },
  97. DocumentMsg::Doc { ret } => {
  98. let data = self.document.read().await.to_json();
  99. let _ = ret.send(Ok(data));
  100. },
  101. DocumentMsg::SaveDocument { rev_id: _, ret } => {
  102. // let result = self.save_to_disk(rev_id).await;
  103. let _ = ret.send(Ok(()));
  104. },
  105. }
  106. Ok(())
  107. }
  108. async fn compose_delta(&self, delta: Delta) -> DocResult<()> {
  109. // tracing::debug!("{:?} thread handle_message", thread::current(),);
  110. let mut document = self.document.write().await;
  111. let result = document.compose_delta(&delta);
  112. tracing::debug!(
  113. "Compose push delta: {}. result: {}",
  114. delta.to_json(),
  115. document.to_json()
  116. );
  117. drop(document);
  118. result
  119. }
  120. }
  // #[tracing::instrument(level = "debug", skip(self, params), err)]
  // fn update_doc_on_server(&self, params: UpdateDocParams) -> Result<(), DocError> {
  //     let token = self.user.token()?;
  //     let server = self.server.clone();
  //     tokio::spawn(async move {
  //         match server.update_doc(&token, params).await {
  //             Ok(_) => {},
  //             Err(e) => {
  //                 // TODO: retry?
  //                 log::error!("Update doc failed: {}", e);
  //             },
  //         }
  //     });
  //     Ok(())
  // }