editor.rs 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. use crate::queue::DocumentRevisionCompact;
  2. use crate::web_socket::{make_document_ws_manager, EditorCommandSender};
  3. use crate::{
  4. errors::FlowyError,
  5. queue::{EditorCommand, EditorCommandQueue},
  6. DocumentUser, DocumentWSReceiver,
  7. };
  8. use bytes::Bytes;
  9. use flowy_collaboration::{
  10. entities::{document_info::DocumentInfo, revision::Revision},
  11. errors::CollaborateResult,
  12. util::make_delta_from_revisions,
  13. };
  14. use flowy_error::{internal_error, FlowyResult};
  15. use flowy_sync::{
  16. RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket, RevisionWebSocketManager,
  17. };
  18. use lib_ot::{
  19. core::{Interval, Operation},
  20. rich_text::{RichTextAttribute, RichTextDelta},
  21. };
  22. use std::sync::Arc;
  23. use tokio::sync::{mpsc, oneshot};
  24. pub struct ClientDocumentEditor {
  25. pub doc_id: String,
  26. #[allow(dead_code)]
  27. rev_manager: Arc<RevisionManager>,
  28. ws_manager: Arc<RevisionWebSocketManager>,
  29. edit_cmd_tx: EditorCommandSender,
  30. }
  31. impl ClientDocumentEditor {
  32. pub(crate) async fn new(
  33. doc_id: &str,
  34. user: Arc<dyn DocumentUser>,
  35. mut rev_manager: RevisionManager,
  36. rev_web_socket: Arc<dyn RevisionWebSocket>,
  37. cloud_service: Arc<dyn RevisionCloudService>,
  38. ) -> FlowyResult<Arc<Self>> {
  39. let document_info = rev_manager
  40. .load::<DocumentInfoBuilder, DocumentRevisionCompact>(cloud_service)
  41. .await?;
  42. let delta = document_info.delta()?;
  43. let rev_manager = Arc::new(rev_manager);
  44. let doc_id = doc_id.to_string();
  45. let user_id = user.user_id()?;
  46. let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta);
  47. let ws_manager = make_document_ws_manager(
  48. doc_id.clone(),
  49. user_id.clone(),
  50. edit_cmd_tx.clone(),
  51. rev_manager.clone(),
  52. rev_web_socket,
  53. )
  54. .await;
  55. let editor = Arc::new(Self {
  56. doc_id,
  57. rev_manager,
  58. ws_manager,
  59. edit_cmd_tx,
  60. });
  61. Ok(editor)
  62. }
  63. pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
  64. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  65. let msg = EditorCommand::Insert {
  66. index,
  67. data: data.to_string(),
  68. ret,
  69. };
  70. let _ = self.edit_cmd_tx.send(msg).await;
  71. let _ = rx.await.map_err(internal_error)??;
  72. Ok(())
  73. }
  74. pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
  75. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  76. let msg = EditorCommand::Delete { interval, ret };
  77. let _ = self.edit_cmd_tx.send(msg).await;
  78. let _ = rx.await.map_err(internal_error)??;
  79. Ok(())
  80. }
  81. pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> {
  82. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  83. let msg = EditorCommand::Format {
  84. interval,
  85. attribute,
  86. ret,
  87. };
  88. let _ = self.edit_cmd_tx.send(msg).await;
  89. let _ = rx.await.map_err(internal_error)??;
  90. Ok(())
  91. }
  92. pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
  93. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  94. let msg = EditorCommand::Replace {
  95. interval,
  96. data: data.to_string(),
  97. ret,
  98. };
  99. let _ = self.edit_cmd_tx.send(msg).await;
  100. let _ = rx.await.map_err(internal_error)??;
  101. Ok(())
  102. }
  103. pub async fn can_undo(&self) -> bool {
  104. let (ret, rx) = oneshot::channel::<bool>();
  105. let msg = EditorCommand::CanUndo { ret };
  106. let _ = self.edit_cmd_tx.send(msg).await;
  107. rx.await.unwrap_or(false)
  108. }
  109. pub async fn can_redo(&self) -> bool {
  110. let (ret, rx) = oneshot::channel::<bool>();
  111. let msg = EditorCommand::CanRedo { ret };
  112. let _ = self.edit_cmd_tx.send(msg).await;
  113. rx.await.unwrap_or(false)
  114. }
  115. pub async fn undo(&self) -> Result<(), FlowyError> {
  116. let (ret, rx) = oneshot::channel();
  117. let msg = EditorCommand::Undo { ret };
  118. let _ = self.edit_cmd_tx.send(msg).await;
  119. let _ = rx.await.map_err(internal_error)??;
  120. Ok(())
  121. }
  122. pub async fn redo(&self) -> Result<(), FlowyError> {
  123. let (ret, rx) = oneshot::channel();
  124. let msg = EditorCommand::Redo { ret };
  125. let _ = self.edit_cmd_tx.send(msg).await;
  126. let _ = rx.await.map_err(internal_error)??;
  127. Ok(())
  128. }
  129. pub async fn document_json(&self) -> FlowyResult<String> {
  130. let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
  131. let msg = EditorCommand::ReadDocumentAsJson { ret };
  132. let _ = self.edit_cmd_tx.send(msg).await;
  133. let json = rx.await.map_err(internal_error)??;
  134. Ok(json)
  135. }
  136. #[tracing::instrument(level = "trace", skip(self, data), err)]
  137. pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), FlowyError> {
  138. let delta = RichTextDelta::from_bytes(&data)?;
  139. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  140. let msg = EditorCommand::ComposeLocalDelta {
  141. delta: delta.clone(),
  142. ret,
  143. };
  144. let _ = self.edit_cmd_tx.send(msg).await;
  145. let _ = rx.await.map_err(internal_error)??;
  146. Ok(())
  147. }
  148. pub fn stop(&self) {
  149. self.ws_manager.stop();
  150. }
  151. pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> {
  152. self.ws_manager.clone()
  153. }
  154. }
  155. impl std::ops::Drop for ClientDocumentEditor {
  156. fn drop(&mut self) {
  157. tracing::trace!("{} ClientDocumentEditor was dropped", self.doc_id)
  158. }
  159. }
  160. // The edit queue will exit after the EditorCommandSender was dropped.
  161. fn spawn_edit_queue(
  162. user: Arc<dyn DocumentUser>,
  163. rev_manager: Arc<RevisionManager>,
  164. delta: RichTextDelta,
  165. ) -> EditorCommandSender {
  166. let (sender, receiver) = mpsc::channel(1000);
  167. let actor = EditorCommandQueue::new(user, rev_manager, delta, receiver);
  168. tokio::spawn(actor.run());
  169. sender
  170. }
  171. #[cfg(feature = "flowy_unit_test")]
  172. impl ClientDocumentEditor {
  173. pub async fn doc_json(&self) -> FlowyResult<String> {
  174. let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
  175. let msg = EditorCommand::ReadDocumentAsJson { ret };
  176. let _ = self.edit_cmd_tx.send(msg).await;
  177. let s = rx.await.map_err(internal_error)??;
  178. Ok(s)
  179. }
  180. pub async fn doc_delta(&self) -> FlowyResult<RichTextDelta> {
  181. let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
  182. let msg = EditorCommand::ReadDocumentAsDelta { ret };
  183. let _ = self.edit_cmd_tx.send(msg).await;
  184. let delta = rx.await.map_err(internal_error)??;
  185. Ok(delta)
  186. }
  187. pub fn rev_manager(&self) -> Arc<RevisionManager> {
  188. self.rev_manager.clone()
  189. }
  190. }
  191. struct DocumentInfoBuilder();
  192. impl RevisionObjectBuilder for DocumentInfoBuilder {
  193. type Output = DocumentInfo;
  194. fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  195. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  196. let mut delta = make_delta_from_revisions(revisions)?;
  197. correct_delta(&mut delta);
  198. Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
  199. doc_id: object_id.to_owned(),
  200. text: delta.to_json(),
  201. rev_id,
  202. base_rev_id,
  203. })
  204. }
  205. }
  206. // quill-editor requires the delta should end with '\n' and only contains the
  207. // insert operation. The function, correct_delta maybe be removed in the future.
  208. fn correct_delta(delta: &mut RichTextDelta) {
  209. if let Some(op) = delta.ops.last() {
  210. let op_data = op.get_data();
  211. if !op_data.ends_with('\n') {
  212. tracing::warn!("The document must end with newline. Correcting it by inserting newline op");
  213. delta.ops.push(Operation::Insert("\n".into()));
  214. }
  215. }
  216. if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
  217. tracing::warn!("The document can only contains insert operations, but found {:?}", op);
  218. delta.ops.retain(|op| op.is_insert());
  219. }
  220. }