queue.rs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. use async_stream::stream;
  2. use bytes::Bytes;
  3. use flowy_collaboration::{
  4. core::document::{history::UndoResult, Document},
  5. errors::DocumentError,
  6. };
  7. use futures::stream::StreamExt;
  8. use lib_ot::{
  9. core::{Interval, OperationTransformable},
  10. revision::{RevId, Revision},
  11. rich_text::{RichTextAttribute, RichTextDelta},
  12. };
  13. use std::{convert::TryFrom, sync::Arc};
  14. use tokio::sync::{mpsc, oneshot, RwLock};
  15. pub(crate) struct EditCommandQueue {
  16. doc_id: String,
  17. document: Arc<RwLock<Document>>,
  18. receiver: Option<mpsc::UnboundedReceiver<EditCommand>>,
  19. }
  20. impl EditCommandQueue {
  21. pub(crate) fn new(doc_id: &str, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver<EditCommand>) -> Self {
  22. let document = Arc::new(RwLock::new(Document::from_delta(delta)));
  23. Self {
  24. doc_id: doc_id.to_owned(),
  25. document,
  26. receiver: Some(receiver),
  27. }
  28. }
  29. pub(crate) async fn run(mut self) {
  30. let mut receiver = self.receiver.take().expect("Should only call once");
  31. let stream = stream! {
  32. loop {
  33. match receiver.recv().await {
  34. Some(msg) => yield msg,
  35. None => break,
  36. }
  37. }
  38. };
  39. stream
  40. .for_each(|msg| async {
  41. self.handle_message(msg).await;
  42. })
  43. .await;
  44. }
  45. async fn handle_message(&self, msg: EditCommand) {
  46. match msg {
  47. EditCommand::ComposeDelta { delta, ret } => {
  48. let result = self.composed_delta(delta).await;
  49. let _ = ret.send(result);
  50. },
  51. EditCommand::ProcessRemoteRevision { bytes, ret } => {
  52. let f = || async {
  53. let revision = Revision::try_from(bytes)?;
  54. let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
  55. let rev_id: RevId = revision.rev_id.into();
  56. let (server_prime, client_prime) = self.document.read().await.delta().transform(&delta)?;
  57. let transform_delta = TransformDeltas {
  58. client_prime,
  59. server_prime,
  60. server_rev_id: rev_id,
  61. };
  62. Ok::<TransformDeltas, DocumentError>(transform_delta)
  63. };
  64. let _ = ret.send(f().await);
  65. },
  66. EditCommand::Insert { index, data, ret } => {
  67. let delta = self.document.write().await.insert(index, data);
  68. let _ = ret.send(delta);
  69. },
  70. EditCommand::Delete { interval, ret } => {
  71. let result = self.document.write().await.delete(interval);
  72. let _ = ret.send(result);
  73. },
  74. EditCommand::Format {
  75. interval,
  76. attribute,
  77. ret,
  78. } => {
  79. let result = self.document.write().await.format(interval, attribute);
  80. let _ = ret.send(result);
  81. },
  82. EditCommand::Replace { interval, data, ret } => {
  83. let result = self.document.write().await.replace(interval, data);
  84. let _ = ret.send(result);
  85. },
  86. EditCommand::CanUndo { ret } => {
  87. let _ = ret.send(self.document.read().await.can_undo());
  88. },
  89. EditCommand::CanRedo { ret } => {
  90. let _ = ret.send(self.document.read().await.can_redo());
  91. },
  92. EditCommand::Undo { ret } => {
  93. let result = self.document.write().await.undo();
  94. let _ = ret.send(result);
  95. },
  96. EditCommand::Redo { ret } => {
  97. let result = self.document.write().await.redo();
  98. let _ = ret.send(result);
  99. },
  100. EditCommand::ReadDoc { ret } => {
  101. let data = self.document.read().await.to_json();
  102. let _ = ret.send(Ok(data));
  103. },
  104. EditCommand::ReadDocDelta { ret } => {
  105. let delta = self.document.read().await.delta().clone();
  106. let _ = ret.send(Ok(delta));
  107. },
  108. }
  109. }
  110. #[tracing::instrument(level = "debug", skip(self, delta), fields(compose_result), err)]
  111. async fn composed_delta(&self, delta: RichTextDelta) -> Result<(), DocumentError> {
  112. // tracing::debug!("{:?} thread handle_message", thread::current(),);
  113. let mut document = self.document.write().await;
  114. tracing::Span::current().record(
  115. "composed_delta",
  116. &format!("doc_id:{} - {}", &self.doc_id, delta.to_json()).as_str(),
  117. );
  118. let result = document.compose_delta(delta);
  119. drop(document);
  120. result
  121. }
  122. }
  123. pub(crate) type Ret<T> = oneshot::Sender<Result<T, DocumentError>>;
  124. #[allow(dead_code)]
  125. pub(crate) enum EditCommand {
  126. ComposeDelta {
  127. delta: RichTextDelta,
  128. ret: Ret<()>,
  129. },
  130. ProcessRemoteRevision {
  131. bytes: Bytes,
  132. ret: Ret<TransformDeltas>,
  133. },
  134. Insert {
  135. index: usize,
  136. data: String,
  137. ret: Ret<RichTextDelta>,
  138. },
  139. Delete {
  140. interval: Interval,
  141. ret: Ret<RichTextDelta>,
  142. },
  143. Format {
  144. interval: Interval,
  145. attribute: RichTextAttribute,
  146. ret: Ret<RichTextDelta>,
  147. },
  148. Replace {
  149. interval: Interval,
  150. data: String,
  151. ret: Ret<RichTextDelta>,
  152. },
  153. CanUndo {
  154. ret: oneshot::Sender<bool>,
  155. },
  156. CanRedo {
  157. ret: oneshot::Sender<bool>,
  158. },
  159. Undo {
  160. ret: Ret<UndoResult>,
  161. },
  162. Redo {
  163. ret: Ret<UndoResult>,
  164. },
  165. ReadDoc {
  166. ret: Ret<String>,
  167. },
  168. ReadDocDelta {
  169. ret: Ret<RichTextDelta>,
  170. },
  171. }
  172. pub(crate) struct TransformDeltas {
  173. pub client_prime: RichTextDelta,
  174. pub server_prime: RichTextDelta,
  175. pub server_rev_id: RevId,
  176. }