block_editor.rs 8.4 KB


  1. use crate::queue::BlockRevisionCompact;
  2. use crate::web_socket::{make_block_ws_manager, EditorCommandSender};
  3. use crate::{
  4. errors::FlowyError,
  5. queue::{EditBlockQueue, EditorCommand},
  6. BlockUser,
  7. };
  8. use bytes::Bytes;
  9. use flowy_collaboration::entities::ws_data::ServerRevisionWSData;
  10. use flowy_collaboration::{
  11. entities::{document_info::BlockInfo, revision::Revision},
  12. errors::CollaborateResult,
  13. util::make_delta_from_revisions,
  14. };
  15. use flowy_error::{internal_error, FlowyResult};
  16. use flowy_sync::{
  17. RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket, RevisionWebSocketManager,
  18. };
  19. use lib_ot::{
  20. core::{Interval, Operation},
  21. rich_text::{RichTextAttribute, RichTextDelta},
  22. };
  23. use lib_ws::WSConnectState;
  24. use std::sync::Arc;
  25. use tokio::sync::{mpsc, oneshot};
  26. pub struct ClientBlockEditor {
  27. pub doc_id: String,
  28. #[allow(dead_code)]
  29. rev_manager: Arc<RevisionManager>,
  30. ws_manager: Arc<RevisionWebSocketManager>,
  31. edit_cmd_tx: EditorCommandSender,
  32. }
  33. impl ClientBlockEditor {
  34. pub(crate) async fn new(
  35. doc_id: &str,
  36. user: Arc<dyn BlockUser>,
  37. mut rev_manager: RevisionManager,
  38. rev_web_socket: Arc<dyn RevisionWebSocket>,
  39. cloud_service: Arc<dyn RevisionCloudService>,
  40. ) -> FlowyResult<Arc<Self>> {
  41. let document_info = rev_manager
  42. .load::<BlockInfoBuilder, BlockRevisionCompact>(cloud_service)
  43. .await?;
  44. let delta = document_info.delta()?;
  45. let rev_manager = Arc::new(rev_manager);
  46. let doc_id = doc_id.to_string();
  47. let user_id = user.user_id()?;
  48. let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta);
  49. let ws_manager = make_block_ws_manager(
  50. doc_id.clone(),
  51. user_id.clone(),
  52. edit_cmd_tx.clone(),
  53. rev_manager.clone(),
  54. rev_web_socket,
  55. )
  56. .await;
  57. let editor = Arc::new(Self {
  58. doc_id,
  59. rev_manager,
  60. ws_manager,
  61. edit_cmd_tx,
  62. });
  63. Ok(editor)
  64. }
  65. pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
  66. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  67. let msg = EditorCommand::Insert {
  68. index,
  69. data: data.to_string(),
  70. ret,
  71. };
  72. let _ = self.edit_cmd_tx.send(msg).await;
  73. let _ = rx.await.map_err(internal_error)??;
  74. Ok(())
  75. }
  76. pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
  77. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  78. let msg = EditorCommand::Delete { interval, ret };
  79. let _ = self.edit_cmd_tx.send(msg).await;
  80. let _ = rx.await.map_err(internal_error)??;
  81. Ok(())
  82. }
  83. pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> {
  84. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  85. let msg = EditorCommand::Format {
  86. interval,
  87. attribute,
  88. ret,
  89. };
  90. let _ = self.edit_cmd_tx.send(msg).await;
  91. let _ = rx.await.map_err(internal_error)??;
  92. Ok(())
  93. }
  94. pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
  95. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  96. let msg = EditorCommand::Replace {
  97. interval,
  98. data: data.to_string(),
  99. ret,
  100. };
  101. let _ = self.edit_cmd_tx.send(msg).await;
  102. let _ = rx.await.map_err(internal_error)??;
  103. Ok(())
  104. }
  105. pub async fn can_undo(&self) -> bool {
  106. let (ret, rx) = oneshot::channel::<bool>();
  107. let msg = EditorCommand::CanUndo { ret };
  108. let _ = self.edit_cmd_tx.send(msg).await;
  109. rx.await.unwrap_or(false)
  110. }
  111. pub async fn can_redo(&self) -> bool {
  112. let (ret, rx) = oneshot::channel::<bool>();
  113. let msg = EditorCommand::CanRedo { ret };
  114. let _ = self.edit_cmd_tx.send(msg).await;
  115. rx.await.unwrap_or(false)
  116. }
  117. pub async fn undo(&self) -> Result<(), FlowyError> {
  118. let (ret, rx) = oneshot::channel();
  119. let msg = EditorCommand::Undo { ret };
  120. let _ = self.edit_cmd_tx.send(msg).await;
  121. let _ = rx.await.map_err(internal_error)??;
  122. Ok(())
  123. }
  124. pub async fn redo(&self) -> Result<(), FlowyError> {
  125. let (ret, rx) = oneshot::channel();
  126. let msg = EditorCommand::Redo { ret };
  127. let _ = self.edit_cmd_tx.send(msg).await;
  128. let _ = rx.await.map_err(internal_error)??;
  129. Ok(())
  130. }
  131. pub async fn block_json(&self) -> FlowyResult<String> {
  132. let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
  133. let msg = EditorCommand::ReadBlockJson { ret };
  134. let _ = self.edit_cmd_tx.send(msg).await;
  135. let json = rx.await.map_err(internal_error)??;
  136. Ok(json)
  137. }
  138. #[tracing::instrument(level = "trace", skip(self, data), err)]
  139. pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), FlowyError> {
  140. let delta = RichTextDelta::from_bytes(&data)?;
  141. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  142. let msg = EditorCommand::ComposeLocalDelta {
  143. delta: delta.clone(),
  144. ret,
  145. };
  146. let _ = self.edit_cmd_tx.send(msg).await;
  147. let _ = rx.await.map_err(internal_error)??;
  148. Ok(())
  149. }
  150. pub fn stop(&self) {
  151. self.ws_manager.stop();
  152. }
  153. pub(crate) async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError> {
  154. self.ws_manager.receive_ws_data(data).await
  155. }
  156. pub(crate) fn receive_ws_state(&self, state: &WSConnectState) {
  157. self.ws_manager.connect_state_changed(state.clone());
  158. }
  159. }
  160. impl std::ops::Drop for ClientBlockEditor {
  161. fn drop(&mut self) {
  162. tracing::trace!("{} ClientBlockEditor was dropped", self.doc_id)
  163. }
  164. }
  165. // The edit queue will exit after the EditorCommandSender was dropped.
  166. fn spawn_edit_queue(
  167. user: Arc<dyn BlockUser>,
  168. rev_manager: Arc<RevisionManager>,
  169. delta: RichTextDelta,
  170. ) -> EditorCommandSender {
  171. let (sender, receiver) = mpsc::channel(1000);
  172. let edit_queue = EditBlockQueue::new(user, rev_manager, delta, receiver);
  173. tokio::spawn(edit_queue.run());
  174. sender
  175. }
  176. #[cfg(feature = "flowy_unit_test")]
  177. impl ClientBlockEditor {
  178. pub async fn doc_json(&self) -> FlowyResult<String> {
  179. let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
  180. let msg = EditorCommand::ReadBlockJson { ret };
  181. let _ = self.edit_cmd_tx.send(msg).await;
  182. let s = rx.await.map_err(internal_error)??;
  183. Ok(s)
  184. }
  185. pub async fn doc_delta(&self) -> FlowyResult<RichTextDelta> {
  186. let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
  187. let msg = EditorCommand::ReadBlockDelta { ret };
  188. let _ = self.edit_cmd_tx.send(msg).await;
  189. let delta = rx.await.map_err(internal_error)??;
  190. Ok(delta)
  191. }
  192. pub fn rev_manager(&self) -> Arc<RevisionManager> {
  193. self.rev_manager.clone()
  194. }
  195. }
  196. struct BlockInfoBuilder();
  197. impl RevisionObjectBuilder for BlockInfoBuilder {
  198. type Output = BlockInfo;
  199. fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  200. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  201. let mut delta = make_delta_from_revisions(revisions)?;
  202. correct_delta(&mut delta);
  203. Result::<BlockInfo, FlowyError>::Ok(BlockInfo {
  204. block_id: object_id.to_owned(),
  205. text: delta.to_delta_json(),
  206. rev_id,
  207. base_rev_id,
  208. })
  209. }
  210. }
  211. // quill-editor requires the delta should end with '\n' and only contains the
  212. // insert operation. The function, correct_delta maybe be removed in the future.
  213. fn correct_delta(delta: &mut RichTextDelta) {
  214. if let Some(op) = delta.ops.last() {
  215. let op_data = op.get_data();
  216. if !op_data.ends_with('\n') {
  217. tracing::warn!("The document must end with newline. Correcting it by inserting newline op");
  218. delta.ops.push(Operation::Insert("\n".into()));
  219. }
  220. }
  221. if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
  222. tracing::warn!("The document can only contains insert operations, but found {:?}", op);
  223. delta.ops.retain(|op| op.is_insert());
  224. }
  225. }