queue.rs 10.0 KB


  1. use crate::web_socket::EditorCommandReceiver;
  2. use crate::TextBlockUser;
  3. use async_stream::stream;
  4. use bytes::Bytes;
  5. use flowy_collaboration::util::make_delta_from_revisions;
  6. use flowy_collaboration::{
  7. client_document::{history::UndoResult, ClientDocument},
  8. entities::revision::{RevId, Revision},
  9. errors::CollaborateError,
  10. };
  11. use flowy_error::{FlowyError, FlowyResult};
  12. use flowy_sync::{DeltaMD5, RevisionCompactor, RevisionManager, RichTextTransformDeltas, TransformDeltas};
  13. use futures::stream::StreamExt;
  14. use lib_ot::{
  15. core::{Interval, OperationTransformable},
  16. rich_text::{RichTextAttribute, RichTextAttributes, RichTextDelta},
  17. };
  18. use std::sync::Arc;
  19. use tokio::sync::{oneshot, RwLock};
  20. // The EditorCommandQueue executes each command that will alter the document in
  21. // serial.
  22. pub(crate) struct EditBlockQueue {
  23. document: Arc<RwLock<ClientDocument>>,
  24. user: Arc<dyn TextBlockUser>,
  25. rev_manager: Arc<RevisionManager>,
  26. receiver: Option<EditorCommandReceiver>,
  27. }
  28. impl EditBlockQueue {
  29. pub(crate) fn new(
  30. user: Arc<dyn TextBlockUser>,
  31. rev_manager: Arc<RevisionManager>,
  32. delta: RichTextDelta,
  33. receiver: EditorCommandReceiver,
  34. ) -> Self {
  35. let document = Arc::new(RwLock::new(ClientDocument::from_delta(delta)));
  36. Self {
  37. document,
  38. user,
  39. rev_manager,
  40. receiver: Some(receiver),
  41. }
  42. }
  43. pub(crate) async fn run(mut self) {
  44. let mut receiver = self.receiver.take().expect("Should only call once");
  45. let stream = stream! {
  46. loop {
  47. match receiver.recv().await {
  48. Some(msg) => yield msg,
  49. None => break,
  50. }
  51. }
  52. };
  53. stream
  54. .for_each(|command| async {
  55. match self.handle_command(command).await {
  56. Ok(_) => {}
  57. Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
  58. }
  59. })
  60. .await;
  61. }
  62. #[tracing::instrument(level = "trace", skip(self), err)]
  63. async fn handle_command(&self, command: EditorCommand) -> Result<(), FlowyError> {
  64. match command {
  65. EditorCommand::ComposeLocalDelta { delta, ret } => {
  66. let mut document = self.document.write().await;
  67. let _ = document.compose_delta(delta.clone())?;
  68. let md5 = document.md5();
  69. drop(document);
  70. let _ = self.save_local_delta(delta, md5).await?;
  71. let _ = ret.send(Ok(()));
  72. }
  73. EditorCommand::ComposeRemoteDelta { client_delta, ret } => {
  74. let mut document = self.document.write().await;
  75. let _ = document.compose_delta(client_delta.clone())?;
  76. let md5 = document.md5();
  77. drop(document);
  78. let _ = ret.send(Ok(md5));
  79. }
  80. EditorCommand::ResetDelta { delta, ret } => {
  81. let mut document = self.document.write().await;
  82. let _ = document.set_delta(delta);
  83. let md5 = document.md5();
  84. drop(document);
  85. let _ = ret.send(Ok(md5));
  86. }
  87. EditorCommand::TransformDelta { delta, ret } => {
  88. let f = || async {
  89. let read_guard = self.document.read().await;
  90. let mut server_prime: Option<RichTextDelta> = None;
  91. let client_prime: RichTextDelta;
  92. if read_guard.is_empty() {
  93. // Do nothing
  94. client_prime = delta;
  95. } else {
  96. let (s_prime, c_prime) = read_guard.delta().transform(&delta)?;
  97. client_prime = c_prime;
  98. server_prime = Some(s_prime);
  99. }
  100. drop(read_guard);
  101. Ok::<RichTextTransformDeltas, CollaborateError>(TransformDeltas {
  102. client_prime,
  103. server_prime,
  104. })
  105. };
  106. let _ = ret.send(f().await);
  107. }
  108. EditorCommand::Insert { index, data, ret } => {
  109. let mut write_guard = self.document.write().await;
  110. let delta = write_guard.insert(index, data)?;
  111. let md5 = write_guard.md5();
  112. let _ = self.save_local_delta(delta, md5).await?;
  113. let _ = ret.send(Ok(()));
  114. }
  115. EditorCommand::Delete { interval, ret } => {
  116. let mut write_guard = self.document.write().await;
  117. let delta = write_guard.delete(interval)?;
  118. let md5 = write_guard.md5();
  119. let _ = self.save_local_delta(delta, md5).await?;
  120. let _ = ret.send(Ok(()));
  121. }
  122. EditorCommand::Format {
  123. interval,
  124. attribute,
  125. ret,
  126. } => {
  127. let mut write_guard = self.document.write().await;
  128. let delta = write_guard.format(interval, attribute)?;
  129. let md5 = write_guard.md5();
  130. let _ = self.save_local_delta(delta, md5).await?;
  131. let _ = ret.send(Ok(()));
  132. }
  133. EditorCommand::Replace { interval, data, ret } => {
  134. let mut write_guard = self.document.write().await;
  135. let delta = write_guard.replace(interval, data)?;
  136. let md5 = write_guard.md5();
  137. let _ = self.save_local_delta(delta, md5).await?;
  138. let _ = ret.send(Ok(()));
  139. }
  140. EditorCommand::CanUndo { ret } => {
  141. let _ = ret.send(self.document.read().await.can_undo());
  142. }
  143. EditorCommand::CanRedo { ret } => {
  144. let _ = ret.send(self.document.read().await.can_redo());
  145. }
  146. EditorCommand::Undo { ret } => {
  147. let mut write_guard = self.document.write().await;
  148. let UndoResult { delta } = write_guard.undo()?;
  149. let md5 = write_guard.md5();
  150. let _ = self.save_local_delta(delta, md5).await?;
  151. let _ = ret.send(Ok(()));
  152. }
  153. EditorCommand::Redo { ret } => {
  154. let mut write_guard = self.document.write().await;
  155. let UndoResult { delta } = write_guard.redo()?;
  156. let md5 = write_guard.md5();
  157. let _ = self.save_local_delta(delta, md5).await?;
  158. let _ = ret.send(Ok(()));
  159. }
  160. EditorCommand::ReadDeltaStr { ret } => {
  161. let data = self.document.read().await.delta_str();
  162. let _ = ret.send(Ok(data));
  163. }
  164. EditorCommand::ReadDelta { ret } => {
  165. let delta = self.document.read().await.delta().clone();
  166. let _ = ret.send(Ok(delta));
  167. }
  168. }
  169. Ok(())
  170. }
  171. async fn save_local_delta(&self, delta: RichTextDelta, md5: String) -> Result<RevId, FlowyError> {
  172. let delta_data = delta.to_bytes();
  173. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  174. let user_id = self.user.user_id()?;
  175. let revision = Revision::new(
  176. &self.rev_manager.object_id,
  177. base_rev_id,
  178. rev_id,
  179. delta_data,
  180. &user_id,
  181. md5,
  182. );
  183. let _ = self
  184. .rev_manager
  185. .add_local_revision(&revision, Box::new(TextBlockRevisionCompactor()))
  186. .await?;
  187. Ok(rev_id.into())
  188. }
  189. }
  190. pub(crate) struct TextBlockRevisionCompactor();
  191. impl RevisionCompactor for TextBlockRevisionCompactor {
  192. fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  193. let delta = make_delta_from_revisions::<RichTextAttributes>(revisions)?;
  194. Ok(delta.to_bytes())
  195. }
  196. }
  197. pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
  198. pub(crate) enum EditorCommand {
  199. ComposeLocalDelta {
  200. delta: RichTextDelta,
  201. ret: Ret<()>,
  202. },
  203. ComposeRemoteDelta {
  204. client_delta: RichTextDelta,
  205. ret: Ret<DeltaMD5>,
  206. },
  207. ResetDelta {
  208. delta: RichTextDelta,
  209. ret: Ret<DeltaMD5>,
  210. },
  211. TransformDelta {
  212. delta: RichTextDelta,
  213. ret: Ret<RichTextTransformDeltas>,
  214. },
  215. Insert {
  216. index: usize,
  217. data: String,
  218. ret: Ret<()>,
  219. },
  220. Delete {
  221. interval: Interval,
  222. ret: Ret<()>,
  223. },
  224. Format {
  225. interval: Interval,
  226. attribute: RichTextAttribute,
  227. ret: Ret<()>,
  228. },
  229. Replace {
  230. interval: Interval,
  231. data: String,
  232. ret: Ret<()>,
  233. },
  234. CanUndo {
  235. ret: oneshot::Sender<bool>,
  236. },
  237. CanRedo {
  238. ret: oneshot::Sender<bool>,
  239. },
  240. Undo {
  241. ret: Ret<()>,
  242. },
  243. Redo {
  244. ret: Ret<()>,
  245. },
  246. ReadDeltaStr {
  247. ret: Ret<String>,
  248. },
  249. #[allow(dead_code)]
  250. ReadDelta {
  251. ret: Ret<RichTextDelta>,
  252. },
  253. }
  254. impl std::fmt::Debug for EditorCommand {
  255. fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
  256. let s = match self {
  257. EditorCommand::ComposeLocalDelta { .. } => "ComposeLocalDelta",
  258. EditorCommand::ComposeRemoteDelta { .. } => "ComposeRemoteDelta",
  259. EditorCommand::ResetDelta { .. } => "ResetDelta",
  260. EditorCommand::TransformDelta { .. } => "TransformDelta",
  261. EditorCommand::Insert { .. } => "Insert",
  262. EditorCommand::Delete { .. } => "Delete",
  263. EditorCommand::Format { .. } => "Format",
  264. EditorCommand::Replace { .. } => "Replace",
  265. EditorCommand::CanUndo { .. } => "CanUndo",
  266. EditorCommand::CanRedo { .. } => "CanRedo",
  267. EditorCommand::Undo { .. } => "Undo",
  268. EditorCommand::Redo { .. } => "Redo",
  269. EditorCommand::ReadDeltaStr { .. } => "ReadDeltaStr",
  270. EditorCommand::ReadDelta { .. } => "ReadDocumentAsDelta",
  271. };
  272. f.write_str(s)
  273. }
  274. }