queue.rs 11 KB


  1. use crate::web_socket::EditorCommandReceiver;
  2. use crate::BlockUser;
  3. use async_stream::stream;
  4. use flowy_collaboration::util::make_delta_from_revisions;
  5. use flowy_collaboration::{
  6. client_document::{history::UndoResult, ClientDocument},
  7. entities::revision::{RevId, Revision},
  8. errors::CollaborateError,
  9. };
  10. use flowy_error::{FlowyError, FlowyResult};
  11. use flowy_sync::{DeltaMD5, RevisionCompact, RevisionManager, RichTextTransformDeltas, TransformDeltas};
  12. use futures::stream::StreamExt;
  13. use lib_ot::{
  14. core::{Interval, OperationTransformable},
  15. rich_text::{RichTextAttribute, RichTextAttributes, RichTextDelta},
  16. };
  17. use std::sync::Arc;
  18. use tokio::sync::{oneshot, RwLock};
  19. // The EditorCommandQueue executes each command that will alter the document in
  20. // serial.
  21. pub(crate) struct EditBlockQueue {
  22. document: Arc<RwLock<ClientDocument>>,
  23. user: Arc<dyn BlockUser>,
  24. rev_manager: Arc<RevisionManager>,
  25. receiver: Option<EditorCommandReceiver>,
  26. }
  27. impl EditBlockQueue {
  28. pub(crate) fn new(
  29. user: Arc<dyn BlockUser>,
  30. rev_manager: Arc<RevisionManager>,
  31. delta: RichTextDelta,
  32. receiver: EditorCommandReceiver,
  33. ) -> Self {
  34. let document = Arc::new(RwLock::new(ClientDocument::from_delta(delta)));
  35. Self {
  36. document,
  37. user,
  38. rev_manager,
  39. receiver: Some(receiver),
  40. }
  41. }
  42. pub(crate) async fn run(mut self) {
  43. let mut receiver = self.receiver.take().expect("Should only call once");
  44. let stream = stream! {
  45. loop {
  46. match receiver.recv().await {
  47. Some(msg) => yield msg,
  48. None => break,
  49. }
  50. }
  51. };
  52. stream
  53. .for_each(|command| async {
  54. match self.handle_command(command).await {
  55. Ok(_) => {}
  56. Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
  57. }
  58. })
  59. .await;
  60. }
  61. #[tracing::instrument(level = "trace", skip(self), err)]
  62. async fn handle_command(&self, command: EditorCommand) -> Result<(), FlowyError> {
  63. match command {
  64. EditorCommand::ComposeLocalDelta { delta, ret } => {
  65. let mut document = self.document.write().await;
  66. let _ = document.compose_delta(delta.clone())?;
  67. let md5 = document.md5();
  68. drop(document);
  69. let _ = self.save_local_delta(delta, md5).await?;
  70. let _ = ret.send(Ok(()));
  71. }
  72. EditorCommand::ComposeRemoteDelta { client_delta, ret } => {
  73. let mut document = self.document.write().await;
  74. let _ = document.compose_delta(client_delta.clone())?;
  75. let md5 = document.md5();
  76. drop(document);
  77. let _ = ret.send(Ok(md5));
  78. }
  79. EditorCommand::ResetDelta { delta, ret } => {
  80. let mut document = self.document.write().await;
  81. let _ = document.set_delta(delta);
  82. let md5 = document.md5();
  83. drop(document);
  84. let _ = ret.send(Ok(md5));
  85. }
  86. EditorCommand::TransformDelta { delta, ret } => {
  87. let f = || async {
  88. let read_guard = self.document.read().await;
  89. let mut server_prime: Option<RichTextDelta> = None;
  90. let client_prime: RichTextDelta;
  91. if read_guard.is_empty() {
  92. // Do nothing
  93. client_prime = delta;
  94. } else {
  95. let (s_prime, c_prime) = read_guard.delta().transform(&delta)?;
  96. client_prime = c_prime;
  97. server_prime = Some(s_prime);
  98. }
  99. drop(read_guard);
  100. Ok::<RichTextTransformDeltas, CollaborateError>(TransformDeltas {
  101. client_prime,
  102. server_prime,
  103. })
  104. };
  105. let _ = ret.send(f().await);
  106. }
  107. EditorCommand::Insert { index, data, ret } => {
  108. let mut write_guard = self.document.write().await;
  109. let delta = write_guard.insert(index, data)?;
  110. let md5 = write_guard.md5();
  111. let _ = self.save_local_delta(delta, md5).await?;
  112. let _ = ret.send(Ok(()));
  113. }
  114. EditorCommand::Delete { interval, ret } => {
  115. let mut write_guard = self.document.write().await;
  116. let delta = write_guard.delete(interval)?;
  117. let md5 = write_guard.md5();
  118. let _ = self.save_local_delta(delta, md5).await?;
  119. let _ = ret.send(Ok(()));
  120. }
  121. EditorCommand::Format {
  122. interval,
  123. attribute,
  124. ret,
  125. } => {
  126. let mut write_guard = self.document.write().await;
  127. let delta = write_guard.format(interval, attribute)?;
  128. let md5 = write_guard.md5();
  129. let _ = self.save_local_delta(delta, md5).await?;
  130. let _ = ret.send(Ok(()));
  131. }
  132. EditorCommand::Replace { interval, data, ret } => {
  133. let mut write_guard = self.document.write().await;
  134. let delta = write_guard.replace(interval, data)?;
  135. let md5 = write_guard.md5();
  136. let _ = self.save_local_delta(delta, md5).await?;
  137. let _ = ret.send(Ok(()));
  138. }
  139. EditorCommand::CanUndo { ret } => {
  140. let _ = ret.send(self.document.read().await.can_undo());
  141. }
  142. EditorCommand::CanRedo { ret } => {
  143. let _ = ret.send(self.document.read().await.can_redo());
  144. }
  145. EditorCommand::Undo { ret } => {
  146. let mut write_guard = self.document.write().await;
  147. let UndoResult { delta } = write_guard.undo()?;
  148. let md5 = write_guard.md5();
  149. let _ = self.save_local_delta(delta, md5).await?;
  150. let _ = ret.send(Ok(()));
  151. }
  152. EditorCommand::Redo { ret } => {
  153. let mut write_guard = self.document.write().await;
  154. let UndoResult { delta } = write_guard.redo()?;
  155. let md5 = write_guard.md5();
  156. let _ = self.save_local_delta(delta, md5).await?;
  157. let _ = ret.send(Ok(()));
  158. }
  159. EditorCommand::ReadBlockJson { ret } => {
  160. let data = self.document.read().await.to_json();
  161. let _ = ret.send(Ok(data));
  162. }
  163. EditorCommand::ReadBlockDelta { ret } => {
  164. let delta = self.document.read().await.delta().clone();
  165. let _ = ret.send(Ok(delta));
  166. }
  167. }
  168. Ok(())
  169. }
  170. async fn save_local_delta(&self, delta: RichTextDelta, md5: String) -> Result<RevId, FlowyError> {
  171. let delta_data = delta.to_bytes();
  172. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  173. let user_id = self.user.user_id()?;
  174. let revision = Revision::new(
  175. &self.rev_manager.object_id,
  176. base_rev_id,
  177. rev_id,
  178. delta_data,
  179. &user_id,
  180. md5,
  181. );
  182. let _ = self
  183. .rev_manager
  184. .add_local_revision::<BlockRevisionCompact>(&revision)
  185. .await?;
  186. Ok(rev_id.into())
  187. }
  188. }
  /// Field-less type that carries the [`RevisionCompact`] implementation used when
  /// local block revisions are added to the revision manager.
  pub(crate) struct BlockRevisionCompact();
  190. impl RevisionCompact for BlockRevisionCompact {
  191. fn compact_revisions(user_id: &str, object_id: &str, mut revisions: Vec<Revision>) -> FlowyResult<Revision> {
  192. if revisions.is_empty() {
  193. return Err(FlowyError::internal().context("Can't compact the empty block's revisions"));
  194. }
  195. if revisions.len() == 1 {
  196. return Ok(revisions.pop().unwrap());
  197. }
  198. let first_revision = revisions.first().unwrap();
  199. let last_revision = revisions.last().unwrap();
  200. let (base_rev_id, rev_id) = first_revision.pair_rev_id();
  201. let md5 = last_revision.md5.clone();
  202. let delta = make_delta_from_revisions::<RichTextAttributes>(revisions)?;
  203. let delta_data = delta.to_bytes();
  204. Ok(Revision::new(object_id, base_rev_id, rev_id, delta_data, user_id, md5))
  205. }
  206. }
  207. pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
  208. pub(crate) enum EditorCommand {
  209. ComposeLocalDelta {
  210. delta: RichTextDelta,
  211. ret: Ret<()>,
  212. },
  213. ComposeRemoteDelta {
  214. client_delta: RichTextDelta,
  215. ret: Ret<DeltaMD5>,
  216. },
  217. ResetDelta {
  218. delta: RichTextDelta,
  219. ret: Ret<DeltaMD5>,
  220. },
  221. TransformDelta {
  222. delta: RichTextDelta,
  223. ret: Ret<RichTextTransformDeltas>,
  224. },
  225. Insert {
  226. index: usize,
  227. data: String,
  228. ret: Ret<()>,
  229. },
  230. Delete {
  231. interval: Interval,
  232. ret: Ret<()>,
  233. },
  234. Format {
  235. interval: Interval,
  236. attribute: RichTextAttribute,
  237. ret: Ret<()>,
  238. },
  239. Replace {
  240. interval: Interval,
  241. data: String,
  242. ret: Ret<()>,
  243. },
  244. CanUndo {
  245. ret: oneshot::Sender<bool>,
  246. },
  247. CanRedo {
  248. ret: oneshot::Sender<bool>,
  249. },
  250. Undo {
  251. ret: Ret<()>,
  252. },
  253. Redo {
  254. ret: Ret<()>,
  255. },
  256. ReadBlockJson {
  257. ret: Ret<String>,
  258. },
  259. #[allow(dead_code)]
  260. ReadBlockDelta {
  261. ret: Ret<RichTextDelta>,
  262. },
  263. }
  264. impl std::fmt::Debug for EditorCommand {
  265. fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
  266. let s = match self {
  267. EditorCommand::ComposeLocalDelta { .. } => "ComposeLocalDelta",
  268. EditorCommand::ComposeRemoteDelta { .. } => "ComposeRemoteDelta",
  269. EditorCommand::ResetDelta { .. } => "ResetDelta",
  270. EditorCommand::TransformDelta { .. } => "TransformDelta",
  271. EditorCommand::Insert { .. } => "Insert",
  272. EditorCommand::Delete { .. } => "Delete",
  273. EditorCommand::Format { .. } => "Format",
  274. EditorCommand::Replace { .. } => "Replace",
  275. EditorCommand::CanUndo { .. } => "CanUndo",
  276. EditorCommand::CanRedo { .. } => "CanRedo",
  277. EditorCommand::Undo { .. } => "Undo",
  278. EditorCommand::Redo { .. } => "Redo",
  279. EditorCommand::ReadBlockJson { .. } => "ReadDocumentAsJson",
  280. EditorCommand::ReadBlockDelta { .. } => "ReadDocumentAsDelta",
  281. };
  282. f.write_str(s)
  283. }
  284. }