editor.rs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. use crate::{
  2. errors::FlowyError,
  3. module::DocumentUser,
  4. services::doc::{
  5. web_socket::{initialize_document_web_socket, DocumentWebSocketContext, EditorWebSocket},
  6. *,
  7. },
  8. };
  9. use bytes::Bytes;
  10. use flowy_collaboration::{
  11. core::document::history::UndoResult,
  12. entities::{
  13. doc::DocDelta,
  14. revision::{RevId, RevType, Revision},
  15. },
  16. errors::CollaborateResult,
  17. };
  18. use flowy_database::ConnectionPool;
  19. use flowy_error::{internal_error, FlowyResult};
  20. use lib_ot::{
  21. core::Interval,
  22. rich_text::{RichTextAttribute, RichTextDelta},
  23. };
  24. use std::sync::Arc;
  25. use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
  26. pub struct ClientDocEditor {
  27. pub doc_id: String,
  28. rev_manager: Arc<RevisionManager>,
  29. editor_ws: Arc<dyn EditorWebSocket>,
  30. editor_cmd_sender: UnboundedSender<EditorCommand>,
  31. user: Arc<dyn DocumentUser>,
  32. }
  33. impl ClientDocEditor {
  34. pub(crate) async fn new(
  35. doc_id: &str,
  36. user: Arc<dyn DocumentUser>,
  37. pool: Arc<ConnectionPool>,
  38. mut rev_manager: RevisionManager,
  39. ws: Arc<dyn DocumentWebSocket>,
  40. server: Arc<dyn RevisionServer>,
  41. ) -> FlowyResult<Arc<Self>> {
  42. let delta = rev_manager.load_document(server).await?;
  43. let editor_cmd_sender = spawn_edit_queue(doc_id, delta, pool.clone());
  44. let doc_id = doc_id.to_string();
  45. let user_id = user.user_id()?;
  46. let rev_manager = Arc::new(rev_manager);
  47. let context = DocumentWebSocketContext {
  48. doc_id: doc_id.to_owned(),
  49. user_id: user_id.clone(),
  50. editor_cmd_sender: editor_cmd_sender.clone(),
  51. rev_manager: rev_manager.clone(),
  52. ws,
  53. };
  54. let editor_ws = initialize_document_web_socket(context).await;
  55. let editor = Arc::new(Self {
  56. doc_id,
  57. rev_manager,
  58. editor_ws,
  59. editor_cmd_sender,
  60. user,
  61. });
  62. Ok(editor)
  63. }
  64. pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
  65. let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
  66. let msg = EditorCommand::Insert {
  67. index,
  68. data: data.to_string(),
  69. ret,
  70. };
  71. let _ = self.editor_cmd_sender.send(msg);
  72. let (delta, md5) = rx.await.map_err(internal_error)??;
  73. let _ = self.save_local_delta(delta, md5).await?;
  74. Ok(())
  75. }
  76. pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
  77. let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
  78. let msg = EditorCommand::Delete { interval, ret };
  79. let _ = self.editor_cmd_sender.send(msg);
  80. let (delta, md5) = rx.await.map_err(internal_error)??;
  81. let _ = self.save_local_delta(delta, md5).await?;
  82. Ok(())
  83. }
  84. pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> {
  85. let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
  86. let msg = EditorCommand::Format {
  87. interval,
  88. attribute,
  89. ret,
  90. };
  91. let _ = self.editor_cmd_sender.send(msg);
  92. let (delta, md5) = rx.await.map_err(internal_error)??;
  93. let _ = self.save_local_delta(delta, md5).await?;
  94. Ok(())
  95. }
  96. pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
  97. let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
  98. let msg = EditorCommand::Replace {
  99. interval,
  100. data: data.to_string(),
  101. ret,
  102. };
  103. let _ = self.editor_cmd_sender.send(msg);
  104. let (delta, md5) = rx.await.map_err(internal_error)??;
  105. let _ = self.save_local_delta(delta, md5).await?;
  106. Ok(())
  107. }
  108. pub async fn can_undo(&self) -> bool {
  109. let (ret, rx) = oneshot::channel::<bool>();
  110. let msg = EditorCommand::CanUndo { ret };
  111. let _ = self.editor_cmd_sender.send(msg);
  112. rx.await.unwrap_or(false)
  113. }
  114. pub async fn can_redo(&self) -> bool {
  115. let (ret, rx) = oneshot::channel::<bool>();
  116. let msg = EditorCommand::CanRedo { ret };
  117. let _ = self.editor_cmd_sender.send(msg);
  118. rx.await.unwrap_or(false)
  119. }
  120. pub async fn undo(&self) -> Result<UndoResult, FlowyError> {
  121. let (ret, rx) = oneshot::channel::<CollaborateResult<UndoResult>>();
  122. let msg = EditorCommand::Undo { ret };
  123. let _ = self.editor_cmd_sender.send(msg);
  124. let r = rx.await.map_err(internal_error)??;
  125. Ok(r)
  126. }
  127. pub async fn redo(&self) -> Result<UndoResult, FlowyError> {
  128. let (ret, rx) = oneshot::channel::<CollaborateResult<UndoResult>>();
  129. let msg = EditorCommand::Redo { ret };
  130. let _ = self.editor_cmd_sender.send(msg);
  131. let r = rx.await.map_err(internal_error)??;
  132. Ok(r)
  133. }
  134. pub async fn delta(&self) -> FlowyResult<DocDelta> {
  135. let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
  136. let msg = EditorCommand::ReadDoc { ret };
  137. let _ = self.editor_cmd_sender.send(msg);
  138. let data = rx.await.map_err(internal_error)??;
  139. Ok(DocDelta {
  140. doc_id: self.doc_id.clone(),
  141. data,
  142. })
  143. }
  144. async fn save_local_delta(&self, delta: RichTextDelta, md5: String) -> Result<RevId, FlowyError> {
  145. let delta_data = delta.to_bytes();
  146. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id();
  147. let user_id = self.user.user_id()?;
  148. let revision = Revision::new(
  149. &self.doc_id,
  150. base_rev_id,
  151. rev_id,
  152. delta_data,
  153. RevType::Local,
  154. &user_id,
  155. md5,
  156. );
  157. let _ = self.rev_manager.add_local_revision(&revision).await?;
  158. Ok(rev_id.into())
  159. }
  160. #[tracing::instrument(level = "debug", skip(self, data), err)]
  161. pub(crate) async fn composing_local_delta(&self, data: Bytes) -> Result<(), FlowyError> {
  162. let delta = RichTextDelta::from_bytes(&data)?;
  163. let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
  164. let msg = EditorCommand::ComposeDelta {
  165. delta: delta.clone(),
  166. ret,
  167. };
  168. let _ = self.editor_cmd_sender.send(msg);
  169. let md5 = rx.await.map_err(internal_error)??;
  170. let _ = self.save_local_delta(delta, md5).await?;
  171. Ok(())
  172. }
  173. #[tracing::instrument(level = "debug", skip(self))]
  174. pub fn stop(&self) { self.editor_ws.stop_web_socket(); }
  175. pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWsHandler> { self.editor_ws.ws_handler() }
  176. }
  177. fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc<ConnectionPool>) -> UnboundedSender<EditorCommand> {
  178. let (sender, receiver) = mpsc::unbounded_channel::<EditorCommand>();
  179. let actor = EditorCommandQueue::new(doc_id, delta, receiver);
  180. tokio::spawn(actor.run());
  181. sender
  182. }
  183. #[cfg(feature = "flowy_unit_test")]
  184. impl ClientDocEditor {
  185. pub async fn doc_json(&self) -> FlowyResult<String> {
  186. let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
  187. let msg = EditorCommand::ReadDoc { ret };
  188. let _ = self.editor_cmd_sender.send(msg);
  189. let s = rx.await.map_err(internal_error)??;
  190. Ok(s)
  191. }
  192. pub async fn doc_delta(&self) -> FlowyResult<RichTextDelta> {
  193. let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
  194. let msg = EditorCommand::ReadDocDelta { ret };
  195. let _ = self.editor_cmd_sender.send(msg);
  196. let delta = rx.await.map_err(internal_error)??;
  197. Ok(delta)
  198. }
  199. pub fn rev_manager(&self) -> Arc<RevisionManager> { self.rev_manager.clone() }
  200. }