editor.rs 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. use crate::{
  2. context::DocumentUser,
  3. errors::FlowyError,
  4. services::doc::{
  5. web_socket::{make_document_ws_manager, DocumentWebSocketManager},
  6. *,
  7. },
  8. };
  9. use bytes::Bytes;
  10. use flowy_collaboration::{
  11. document::history::UndoResult,
  12. entities::revision::{RevId, RevType, Revision},
  13. errors::CollaborateResult,
  14. };
  15. use flowy_database::ConnectionPool;
  16. use flowy_error::{internal_error, FlowyResult};
  17. use lib_ot::{
  18. core::Interval,
  19. rich_text::{RichTextAttribute, RichTextDelta},
  20. };
  21. use std::sync::Arc;
  22. use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
  23. pub struct ClientDocumentEditor {
  24. pub doc_id: String,
  25. rev_manager: Arc<RevisionManager>,
  26. ws_manager: Arc<dyn DocumentWebSocketManager>,
  27. edit_queue: UnboundedSender<EditorCommand>,
  28. user: Arc<dyn DocumentUser>,
  29. }
  30. impl ClientDocumentEditor {
  31. pub(crate) async fn new(
  32. doc_id: &str,
  33. user: Arc<dyn DocumentUser>,
  34. pool: Arc<ConnectionPool>,
  35. mut rev_manager: RevisionManager,
  36. ws: Arc<dyn DocumentWebSocket>,
  37. server: Arc<dyn RevisionServer>,
  38. ) -> FlowyResult<Arc<Self>> {
  39. let delta = rev_manager.load_document(server).await?;
  40. let edit_queue = spawn_edit_queue(doc_id, delta, pool.clone());
  41. let doc_id = doc_id.to_string();
  42. let user_id = user.user_id()?;
  43. let rev_manager = Arc::new(rev_manager);
  44. let ws_manager = make_document_ws_manager(
  45. doc_id.clone(),
  46. user_id.clone(),
  47. edit_queue.clone(),
  48. rev_manager.clone(),
  49. ws,
  50. )
  51. .await;
  52. let editor = Arc::new(Self {
  53. doc_id,
  54. rev_manager,
  55. ws_manager,
  56. edit_queue,
  57. user,
  58. });
  59. Ok(editor)
  60. }
  61. pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
  62. let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
  63. let msg = EditorCommand::Insert {
  64. index,
  65. data: data.to_string(),
  66. ret,
  67. };
  68. let _ = self.edit_queue.send(msg);
  69. let (delta, md5) = rx.await.map_err(internal_error)??;
  70. let _ = self.save_local_delta(delta, md5).await?;
  71. Ok(())
  72. }
  73. pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
  74. let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
  75. let msg = EditorCommand::Delete { interval, ret };
  76. let _ = self.edit_queue.send(msg);
  77. let (delta, md5) = rx.await.map_err(internal_error)??;
  78. let _ = self.save_local_delta(delta, md5).await?;
  79. Ok(())
  80. }
  81. pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> {
  82. let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
  83. let msg = EditorCommand::Format {
  84. interval,
  85. attribute,
  86. ret,
  87. };
  88. let _ = self.edit_queue.send(msg);
  89. let (delta, md5) = rx.await.map_err(internal_error)??;
  90. let _ = self.save_local_delta(delta, md5).await?;
  91. Ok(())
  92. }
  93. pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
  94. let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
  95. let msg = EditorCommand::Replace {
  96. interval,
  97. data: data.to_string(),
  98. ret,
  99. };
  100. let _ = self.edit_queue.send(msg);
  101. let (delta, md5) = rx.await.map_err(internal_error)??;
  102. let _ = self.save_local_delta(delta, md5).await?;
  103. Ok(())
  104. }
  105. pub async fn can_undo(&self) -> bool {
  106. let (ret, rx) = oneshot::channel::<bool>();
  107. let msg = EditorCommand::CanUndo { ret };
  108. let _ = self.edit_queue.send(msg);
  109. rx.await.unwrap_or(false)
  110. }
  111. pub async fn can_redo(&self) -> bool {
  112. let (ret, rx) = oneshot::channel::<bool>();
  113. let msg = EditorCommand::CanRedo { ret };
  114. let _ = self.edit_queue.send(msg);
  115. rx.await.unwrap_or(false)
  116. }
  117. pub async fn undo(&self) -> Result<UndoResult, FlowyError> {
  118. let (ret, rx) = oneshot::channel::<CollaborateResult<UndoResult>>();
  119. let msg = EditorCommand::Undo { ret };
  120. let _ = self.edit_queue.send(msg);
  121. let r = rx.await.map_err(internal_error)??;
  122. Ok(r)
  123. }
  124. pub async fn redo(&self) -> Result<UndoResult, FlowyError> {
  125. let (ret, rx) = oneshot::channel::<CollaborateResult<UndoResult>>();
  126. let msg = EditorCommand::Redo { ret };
  127. let _ = self.edit_queue.send(msg);
  128. let r = rx.await.map_err(internal_error)??;
  129. Ok(r)
  130. }
  131. pub async fn document_json(&self) -> FlowyResult<String> {
  132. let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
  133. let msg = EditorCommand::ReadDoc { ret };
  134. let _ = self.edit_queue.send(msg);
  135. let json = rx.await.map_err(internal_error)??;
  136. Ok(json)
  137. }
  138. async fn save_local_delta(&self, delta: RichTextDelta, md5: String) -> Result<RevId, FlowyError> {
  139. let delta_data = delta.to_bytes();
  140. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id();
  141. let user_id = self.user.user_id()?;
  142. let revision = Revision::new(
  143. &self.doc_id,
  144. base_rev_id,
  145. rev_id,
  146. delta_data,
  147. RevType::Local,
  148. &user_id,
  149. md5,
  150. );
  151. let _ = self.rev_manager.add_local_revision(&revision).await?;
  152. Ok(rev_id.into())
  153. }
  154. #[tracing::instrument(level = "debug", skip(self, data), err)]
  155. pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), FlowyError> {
  156. let delta = RichTextDelta::from_bytes(&data)?;
  157. let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
  158. let msg = EditorCommand::ComposeDelta {
  159. delta: delta.clone(),
  160. ret,
  161. };
  162. let _ = self.edit_queue.send(msg);
  163. let md5 = rx.await.map_err(internal_error)??;
  164. let _ = self.save_local_delta(delta, md5).await?;
  165. Ok(())
  166. }
  167. #[tracing::instrument(level = "debug", skip(self))]
  168. pub fn stop(&self) { self.ws_manager.stop(); }
  169. pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> { self.ws_manager.receiver() }
  170. }
  171. fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc<ConnectionPool>) -> UnboundedSender<EditorCommand> {
  172. let (sender, receiver) = mpsc::unbounded_channel::<EditorCommand>();
  173. let actor = EditorCommandQueue::new(doc_id, delta, receiver);
  174. tokio::spawn(actor.run());
  175. sender
  176. }
  177. #[cfg(feature = "flowy_unit_test")]
  178. impl ClientDocumentEditor {
  179. pub async fn doc_json(&self) -> FlowyResult<String> {
  180. let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
  181. let msg = EditorCommand::ReadDoc { ret };
  182. let _ = self.edit_queue.send(msg);
  183. let s = rx.await.map_err(internal_error)??;
  184. Ok(s)
  185. }
  186. pub async fn doc_delta(&self) -> FlowyResult<RichTextDelta> {
  187. let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
  188. let msg = EditorCommand::ReadDocDelta { ret };
  189. let _ = self.edit_queue.send(msg);
  190. let delta = rx.await.map_err(internal_error)??;
  191. Ok(delta)
  192. }
  193. pub fn rev_manager(&self) -> Arc<RevisionManager> { self.rev_manager.clone() }
  194. }