queue.rs 10 KB


  1. use crate::old_editor::web_socket::DeltaDocumentResolveOperations;
  2. use crate::DocumentUser;
  3. use async_stream::stream;
  4. use flowy_database::ConnectionPool;
  5. use flowy_error::FlowyError;
  6. use flowy_http_model::revision::RevId;
  7. use flowy_revision::{RevisionMD5, RevisionManager, TransformOperations};
  8. use flowy_sync::{
  9. client_document::{history::UndoResult, ClientDocument},
  10. errors::CollaborateError,
  11. };
  12. use futures::stream::StreamExt;
  13. use lib_ot::core::AttributeEntry;
  14. use lib_ot::{
  15. core::{Interval, OperationTransform},
  16. text_delta::DeltaTextOperations,
  17. };
  18. use std::sync::Arc;
  19. use tokio::sync::mpsc::{Receiver, Sender};
  20. use tokio::sync::{oneshot, RwLock};
  21. // The EditorCommandQueue executes each command that will alter the document in
  22. // serial.
  23. pub(crate) struct EditDocumentQueue {
  24. document: Arc<RwLock<ClientDocument>>,
  25. #[allow(dead_code)]
  26. user: Arc<dyn DocumentUser>,
  27. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  28. receiver: Option<EditorCommandReceiver>,
  29. }
  30. impl EditDocumentQueue {
  31. pub(crate) fn new(
  32. user: Arc<dyn DocumentUser>,
  33. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  34. operations: DeltaTextOperations,
  35. receiver: EditorCommandReceiver,
  36. ) -> Self {
  37. let document = Arc::new(RwLock::new(ClientDocument::from_operations(operations)));
  38. Self {
  39. document,
  40. user,
  41. rev_manager,
  42. receiver: Some(receiver),
  43. }
  44. }
  45. pub(crate) async fn run(mut self) {
  46. let mut receiver = self.receiver.take().expect("Should only call once");
  47. let stream = stream! {
  48. loop {
  49. match receiver.recv().await {
  50. Some(msg) => yield msg,
  51. None => break,
  52. }
  53. }
  54. };
  55. stream
  56. .for_each(|command| async {
  57. match self.handle_command(command).await {
  58. Ok(_) => {}
  59. Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
  60. }
  61. })
  62. .await;
  63. }
  64. #[tracing::instrument(level = "trace", skip(self), err)]
  65. async fn handle_command(&self, command: EditorCommand) -> Result<(), FlowyError> {
  66. match command {
  67. EditorCommand::ComposeLocalOperations { operations, ret } => {
  68. let mut document = self.document.write().await;
  69. let _ = document.compose_operations(operations.clone())?;
  70. let md5 = document.document_md5();
  71. drop(document);
  72. let _ = self.save_local_operations(operations, md5).await?;
  73. let _ = ret.send(Ok(()));
  74. }
  75. EditorCommand::ComposeRemoteOperation { client_operations, ret } => {
  76. let mut document = self.document.write().await;
  77. let _ = document.compose_operations(client_operations.clone())?;
  78. let md5 = document.document_md5();
  79. drop(document);
  80. let _ = ret.send(Ok(md5.into()));
  81. }
  82. EditorCommand::ResetOperations { operations, ret } => {
  83. let mut document = self.document.write().await;
  84. let _ = document.set_operations(operations);
  85. let md5 = document.document_md5();
  86. drop(document);
  87. let _ = ret.send(Ok(md5.into()));
  88. }
  89. EditorCommand::TransformOperations { operations, ret } => {
  90. let f = || async {
  91. let read_guard = self.document.read().await;
  92. let mut server_operations: Option<DeltaDocumentResolveOperations> = None;
  93. let client_operations: DeltaTextOperations;
  94. if read_guard.is_empty() {
  95. // Do nothing
  96. client_operations = operations;
  97. } else {
  98. let (s_prime, c_prime) = read_guard.get_operations().transform(&operations)?;
  99. client_operations = c_prime;
  100. server_operations = Some(DeltaDocumentResolveOperations(s_prime));
  101. }
  102. drop(read_guard);
  103. Ok::<TextTransformOperations, CollaborateError>(TransformOperations {
  104. client_operations: DeltaDocumentResolveOperations(client_operations),
  105. server_operations,
  106. })
  107. };
  108. let _ = ret.send(f().await);
  109. }
  110. EditorCommand::Insert { index, data, ret } => {
  111. let mut write_guard = self.document.write().await;
  112. let operations = write_guard.insert(index, data)?;
  113. let md5 = write_guard.document_md5();
  114. let _ = self.save_local_operations(operations, md5).await?;
  115. let _ = ret.send(Ok(()));
  116. }
  117. EditorCommand::Delete { interval, ret } => {
  118. let mut write_guard = self.document.write().await;
  119. let operations = write_guard.delete(interval)?;
  120. let md5 = write_guard.document_md5();
  121. let _ = self.save_local_operations(operations, md5).await?;
  122. let _ = ret.send(Ok(()));
  123. }
  124. EditorCommand::Format {
  125. interval,
  126. attribute,
  127. ret,
  128. } => {
  129. let mut write_guard = self.document.write().await;
  130. let operations = write_guard.format(interval, attribute)?;
  131. let md5 = write_guard.document_md5();
  132. let _ = self.save_local_operations(operations, md5).await?;
  133. let _ = ret.send(Ok(()));
  134. }
  135. EditorCommand::Replace { interval, data, ret } => {
  136. let mut write_guard = self.document.write().await;
  137. let operations = write_guard.replace(interval, data)?;
  138. let md5 = write_guard.document_md5();
  139. let _ = self.save_local_operations(operations, md5).await?;
  140. let _ = ret.send(Ok(()));
  141. }
  142. EditorCommand::CanUndo { ret } => {
  143. let _ = ret.send(self.document.read().await.can_undo());
  144. }
  145. EditorCommand::CanRedo { ret } => {
  146. let _ = ret.send(self.document.read().await.can_redo());
  147. }
  148. EditorCommand::Undo { ret } => {
  149. let mut write_guard = self.document.write().await;
  150. let UndoResult { operations } = write_guard.undo()?;
  151. let md5 = write_guard.document_md5();
  152. let _ = self.save_local_operations(operations, md5).await?;
  153. let _ = ret.send(Ok(()));
  154. }
  155. EditorCommand::Redo { ret } => {
  156. let mut write_guard = self.document.write().await;
  157. let UndoResult { operations } = write_guard.redo()?;
  158. let md5 = write_guard.document_md5();
  159. let _ = self.save_local_operations(operations, md5).await?;
  160. let _ = ret.send(Ok(()));
  161. }
  162. EditorCommand::GetOperationsString { ret } => {
  163. let data = self.document.read().await.get_operations_json();
  164. let _ = ret.send(Ok(data));
  165. }
  166. EditorCommand::GetOperations { ret } => {
  167. let operations = self.document.read().await.get_operations().clone();
  168. let _ = ret.send(Ok(operations));
  169. }
  170. }
  171. Ok(())
  172. }
  173. async fn save_local_operations(&self, operations: DeltaTextOperations, md5: String) -> Result<RevId, FlowyError> {
  174. let bytes = operations.json_bytes();
  175. let rev_id = self.rev_manager.add_local_revision(bytes, md5).await?;
  176. Ok(rev_id.into())
  177. }
  178. }
  179. pub type TextTransformOperations = TransformOperations<DeltaDocumentResolveOperations>;
  180. pub(crate) type EditorCommandSender = Sender<EditorCommand>;
  181. pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
  182. pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
  183. pub(crate) enum EditorCommand {
  184. ComposeLocalOperations {
  185. operations: DeltaTextOperations,
  186. ret: Ret<()>,
  187. },
  188. ComposeRemoteOperation {
  189. client_operations: DeltaTextOperations,
  190. ret: Ret<RevisionMD5>,
  191. },
  192. ResetOperations {
  193. operations: DeltaTextOperations,
  194. ret: Ret<RevisionMD5>,
  195. },
  196. TransformOperations {
  197. operations: DeltaTextOperations,
  198. ret: Ret<TextTransformOperations>,
  199. },
  200. Insert {
  201. index: usize,
  202. data: String,
  203. ret: Ret<()>,
  204. },
  205. Delete {
  206. interval: Interval,
  207. ret: Ret<()>,
  208. },
  209. Format {
  210. interval: Interval,
  211. attribute: AttributeEntry,
  212. ret: Ret<()>,
  213. },
  214. Replace {
  215. interval: Interval,
  216. data: String,
  217. ret: Ret<()>,
  218. },
  219. CanUndo {
  220. ret: oneshot::Sender<bool>,
  221. },
  222. CanRedo {
  223. ret: oneshot::Sender<bool>,
  224. },
  225. Undo {
  226. ret: Ret<()>,
  227. },
  228. Redo {
  229. ret: Ret<()>,
  230. },
  231. GetOperationsString {
  232. ret: Ret<String>,
  233. },
  234. #[allow(dead_code)]
  235. GetOperations {
  236. ret: Ret<DeltaTextOperations>,
  237. },
  238. }
  239. impl std::fmt::Debug for EditorCommand {
  240. fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
  241. let s = match self {
  242. EditorCommand::ComposeLocalOperations { .. } => "ComposeLocalOperations",
  243. EditorCommand::ComposeRemoteOperation { .. } => "ComposeRemoteOperation",
  244. EditorCommand::ResetOperations { .. } => "ResetOperations",
  245. EditorCommand::TransformOperations { .. } => "TransformOperations",
  246. EditorCommand::Insert { .. } => "Insert",
  247. EditorCommand::Delete { .. } => "Delete",
  248. EditorCommand::Format { .. } => "Format",
  249. EditorCommand::Replace { .. } => "Replace",
  250. EditorCommand::CanUndo { .. } => "CanUndo",
  251. EditorCommand::CanRedo { .. } => "CanRedo",
  252. EditorCommand::Undo { .. } => "Undo",
  253. EditorCommand::Redo { .. } => "Redo",
  254. EditorCommand::GetOperationsString { .. } => "StringifyOperations",
  255. EditorCommand::GetOperations { .. } => "ReadOperations",
  256. };
  257. f.write_str(s)
  258. }
  259. }