queue.rs 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. use crate::web_socket::{DocumentResolveOperations, EditorCommandReceiver};
  2. use crate::DocumentUser;
  3. use async_stream::stream;
  4. use flowy_error::FlowyError;
  5. use flowy_revision::{OperationsMD5, RevisionManager, TransformOperations};
  6. use flowy_sync::{
  7. client_document::{history::UndoResult, ClientDocument},
  8. entities::revision::{RevId, Revision},
  9. errors::CollaborateError,
  10. };
  11. use futures::stream::StreamExt;
  12. use lib_ot::core::AttributeEntry;
  13. use lib_ot::{
  14. core::{Interval, OperationTransform},
  15. text_delta::TextOperations,
  16. };
  17. use std::sync::Arc;
  18. use tokio::sync::{oneshot, RwLock};
  19. // The EditorCommandQueue executes each command that will alter the document in
  20. // serial.
  21. pub(crate) struct EditDocumentQueue {
  22. document: Arc<RwLock<ClientDocument>>,
  23. user: Arc<dyn DocumentUser>,
  24. rev_manager: Arc<RevisionManager>,
  25. receiver: Option<EditorCommandReceiver>,
  26. }
  27. impl EditDocumentQueue {
  28. pub(crate) fn new(
  29. user: Arc<dyn DocumentUser>,
  30. rev_manager: Arc<RevisionManager>,
  31. operations: TextOperations,
  32. receiver: EditorCommandReceiver,
  33. ) -> Self {
  34. let document = Arc::new(RwLock::new(ClientDocument::from_operations(operations)));
  35. Self {
  36. document,
  37. user,
  38. rev_manager,
  39. receiver: Some(receiver),
  40. }
  41. }
  42. pub(crate) async fn run(mut self) {
  43. let mut receiver = self.receiver.take().expect("Should only call once");
  44. let stream = stream! {
  45. loop {
  46. match receiver.recv().await {
  47. Some(msg) => yield msg,
  48. None => break,
  49. }
  50. }
  51. };
  52. stream
  53. .for_each(|command| async {
  54. match self.handle_command(command).await {
  55. Ok(_) => {}
  56. Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
  57. }
  58. })
  59. .await;
  60. }
  61. #[tracing::instrument(level = "trace", skip(self), err)]
  62. async fn handle_command(&self, command: EditorCommand) -> Result<(), FlowyError> {
  63. match command {
  64. EditorCommand::ComposeLocalOperations { operations, ret } => {
  65. let mut document = self.document.write().await;
  66. let _ = document.compose_operations(operations.clone())?;
  67. let md5 = document.md5();
  68. drop(document);
  69. let _ = self.save_local_operations(operations, md5).await?;
  70. let _ = ret.send(Ok(()));
  71. }
  72. EditorCommand::ComposeRemoteOperation { client_operations, ret } => {
  73. let mut document = self.document.write().await;
  74. let _ = document.compose_operations(client_operations.clone())?;
  75. let md5 = document.md5();
  76. drop(document);
  77. let _ = ret.send(Ok(md5));
  78. }
  79. EditorCommand::ResetOperations { operations, ret } => {
  80. let mut document = self.document.write().await;
  81. let _ = document.set_operations(operations);
  82. let md5 = document.md5();
  83. drop(document);
  84. let _ = ret.send(Ok(md5));
  85. }
  86. EditorCommand::TransformOperations { operations, ret } => {
  87. let f = || async {
  88. let read_guard = self.document.read().await;
  89. let mut server_operations: Option<DocumentResolveOperations> = None;
  90. let client_operations: TextOperations;
  91. if read_guard.is_empty() {
  92. // Do nothing
  93. client_operations = operations;
  94. } else {
  95. let (s_prime, c_prime) = read_guard.get_operations().transform(&operations)?;
  96. client_operations = c_prime;
  97. server_operations = Some(DocumentResolveOperations(s_prime));
  98. }
  99. drop(read_guard);
  100. Ok::<TextTransformOperations, CollaborateError>(TransformOperations {
  101. client_operations: DocumentResolveOperations(client_operations),
  102. server_operations,
  103. })
  104. };
  105. let _ = ret.send(f().await);
  106. }
  107. EditorCommand::Insert { index, data, ret } => {
  108. let mut write_guard = self.document.write().await;
  109. let operations = write_guard.insert(index, data)?;
  110. let md5 = write_guard.md5();
  111. let _ = self.save_local_operations(operations, md5).await?;
  112. let _ = ret.send(Ok(()));
  113. }
  114. EditorCommand::Delete { interval, ret } => {
  115. let mut write_guard = self.document.write().await;
  116. let operations = write_guard.delete(interval)?;
  117. let md5 = write_guard.md5();
  118. let _ = self.save_local_operations(operations, md5).await?;
  119. let _ = ret.send(Ok(()));
  120. }
  121. EditorCommand::Format {
  122. interval,
  123. attribute,
  124. ret,
  125. } => {
  126. let mut write_guard = self.document.write().await;
  127. let operations = write_guard.format(interval, attribute)?;
  128. let md5 = write_guard.md5();
  129. let _ = self.save_local_operations(operations, md5).await?;
  130. let _ = ret.send(Ok(()));
  131. }
  132. EditorCommand::Replace { interval, data, ret } => {
  133. let mut write_guard = self.document.write().await;
  134. let operations = write_guard.replace(interval, data)?;
  135. let md5 = write_guard.md5();
  136. let _ = self.save_local_operations(operations, md5).await?;
  137. let _ = ret.send(Ok(()));
  138. }
  139. EditorCommand::CanUndo { ret } => {
  140. let _ = ret.send(self.document.read().await.can_undo());
  141. }
  142. EditorCommand::CanRedo { ret } => {
  143. let _ = ret.send(self.document.read().await.can_redo());
  144. }
  145. EditorCommand::Undo { ret } => {
  146. let mut write_guard = self.document.write().await;
  147. let UndoResult { operations } = write_guard.undo()?;
  148. let md5 = write_guard.md5();
  149. let _ = self.save_local_operations(operations, md5).await?;
  150. let _ = ret.send(Ok(()));
  151. }
  152. EditorCommand::Redo { ret } => {
  153. let mut write_guard = self.document.write().await;
  154. let UndoResult { operations } = write_guard.redo()?;
  155. let md5 = write_guard.md5();
  156. let _ = self.save_local_operations(operations, md5).await?;
  157. let _ = ret.send(Ok(()));
  158. }
  159. EditorCommand::StringifyOperations { ret } => {
  160. let data = self.document.read().await.get_operations_json();
  161. let _ = ret.send(Ok(data));
  162. }
  163. EditorCommand::ReadOperations { ret } => {
  164. let operations = self.document.read().await.get_operations().clone();
  165. let _ = ret.send(Ok(operations));
  166. }
  167. }
  168. Ok(())
  169. }
  170. async fn save_local_operations(&self, operations: TextOperations, md5: String) -> Result<RevId, FlowyError> {
  171. let bytes = operations.json_bytes();
  172. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  173. let user_id = self.user.user_id()?;
  174. let revision = Revision::new(&self.rev_manager.object_id, base_rev_id, rev_id, bytes, &user_id, md5);
  175. let _ = self.rev_manager.add_local_revision(&revision).await?;
  176. Ok(rev_id.into())
  177. }
  178. }
  179. pub type TextTransformOperations = TransformOperations<DocumentResolveOperations>;
  180. pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
  181. pub(crate) enum EditorCommand {
  182. ComposeLocalOperations {
  183. operations: TextOperations,
  184. ret: Ret<()>,
  185. },
  186. ComposeRemoteOperation {
  187. client_operations: TextOperations,
  188. ret: Ret<OperationsMD5>,
  189. },
  190. ResetOperations {
  191. operations: TextOperations,
  192. ret: Ret<OperationsMD5>,
  193. },
  194. TransformOperations {
  195. operations: TextOperations,
  196. ret: Ret<TextTransformOperations>,
  197. },
  198. Insert {
  199. index: usize,
  200. data: String,
  201. ret: Ret<()>,
  202. },
  203. Delete {
  204. interval: Interval,
  205. ret: Ret<()>,
  206. },
  207. Format {
  208. interval: Interval,
  209. attribute: AttributeEntry,
  210. ret: Ret<()>,
  211. },
  212. Replace {
  213. interval: Interval,
  214. data: String,
  215. ret: Ret<()>,
  216. },
  217. CanUndo {
  218. ret: oneshot::Sender<bool>,
  219. },
  220. CanRedo {
  221. ret: oneshot::Sender<bool>,
  222. },
  223. Undo {
  224. ret: Ret<()>,
  225. },
  226. Redo {
  227. ret: Ret<()>,
  228. },
  229. StringifyOperations {
  230. ret: Ret<String>,
  231. },
  232. #[allow(dead_code)]
  233. ReadOperations {
  234. ret: Ret<TextOperations>,
  235. },
  236. }
  237. impl std::fmt::Debug for EditorCommand {
  238. fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
  239. let s = match self {
  240. EditorCommand::ComposeLocalOperations { .. } => "ComposeLocalOperations",
  241. EditorCommand::ComposeRemoteOperation { .. } => "ComposeRemoteOperation",
  242. EditorCommand::ResetOperations { .. } => "ResetOperations",
  243. EditorCommand::TransformOperations { .. } => "TransformOperations",
  244. EditorCommand::Insert { .. } => "Insert",
  245. EditorCommand::Delete { .. } => "Delete",
  246. EditorCommand::Format { .. } => "Format",
  247. EditorCommand::Replace { .. } => "Replace",
  248. EditorCommand::CanUndo { .. } => "CanUndo",
  249. EditorCommand::CanRedo { .. } => "CanRedo",
  250. EditorCommand::Undo { .. } => "Undo",
  251. EditorCommand::Redo { .. } => "Redo",
  252. EditorCommand::StringifyOperations { .. } => "StringifyOperations",
  253. EditorCommand::ReadOperations { .. } => "ReadOperations",
  254. };
  255. f.write_str(s)
  256. }
  257. }