editor.rs 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. use crate::web_socket::EditorCommandSender;
  2. use crate::{
  3. errors::FlowyError,
  4. queue::{EditBlockQueue, EditorCommand},
  5. TextBlockUser,
  6. };
  7. use bytes::Bytes;
  8. use flowy_error::{internal_error, FlowyResult};
  9. use flowy_revision::{RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket};
  10. use flowy_sync::entities::ws_data::ServerRevisionWSData;
  11. use flowy_sync::{
  12. entities::{revision::Revision, text_block::DocumentPB},
  13. errors::CollaborateResult,
  14. util::make_delta_from_revisions,
  15. };
  16. use lib_ot::{
  17. core::{Interval, Operation},
  18. rich_text::{RichTextAttribute, RichTextDelta},
  19. };
  20. use lib_ws::WSConnectState;
  21. use std::sync::Arc;
  22. use tokio::sync::{mpsc, oneshot};
/// Editor handle for a single text block.
///
/// Edits are not applied by this type directly: each public method wraps its
/// arguments in an `EditorCommand` and sends it over `edit_cmd_tx` to the
/// edit queue task.
pub struct TextBlockEditor {
    /// Identifier of the block this editor was created for.
    pub doc_id: String,
    // In this file the field is only read by the `flowy_unit_test` accessor,
    // hence the `dead_code` allowance for regular builds.
    #[allow(dead_code)]
    rev_manager: Arc<RevisionManager>,
    /// Web-socket manager; the field only exists when the `sync` feature is enabled.
    #[cfg(feature = "sync")]
    ws_manager: Arc<flowy_revision::RevisionWebSocketManager>,
    /// Sending half of the command channel consumed by the edit queue.
    edit_cmd_tx: EditorCommandSender,
}
  31. impl TextBlockEditor {
  32. #[allow(unused_variables)]
  33. pub(crate) async fn new(
  34. doc_id: &str,
  35. user: Arc<dyn TextBlockUser>,
  36. mut rev_manager: RevisionManager,
  37. rev_web_socket: Arc<dyn RevisionWebSocket>,
  38. cloud_service: Arc<dyn RevisionCloudService>,
  39. ) -> FlowyResult<Arc<Self>> {
  40. let document_info = rev_manager.load::<TextBlockInfoBuilder>(Some(cloud_service)).await?;
  41. let delta = document_info.delta()?;
  42. let rev_manager = Arc::new(rev_manager);
  43. let doc_id = doc_id.to_string();
  44. let user_id = user.user_id()?;
  45. let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta);
  46. #[cfg(feature = "sync")]
  47. let ws_manager = crate::web_socket::make_block_ws_manager(
  48. doc_id.clone(),
  49. user_id.clone(),
  50. edit_cmd_tx.clone(),
  51. rev_manager.clone(),
  52. rev_web_socket,
  53. )
  54. .await;
  55. let editor = Arc::new(Self {
  56. doc_id,
  57. rev_manager,
  58. #[cfg(feature = "sync")]
  59. ws_manager,
  60. edit_cmd_tx,
  61. });
  62. Ok(editor)
  63. }
  64. pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
  65. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  66. let msg = EditorCommand::Insert {
  67. index,
  68. data: data.to_string(),
  69. ret,
  70. };
  71. let _ = self.edit_cmd_tx.send(msg).await;
  72. let _ = rx.await.map_err(internal_error)??;
  73. Ok(())
  74. }
  75. pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
  76. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  77. let msg = EditorCommand::Delete { interval, ret };
  78. let _ = self.edit_cmd_tx.send(msg).await;
  79. let _ = rx.await.map_err(internal_error)??;
  80. Ok(())
  81. }
  82. pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> {
  83. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  84. let msg = EditorCommand::Format {
  85. interval,
  86. attribute,
  87. ret,
  88. };
  89. let _ = self.edit_cmd_tx.send(msg).await;
  90. let _ = rx.await.map_err(internal_error)??;
  91. Ok(())
  92. }
  93. pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
  94. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  95. let msg = EditorCommand::Replace {
  96. interval,
  97. data: data.to_string(),
  98. ret,
  99. };
  100. let _ = self.edit_cmd_tx.send(msg).await;
  101. let _ = rx.await.map_err(internal_error)??;
  102. Ok(())
  103. }
  104. pub async fn can_undo(&self) -> bool {
  105. let (ret, rx) = oneshot::channel::<bool>();
  106. let msg = EditorCommand::CanUndo { ret };
  107. let _ = self.edit_cmd_tx.send(msg).await;
  108. rx.await.unwrap_or(false)
  109. }
  110. pub async fn can_redo(&self) -> bool {
  111. let (ret, rx) = oneshot::channel::<bool>();
  112. let msg = EditorCommand::CanRedo { ret };
  113. let _ = self.edit_cmd_tx.send(msg).await;
  114. rx.await.unwrap_or(false)
  115. }
  116. pub async fn undo(&self) -> Result<(), FlowyError> {
  117. let (ret, rx) = oneshot::channel();
  118. let msg = EditorCommand::Undo { ret };
  119. let _ = self.edit_cmd_tx.send(msg).await;
  120. let _ = rx.await.map_err(internal_error)??;
  121. Ok(())
  122. }
  123. pub async fn redo(&self) -> Result<(), FlowyError> {
  124. let (ret, rx) = oneshot::channel();
  125. let msg = EditorCommand::Redo { ret };
  126. let _ = self.edit_cmd_tx.send(msg).await;
  127. let _ = rx.await.map_err(internal_error)??;
  128. Ok(())
  129. }
  130. pub async fn delta_str(&self) -> FlowyResult<String> {
  131. let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
  132. let msg = EditorCommand::ReadDeltaStr { ret };
  133. let _ = self.edit_cmd_tx.send(msg).await;
  134. let json = rx.await.map_err(internal_error)??;
  135. Ok(json)
  136. }
  137. #[tracing::instrument(level = "trace", skip(self, data), err)]
  138. pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), FlowyError> {
  139. let delta = RichTextDelta::from_bytes(&data)?;
  140. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  141. let msg = EditorCommand::ComposeLocalDelta {
  142. delta: delta.clone(),
  143. ret,
  144. };
  145. let _ = self.edit_cmd_tx.send(msg).await;
  146. let _ = rx.await.map_err(internal_error)??;
  147. Ok(())
  148. }
  149. #[cfg(feature = "sync")]
  150. pub fn stop(&self) {
  151. self.ws_manager.stop();
  152. }
  153. #[cfg(not(feature = "sync"))]
  154. pub fn stop(&self) {}
  155. #[cfg(feature = "sync")]
  156. pub(crate) async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError> {
  157. self.ws_manager.receive_ws_data(data).await
  158. }
  159. #[cfg(not(feature = "sync"))]
  160. pub(crate) async fn receive_ws_data(&self, _data: ServerRevisionWSData) -> Result<(), FlowyError> {
  161. Ok(())
  162. }
  163. #[cfg(feature = "sync")]
  164. pub(crate) fn receive_ws_state(&self, state: &WSConnectState) {
  165. self.ws_manager.connect_state_changed(state.clone());
  166. }
  167. #[cfg(not(feature = "sync"))]
  168. pub(crate) fn receive_ws_state(&self, _state: &WSConnectState) {}
  169. }
  170. impl std::ops::Drop for TextBlockEditor {
  171. fn drop(&mut self) {
  172. tracing::trace!("{} ClientBlockEditor was dropped", self.doc_id)
  173. }
  174. }
  175. // The edit queue will exit after the EditorCommandSender was dropped.
  176. fn spawn_edit_queue(
  177. user: Arc<dyn TextBlockUser>,
  178. rev_manager: Arc<RevisionManager>,
  179. delta: RichTextDelta,
  180. ) -> EditorCommandSender {
  181. let (sender, receiver) = mpsc::channel(1000);
  182. let edit_queue = EditBlockQueue::new(user, rev_manager, delta, receiver);
  183. // We can use tokio::task::spawn_local here by using tokio::spawn_blocking.
  184. // https://github.com/tokio-rs/tokio/issues/2095
  185. // tokio::task::spawn_blocking(move || {
  186. // let rt = tokio::runtime::Handle::current();
  187. // rt.block_on(async {
  188. // let local = tokio::task::LocalSet::new();
  189. // local.run_until(edit_queue.run()).await;
  190. // });
  191. // });
  192. tokio::spawn(edit_queue.run());
  193. sender
  194. }
  195. #[cfg(feature = "flowy_unit_test")]
  196. impl TextBlockEditor {
  197. pub async fn text_block_delta(&self) -> FlowyResult<RichTextDelta> {
  198. let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
  199. let msg = EditorCommand::ReadDelta { ret };
  200. let _ = self.edit_cmd_tx.send(msg).await;
  201. let delta = rx.await.map_err(internal_error)??;
  202. Ok(delta)
  203. }
  204. pub fn rev_manager(&self) -> Arc<RevisionManager> {
  205. self.rev_manager.clone()
  206. }
  207. }
  208. struct TextBlockInfoBuilder();
  209. impl RevisionObjectBuilder for TextBlockInfoBuilder {
  210. type Output = DocumentPB;
  211. fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  212. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  213. let mut delta = make_delta_from_revisions(revisions)?;
  214. correct_delta(&mut delta);
  215. Result::<DocumentPB, FlowyError>::Ok(DocumentPB {
  216. block_id: object_id.to_owned(),
  217. text: delta.to_json_str(),
  218. rev_id,
  219. base_rev_id,
  220. })
  221. }
  222. }
  223. // quill-editor requires the delta should end with '\n' and only contains the
  224. // insert operation. The function, correct_delta maybe be removed in the future.
  225. fn correct_delta(delta: &mut RichTextDelta) {
  226. if let Some(op) = delta.ops.last() {
  227. let op_data = op.get_data();
  228. if !op_data.ends_with('\n') {
  229. tracing::warn!("The document must end with newline. Correcting it by inserting newline op");
  230. delta.ops.push(Operation::Insert("\n".into()));
  231. }
  232. }
  233. if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
  234. tracing::warn!("The document can only contains insert operations, but found {:?}", op);
  235. delta.ops.retain(|op| op.is_insert());
  236. }
  237. }