edit_queue.rs 6.7 KB


  1. use async_stream::stream;
  2. use bytes::Bytes;
  3. use flowy_collaboration::{
  4. core::document::{history::UndoResult, Document},
  5. errors::CollaborateError,
  6. };
  7. use flowy_error::FlowyError;
  8. use futures::stream::StreamExt;
  9. use lib_ot::{
  10. core::{Interval, OperationTransformable},
  11. revision::{RevId, Revision},
  12. rich_text::{RichTextAttribute, RichTextDelta},
  13. };
  14. use std::{convert::TryFrom, sync::Arc};
  15. use tokio::sync::{mpsc, oneshot, RwLock};
  16. pub(crate) struct EditCommandQueue {
  17. doc_id: String,
  18. document: Arc<RwLock<Document>>,
  19. receiver: Option<mpsc::UnboundedReceiver<EditCommand>>,
  20. }
  21. impl EditCommandQueue {
  22. pub(crate) fn new(doc_id: &str, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver<EditCommand>) -> Self {
  23. let document = Arc::new(RwLock::new(Document::from_delta(delta)));
  24. Self {
  25. doc_id: doc_id.to_owned(),
  26. document,
  27. receiver: Some(receiver),
  28. }
  29. }
  30. pub(crate) async fn run(mut self) {
  31. let mut receiver = self.receiver.take().expect("Should only call once");
  32. let stream = stream! {
  33. loop {
  34. match receiver.recv().await {
  35. Some(msg) => yield msg,
  36. None => break,
  37. }
  38. }
  39. };
  40. stream
  41. .for_each(|msg| async {
  42. match self.handle_message(msg).await {
  43. Ok(_) => {},
  44. Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
  45. }
  46. })
  47. .await;
  48. }
  49. async fn handle_message(&self, msg: EditCommand) -> Result<(), FlowyError> {
  50. match msg {
  51. EditCommand::ComposeDelta { delta, ret } => {
  52. let result = self.composed_delta(delta).await;
  53. let _ = ret.send(result);
  54. },
  55. EditCommand::ProcessRemoteRevision { bytes, ret } => {
  56. let f = || async {
  57. let revision = Revision::try_from(bytes)?;
  58. let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
  59. let server_rev_id: RevId = revision.rev_id.into();
  60. let read_guard = self.document.read().await;
  61. let (server_prime, client_prime) = read_guard.delta().transform(&delta)?;
  62. drop(read_guard);
  63. let transform_delta = TransformDeltas {
  64. client_prime,
  65. server_prime,
  66. server_rev_id,
  67. };
  68. Ok::<TransformDeltas, CollaborateError>(transform_delta)
  69. };
  70. let _ = ret.send(f().await);
  71. },
  72. EditCommand::Insert { index, data, ret } => {
  73. let mut write_guard = self.document.write().await;
  74. let delta = write_guard.insert(index, data)?;
  75. let md5 = write_guard.md5();
  76. let _ = ret.send(Ok((delta, md5)));
  77. },
  78. EditCommand::Delete { interval, ret } => {
  79. let mut write_guard = self.document.write().await;
  80. let delta = write_guard.delete(interval)?;
  81. let md5 = write_guard.md5();
  82. let _ = ret.send(Ok((delta, md5)));
  83. },
  84. EditCommand::Format {
  85. interval,
  86. attribute,
  87. ret,
  88. } => {
  89. let mut write_guard = self.document.write().await;
  90. let delta = write_guard.format(interval, attribute)?;
  91. let md5 = write_guard.md5();
  92. let _ = ret.send(Ok((delta, md5)));
  93. },
  94. EditCommand::Replace { interval, data, ret } => {
  95. let mut write_guard = self.document.write().await;
  96. let delta = write_guard.replace(interval, data)?;
  97. let md5 = write_guard.md5();
  98. let _ = ret.send(Ok((delta, md5)));
  99. },
  100. EditCommand::CanUndo { ret } => {
  101. let _ = ret.send(self.document.read().await.can_undo());
  102. },
  103. EditCommand::CanRedo { ret } => {
  104. let _ = ret.send(self.document.read().await.can_redo());
  105. },
  106. EditCommand::Undo { ret } => {
  107. let result = self.document.write().await.undo();
  108. let _ = ret.send(result);
  109. },
  110. EditCommand::Redo { ret } => {
  111. let result = self.document.write().await.redo();
  112. let _ = ret.send(result);
  113. },
  114. EditCommand::ReadDoc { ret } => {
  115. let data = self.document.read().await.to_json();
  116. let _ = ret.send(Ok(data));
  117. },
  118. EditCommand::ReadDocDelta { ret } => {
  119. let delta = self.document.read().await.delta().clone();
  120. let _ = ret.send(Ok(delta));
  121. },
  122. }
  123. Ok(())
  124. }
  125. #[tracing::instrument(level = "debug", skip(self, delta), fields(compose_result), err)]
  126. async fn composed_delta(&self, delta: RichTextDelta) -> Result<String, CollaborateError> {
  127. // tracing::debug!("{:?} thread handle_message", thread::current(),);
  128. let mut document = self.document.write().await;
  129. tracing::Span::current().record(
  130. "composed_delta",
  131. &format!("doc_id:{} - {}", &self.doc_id, delta.to_json()).as_str(),
  132. );
  133. let _ = document.compose_delta(delta)?;
  134. let md5 = document.md5();
  135. drop(document);
  136. Ok(md5)
  137. }
  138. }
  139. pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
  140. pub(crate) type NewDelta = (RichTextDelta, String);
  141. pub(crate) type DocumentMD5 = String;
  142. #[allow(dead_code)]
  143. pub(crate) enum EditCommand {
  144. ComposeDelta {
  145. delta: RichTextDelta,
  146. ret: Ret<DocumentMD5>,
  147. },
  148. ProcessRemoteRevision {
  149. bytes: Bytes,
  150. ret: Ret<TransformDeltas>,
  151. },
  152. Insert {
  153. index: usize,
  154. data: String,
  155. ret: Ret<NewDelta>,
  156. },
  157. Delete {
  158. interval: Interval,
  159. ret: Ret<NewDelta>,
  160. },
  161. Format {
  162. interval: Interval,
  163. attribute: RichTextAttribute,
  164. ret: Ret<NewDelta>,
  165. },
  166. Replace {
  167. interval: Interval,
  168. data: String,
  169. ret: Ret<NewDelta>,
  170. },
  171. CanUndo {
  172. ret: oneshot::Sender<bool>,
  173. },
  174. CanRedo {
  175. ret: oneshot::Sender<bool>,
  176. },
  177. Undo {
  178. ret: Ret<UndoResult>,
  179. },
  180. Redo {
  181. ret: Ret<UndoResult>,
  182. },
  183. ReadDoc {
  184. ret: Ret<String>,
  185. },
  186. ReadDocDelta {
  187. ret: Ret<RichTextDelta>,
  188. },
  189. }
  190. pub(crate) struct TransformDeltas {
  191. pub client_prime: RichTextDelta,
  192. pub server_prime: RichTextDelta,
  193. pub server_rev_id: RevId,
  194. }