editor.rs 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. use crate::core::DocumentRevisionCompact;
  2. use crate::{
  3. core::{make_document_ws_manager, EditorCommand, EditorCommandQueue, EditorCommandSender},
  4. errors::FlowyError,
  5. DocumentUser, DocumentWSReceiver,
  6. };
  7. use bytes::Bytes;
  8. use flowy_collaboration::{
  9. entities::{document_info::DocumentInfo, revision::Revision},
  10. errors::CollaborateResult,
  11. util::make_delta_from_revisions,
  12. };
  13. use flowy_error::{internal_error, FlowyResult};
  14. use flowy_sync::{
  15. RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket, RevisionWebSocketManager,
  16. };
  17. use lib_ot::{
  18. core::{Interval, Operation},
  19. rich_text::{RichTextAttribute, RichTextDelta},
  20. };
  21. use std::sync::Arc;
  22. use tokio::sync::{mpsc, oneshot};
  23. pub struct ClientDocumentEditor {
  24. pub doc_id: String,
  25. #[allow(dead_code)]
  26. rev_manager: Arc<RevisionManager>,
  27. ws_manager: Arc<RevisionWebSocketManager>,
  28. edit_cmd_tx: EditorCommandSender,
  29. }
  30. impl ClientDocumentEditor {
  31. pub(crate) async fn new(
  32. doc_id: &str,
  33. user: Arc<dyn DocumentUser>,
  34. mut rev_manager: RevisionManager,
  35. web_socket: Arc<dyn RevisionWebSocket>,
  36. server: Arc<dyn RevisionCloudService>,
  37. ) -> FlowyResult<Arc<Self>> {
  38. let document_info = rev_manager
  39. .load::<DocumentInfoBuilder, DocumentRevisionCompact>(server)
  40. .await?;
  41. let delta = document_info.delta()?;
  42. let rev_manager = Arc::new(rev_manager);
  43. let doc_id = doc_id.to_string();
  44. let user_id = user.user_id()?;
  45. let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta);
  46. let ws_manager = make_document_ws_manager(
  47. doc_id.clone(),
  48. user_id.clone(),
  49. edit_cmd_tx.clone(),
  50. rev_manager.clone(),
  51. web_socket,
  52. )
  53. .await;
  54. let editor = Arc::new(Self {
  55. doc_id,
  56. rev_manager,
  57. ws_manager,
  58. edit_cmd_tx,
  59. });
  60. Ok(editor)
  61. }
  62. pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
  63. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  64. let msg = EditorCommand::Insert {
  65. index,
  66. data: data.to_string(),
  67. ret,
  68. };
  69. let _ = self.edit_cmd_tx.send(msg).await;
  70. let _ = rx.await.map_err(internal_error)??;
  71. Ok(())
  72. }
  73. pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
  74. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  75. let msg = EditorCommand::Delete { interval, ret };
  76. let _ = self.edit_cmd_tx.send(msg).await;
  77. let _ = rx.await.map_err(internal_error)??;
  78. Ok(())
  79. }
  80. pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> {
  81. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  82. let msg = EditorCommand::Format {
  83. interval,
  84. attribute,
  85. ret,
  86. };
  87. let _ = self.edit_cmd_tx.send(msg).await;
  88. let _ = rx.await.map_err(internal_error)??;
  89. Ok(())
  90. }
  91. pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
  92. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  93. let msg = EditorCommand::Replace {
  94. interval,
  95. data: data.to_string(),
  96. ret,
  97. };
  98. let _ = self.edit_cmd_tx.send(msg).await;
  99. let _ = rx.await.map_err(internal_error)??;
  100. Ok(())
  101. }
  102. pub async fn can_undo(&self) -> bool {
  103. let (ret, rx) = oneshot::channel::<bool>();
  104. let msg = EditorCommand::CanUndo { ret };
  105. let _ = self.edit_cmd_tx.send(msg).await;
  106. rx.await.unwrap_or(false)
  107. }
  108. pub async fn can_redo(&self) -> bool {
  109. let (ret, rx) = oneshot::channel::<bool>();
  110. let msg = EditorCommand::CanRedo { ret };
  111. let _ = self.edit_cmd_tx.send(msg).await;
  112. rx.await.unwrap_or(false)
  113. }
  114. pub async fn undo(&self) -> Result<(), FlowyError> {
  115. let (ret, rx) = oneshot::channel();
  116. let msg = EditorCommand::Undo { ret };
  117. let _ = self.edit_cmd_tx.send(msg).await;
  118. let _ = rx.await.map_err(internal_error)??;
  119. Ok(())
  120. }
  121. pub async fn redo(&self) -> Result<(), FlowyError> {
  122. let (ret, rx) = oneshot::channel();
  123. let msg = EditorCommand::Redo { ret };
  124. let _ = self.edit_cmd_tx.send(msg).await;
  125. let _ = rx.await.map_err(internal_error)??;
  126. Ok(())
  127. }
  128. pub async fn document_json(&self) -> FlowyResult<String> {
  129. let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
  130. let msg = EditorCommand::ReadDocumentAsJson { ret };
  131. let _ = self.edit_cmd_tx.send(msg).await;
  132. let json = rx.await.map_err(internal_error)??;
  133. Ok(json)
  134. }
  135. #[tracing::instrument(level = "trace", skip(self, data), err)]
  136. pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), FlowyError> {
  137. let delta = RichTextDelta::from_bytes(&data)?;
  138. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  139. let msg = EditorCommand::ComposeLocalDelta {
  140. delta: delta.clone(),
  141. ret,
  142. };
  143. let _ = self.edit_cmd_tx.send(msg).await;
  144. let _ = rx.await.map_err(internal_error)??;
  145. Ok(())
  146. }
  147. pub fn stop(&self) {
  148. self.ws_manager.stop();
  149. }
  150. pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> {
  151. self.ws_manager.clone()
  152. }
  153. }
  154. impl std::ops::Drop for ClientDocumentEditor {
  155. fn drop(&mut self) {
  156. tracing::trace!("{} ClientDocumentEditor was dropped", self.doc_id)
  157. }
  158. }
  159. // The edit queue will exit after the EditorCommandSender was dropped.
  160. fn spawn_edit_queue(
  161. user: Arc<dyn DocumentUser>,
  162. rev_manager: Arc<RevisionManager>,
  163. delta: RichTextDelta,
  164. ) -> EditorCommandSender {
  165. let (sender, receiver) = mpsc::channel(1000);
  166. let actor = EditorCommandQueue::new(user, rev_manager, delta, receiver);
  167. tokio::spawn(actor.run());
  168. sender
  169. }
  /// Extra accessors that only exist when the `flowy_unit_test` feature is enabled.
  #[cfg(feature = "flowy_unit_test")]
  impl ClientDocumentEditor {
      /// Requests the document as a JSON string from the edit queue.
      pub async fn doc_json(&self) -> FlowyResult<String> {
          let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
          let _ = self
              .edit_cmd_tx
              .send(EditorCommand::ReadDocumentAsJson { ret })
              .await;
          Ok(rx.await.map_err(internal_error)??)
      }

      /// Requests the document as a `RichTextDelta` from the edit queue.
      pub async fn doc_delta(&self) -> FlowyResult<RichTextDelta> {
          let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
          let _ = self
              .edit_cmd_tx
              .send(EditorCommand::ReadDocumentAsDelta { ret })
              .await;
          Ok(rx.await.map_err(internal_error)??)
      }

      /// Returns another handle to the editor's revision manager.
      pub fn rev_manager(&self) -> Arc<RevisionManager> {
          Arc::clone(&self.rev_manager)
      }
  }
  190. struct DocumentInfoBuilder();
  191. impl RevisionObjectBuilder for DocumentInfoBuilder {
  192. type Output = DocumentInfo;
  193. fn build_with_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  194. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  195. let mut delta = make_delta_from_revisions(revisions)?;
  196. correct_delta(&mut delta);
  197. Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
  198. doc_id: object_id.to_owned(),
  199. text: delta.to_json(),
  200. rev_id,
  201. base_rev_id,
  202. })
  203. }
  204. }
  205. // quill-editor requires the delta should end with '\n' and only contains the
  206. // insert operation. The function, correct_delta maybe be removed in the future.
  207. fn correct_delta(delta: &mut RichTextDelta) {
  208. if let Some(op) = delta.ops.last() {
  209. let op_data = op.get_data();
  210. if !op_data.ends_with('\n') {
  211. tracing::warn!("The document must end with newline. Correcting it by inserting newline op");
  212. delta.ops.push(Operation::Insert("\n".into()));
  213. }
  214. }
  215. if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
  216. tracing::warn!("The document can only contains insert operations, but found {:?}", op);
  217. delta.ops.retain(|op| op.is_insert());
  218. }
  219. }