queue.rs 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. use crate::web_socket::EditorCommandReceiver;
  2. use crate::TextEditorUser;
  3. use async_stream::stream;
  4. use bytes::Bytes;
  5. use flowy_error::{FlowyError, FlowyResult};
  6. use flowy_revision::{OperationsMD5, RevisionCompactor, RevisionManager, RichTextTransformDeltas, TransformDeltas};
  7. use flowy_sync::util::make_operations_from_revisions;
  8. use flowy_sync::{
  9. client_document::{history::UndoResult, ClientDocument},
  10. entities::revision::{RevId, Revision},
  11. errors::CollaborateError,
  12. };
  13. use futures::stream::StreamExt;
  14. use lib_ot::core::{AttributeEntry, AttributeHashMap};
  15. use lib_ot::{
  16. core::{Interval, OperationTransform},
  17. text_delta::TextOperations,
  18. };
  19. use std::sync::Arc;
  20. use tokio::sync::{oneshot, RwLock};
  21. // The EditorCommandQueue executes each command that will alter the document in
  22. // serial.
  23. pub(crate) struct EditBlockQueue {
  24. document: Arc<RwLock<ClientDocument>>,
  25. user: Arc<dyn TextEditorUser>,
  26. rev_manager: Arc<RevisionManager>,
  27. receiver: Option<EditorCommandReceiver>,
  28. }
  29. impl EditBlockQueue {
  30. pub(crate) fn new(
  31. user: Arc<dyn TextEditorUser>,
  32. rev_manager: Arc<RevisionManager>,
  33. delta: TextOperations,
  34. receiver: EditorCommandReceiver,
  35. ) -> Self {
  36. let document = Arc::new(RwLock::new(ClientDocument::from_operations(delta)));
  37. Self {
  38. document,
  39. user,
  40. rev_manager,
  41. receiver: Some(receiver),
  42. }
  43. }
  44. pub(crate) async fn run(mut self) {
  45. let mut receiver = self.receiver.take().expect("Should only call once");
  46. let stream = stream! {
  47. loop {
  48. match receiver.recv().await {
  49. Some(msg) => yield msg,
  50. None => break,
  51. }
  52. }
  53. };
  54. stream
  55. .for_each(|command| async {
  56. match self.handle_command(command).await {
  57. Ok(_) => {}
  58. Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
  59. }
  60. })
  61. .await;
  62. }
  63. #[tracing::instrument(level = "trace", skip(self), err)]
  64. async fn handle_command(&self, command: EditorCommand) -> Result<(), FlowyError> {
  65. match command {
  66. EditorCommand::ComposeLocalDelta { delta, ret } => {
  67. let mut document = self.document.write().await;
  68. let _ = document.compose_operations(delta.clone())?;
  69. let md5 = document.md5();
  70. drop(document);
  71. let _ = self.save_local_delta(delta, md5).await?;
  72. let _ = ret.send(Ok(()));
  73. }
  74. EditorCommand::ComposeRemoteDelta { client_delta, ret } => {
  75. let mut document = self.document.write().await;
  76. let _ = document.compose_operations(client_delta.clone())?;
  77. let md5 = document.md5();
  78. drop(document);
  79. let _ = ret.send(Ok(md5));
  80. }
  81. EditorCommand::ResetDelta { delta, ret } => {
  82. let mut document = self.document.write().await;
  83. let _ = document.set_operations(delta);
  84. let md5 = document.md5();
  85. drop(document);
  86. let _ = ret.send(Ok(md5));
  87. }
  88. EditorCommand::TransformDelta { delta, ret } => {
  89. let f = || async {
  90. let read_guard = self.document.read().await;
  91. let mut server_prime: Option<TextOperations> = None;
  92. let client_prime: TextOperations;
  93. if read_guard.is_empty() {
  94. // Do nothing
  95. client_prime = delta;
  96. } else {
  97. let (s_prime, c_prime) = read_guard.get_operations().transform(&delta)?;
  98. client_prime = c_prime;
  99. server_prime = Some(s_prime);
  100. }
  101. drop(read_guard);
  102. Ok::<RichTextTransformDeltas, CollaborateError>(TransformDeltas {
  103. client_prime,
  104. server_prime,
  105. })
  106. };
  107. let _ = ret.send(f().await);
  108. }
  109. EditorCommand::Insert { index, data, ret } => {
  110. let mut write_guard = self.document.write().await;
  111. let delta = write_guard.insert(index, data)?;
  112. let md5 = write_guard.md5();
  113. let _ = self.save_local_delta(delta, md5).await?;
  114. let _ = ret.send(Ok(()));
  115. }
  116. EditorCommand::Delete { interval, ret } => {
  117. let mut write_guard = self.document.write().await;
  118. let delta = write_guard.delete(interval)?;
  119. let md5 = write_guard.md5();
  120. let _ = self.save_local_delta(delta, md5).await?;
  121. let _ = ret.send(Ok(()));
  122. }
  123. EditorCommand::Format {
  124. interval,
  125. attribute,
  126. ret,
  127. } => {
  128. let mut write_guard = self.document.write().await;
  129. let delta = write_guard.format(interval, attribute)?;
  130. let md5 = write_guard.md5();
  131. let _ = self.save_local_delta(delta, md5).await?;
  132. let _ = ret.send(Ok(()));
  133. }
  134. EditorCommand::Replace { interval, data, ret } => {
  135. let mut write_guard = self.document.write().await;
  136. let delta = write_guard.replace(interval, data)?;
  137. let md5 = write_guard.md5();
  138. let _ = self.save_local_delta(delta, md5).await?;
  139. let _ = ret.send(Ok(()));
  140. }
  141. EditorCommand::CanUndo { ret } => {
  142. let _ = ret.send(self.document.read().await.can_undo());
  143. }
  144. EditorCommand::CanRedo { ret } => {
  145. let _ = ret.send(self.document.read().await.can_redo());
  146. }
  147. EditorCommand::Undo { ret } => {
  148. let mut write_guard = self.document.write().await;
  149. let UndoResult { operations: delta } = write_guard.undo()?;
  150. let md5 = write_guard.md5();
  151. let _ = self.save_local_delta(delta, md5).await?;
  152. let _ = ret.send(Ok(()));
  153. }
  154. EditorCommand::Redo { ret } => {
  155. let mut write_guard = self.document.write().await;
  156. let UndoResult { operations: delta } = write_guard.redo()?;
  157. let md5 = write_guard.md5();
  158. let _ = self.save_local_delta(delta, md5).await?;
  159. let _ = ret.send(Ok(()));
  160. }
  161. EditorCommand::ReadDeltaStr { ret } => {
  162. let data = self.document.read().await.get_operations_json();
  163. let _ = ret.send(Ok(data));
  164. }
  165. EditorCommand::ReadDelta { ret } => {
  166. let delta = self.document.read().await.get_operations().clone();
  167. let _ = ret.send(Ok(delta));
  168. }
  169. }
  170. Ok(())
  171. }
  172. async fn save_local_delta(&self, delta: TextOperations, md5: String) -> Result<RevId, FlowyError> {
  173. let delta_data = delta.json_bytes();
  174. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  175. let user_id = self.user.user_id()?;
  176. let revision = Revision::new(
  177. &self.rev_manager.object_id,
  178. base_rev_id,
  179. rev_id,
  180. delta_data,
  181. &user_id,
  182. md5,
  183. );
  184. let _ = self.rev_manager.add_local_revision(&revision).await?;
  185. Ok(rev_id.into())
  186. }
  187. }
  188. pub(crate) struct TextBlockRevisionCompactor();
  189. impl RevisionCompactor for TextBlockRevisionCompactor {
  190. fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  191. let delta = make_operations_from_revisions::<AttributeHashMap>(revisions)?;
  192. Ok(delta.json_bytes())
  193. }
  194. }
  195. pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
  196. pub(crate) enum EditorCommand {
  197. ComposeLocalDelta {
  198. delta: TextOperations,
  199. ret: Ret<()>,
  200. },
  201. ComposeRemoteDelta {
  202. client_delta: TextOperations,
  203. ret: Ret<OperationsMD5>,
  204. },
  205. ResetDelta {
  206. delta: TextOperations,
  207. ret: Ret<OperationsMD5>,
  208. },
  209. TransformDelta {
  210. delta: TextOperations,
  211. ret: Ret<RichTextTransformDeltas>,
  212. },
  213. Insert {
  214. index: usize,
  215. data: String,
  216. ret: Ret<()>,
  217. },
  218. Delete {
  219. interval: Interval,
  220. ret: Ret<()>,
  221. },
  222. Format {
  223. interval: Interval,
  224. attribute: AttributeEntry,
  225. ret: Ret<()>,
  226. },
  227. Replace {
  228. interval: Interval,
  229. data: String,
  230. ret: Ret<()>,
  231. },
  232. CanUndo {
  233. ret: oneshot::Sender<bool>,
  234. },
  235. CanRedo {
  236. ret: oneshot::Sender<bool>,
  237. },
  238. Undo {
  239. ret: Ret<()>,
  240. },
  241. Redo {
  242. ret: Ret<()>,
  243. },
  244. ReadDeltaStr {
  245. ret: Ret<String>,
  246. },
  247. #[allow(dead_code)]
  248. ReadDelta {
  249. ret: Ret<TextOperations>,
  250. },
  251. }
  252. impl std::fmt::Debug for EditorCommand {
  253. fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
  254. let s = match self {
  255. EditorCommand::ComposeLocalDelta { .. } => "ComposeLocalDelta",
  256. EditorCommand::ComposeRemoteDelta { .. } => "ComposeRemoteDelta",
  257. EditorCommand::ResetDelta { .. } => "ResetDelta",
  258. EditorCommand::TransformDelta { .. } => "TransformDelta",
  259. EditorCommand::Insert { .. } => "Insert",
  260. EditorCommand::Delete { .. } => "Delete",
  261. EditorCommand::Format { .. } => "Format",
  262. EditorCommand::Replace { .. } => "Replace",
  263. EditorCommand::CanUndo { .. } => "CanUndo",
  264. EditorCommand::CanRedo { .. } => "CanRedo",
  265. EditorCommand::Undo { .. } => "Undo",
  266. EditorCommand::Redo { .. } => "Redo",
  267. EditorCommand::ReadDeltaStr { .. } => "ReadDeltaStr",
  268. EditorCommand::ReadDelta { .. } => "ReadDocumentAsDelta",
  269. };
  270. f.write_str(s)
  271. }
  272. }