editor.rs 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. use crate::web_socket::EditorCommandSender;
  2. use crate::{
  3. errors::FlowyError,
  4. queue::{EditDocumentQueue, EditorCommand},
  5. DocumentUser,
  6. };
  7. use bytes::Bytes;
  8. use flowy_error::{internal_error, FlowyResult};
  9. use flowy_revision::{
  10. RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
  11. RevisionWebSocket,
  12. };
  13. use flowy_sync::entities::ws_data::ServerRevisionWSData;
  14. use flowy_sync::{
  15. entities::{document::DocumentPayloadPB, revision::Revision},
  16. errors::CollaborateResult,
  17. util::make_operations_from_revisions,
  18. };
  19. use lib_ot::core::{AttributeEntry, AttributeHashMap};
  20. use lib_ot::{
  21. core::{DeltaOperation, Interval},
  22. text_delta::TextOperations,
  23. };
  24. use lib_ws::WSConnectState;
  25. use std::sync::Arc;
  26. use tokio::sync::{mpsc, oneshot};
  27. pub struct DocumentEditor {
  28. pub doc_id: String,
  29. #[allow(dead_code)]
  30. rev_manager: Arc<RevisionManager>,
  31. #[cfg(feature = "sync")]
  32. ws_manager: Arc<flowy_revision::RevisionWebSocketManager>,
  33. edit_cmd_tx: EditorCommandSender,
  34. }
  35. impl DocumentEditor {
  36. #[allow(unused_variables)]
  37. pub(crate) async fn new(
  38. doc_id: &str,
  39. user: Arc<dyn DocumentUser>,
  40. mut rev_manager: RevisionManager,
  41. rev_web_socket: Arc<dyn RevisionWebSocket>,
  42. cloud_service: Arc<dyn RevisionCloudService>,
  43. ) -> FlowyResult<Arc<Self>> {
  44. let document_info = rev_manager.load::<DocumentRevisionSerde>(Some(cloud_service)).await?;
  45. let operations = TextOperations::from_bytes(&document_info.content)?;
  46. let rev_manager = Arc::new(rev_manager);
  47. let doc_id = doc_id.to_string();
  48. let user_id = user.user_id()?;
  49. let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), operations);
  50. #[cfg(feature = "sync")]
  51. let ws_manager = crate::web_socket::make_document_ws_manager(
  52. doc_id.clone(),
  53. user_id.clone(),
  54. edit_cmd_tx.clone(),
  55. rev_manager.clone(),
  56. rev_web_socket,
  57. )
  58. .await;
  59. let editor = Arc::new(Self {
  60. doc_id,
  61. rev_manager,
  62. #[cfg(feature = "sync")]
  63. ws_manager,
  64. edit_cmd_tx,
  65. });
  66. Ok(editor)
  67. }
  68. pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
  69. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  70. let msg = EditorCommand::Insert {
  71. index,
  72. data: data.to_string(),
  73. ret,
  74. };
  75. let _ = self.edit_cmd_tx.send(msg).await;
  76. let _ = rx.await.map_err(internal_error)??;
  77. Ok(())
  78. }
  79. pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
  80. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  81. let msg = EditorCommand::Delete { interval, ret };
  82. let _ = self.edit_cmd_tx.send(msg).await;
  83. let _ = rx.await.map_err(internal_error)??;
  84. Ok(())
  85. }
  86. pub async fn format(&self, interval: Interval, attribute: AttributeEntry) -> Result<(), FlowyError> {
  87. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  88. let msg = EditorCommand::Format {
  89. interval,
  90. attribute,
  91. ret,
  92. };
  93. let _ = self.edit_cmd_tx.send(msg).await;
  94. let _ = rx.await.map_err(internal_error)??;
  95. Ok(())
  96. }
  97. pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
  98. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  99. let msg = EditorCommand::Replace {
  100. interval,
  101. data: data.to_string(),
  102. ret,
  103. };
  104. let _ = self.edit_cmd_tx.send(msg).await;
  105. let _ = rx.await.map_err(internal_error)??;
  106. Ok(())
  107. }
  108. pub async fn can_undo(&self) -> bool {
  109. let (ret, rx) = oneshot::channel::<bool>();
  110. let msg = EditorCommand::CanUndo { ret };
  111. let _ = self.edit_cmd_tx.send(msg).await;
  112. rx.await.unwrap_or(false)
  113. }
  114. pub async fn can_redo(&self) -> bool {
  115. let (ret, rx) = oneshot::channel::<bool>();
  116. let msg = EditorCommand::CanRedo { ret };
  117. let _ = self.edit_cmd_tx.send(msg).await;
  118. rx.await.unwrap_or(false)
  119. }
  120. pub async fn undo(&self) -> Result<(), FlowyError> {
  121. let (ret, rx) = oneshot::channel();
  122. let msg = EditorCommand::Undo { ret };
  123. let _ = self.edit_cmd_tx.send(msg).await;
  124. let _ = rx.await.map_err(internal_error)??;
  125. Ok(())
  126. }
  127. pub async fn redo(&self) -> Result<(), FlowyError> {
  128. let (ret, rx) = oneshot::channel();
  129. let msg = EditorCommand::Redo { ret };
  130. let _ = self.edit_cmd_tx.send(msg).await;
  131. let _ = rx.await.map_err(internal_error)??;
  132. Ok(())
  133. }
  134. pub async fn get_operation_str(&self) -> FlowyResult<String> {
  135. let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
  136. let msg = EditorCommand::StringifyOperations { ret };
  137. let _ = self.edit_cmd_tx.send(msg).await;
  138. let json = rx.await.map_err(internal_error)??;
  139. Ok(json)
  140. }
  141. #[tracing::instrument(level = "trace", skip(self, data), err)]
  142. pub(crate) async fn compose_local_operations(&self, data: Bytes) -> Result<(), FlowyError> {
  143. let operations = TextOperations::from_bytes(&data)?;
  144. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  145. let msg = EditorCommand::ComposeLocalOperations { operations, ret };
  146. let _ = self.edit_cmd_tx.send(msg).await;
  147. let _ = rx.await.map_err(internal_error)??;
  148. Ok(())
  149. }
  150. #[cfg(feature = "sync")]
  151. pub fn stop(&self) {
  152. self.ws_manager.stop();
  153. }
  154. #[cfg(not(feature = "sync"))]
  155. pub fn stop(&self) {}
  156. #[cfg(feature = "sync")]
  157. pub(crate) async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError> {
  158. self.ws_manager.receive_ws_data(data).await
  159. }
  160. #[cfg(not(feature = "sync"))]
  161. pub(crate) async fn receive_ws_data(&self, _data: ServerRevisionWSData) -> Result<(), FlowyError> {
  162. Ok(())
  163. }
  164. #[cfg(feature = "sync")]
  165. pub(crate) fn receive_ws_state(&self, state: &WSConnectState) {
  166. self.ws_manager.connect_state_changed(state.clone());
  167. }
  168. #[cfg(not(feature = "sync"))]
  169. pub(crate) fn receive_ws_state(&self, _state: &WSConnectState) {}
  170. }
  171. impl std::ops::Drop for DocumentEditor {
  172. fn drop(&mut self) {
  173. tracing::trace!("{} DocumentEditor was dropped", self.doc_id)
  174. }
  175. }
  176. // The edit queue will exit after the EditorCommandSender was dropped.
  177. fn spawn_edit_queue(
  178. user: Arc<dyn DocumentUser>,
  179. rev_manager: Arc<RevisionManager>,
  180. delta: TextOperations,
  181. ) -> EditorCommandSender {
  182. let (sender, receiver) = mpsc::channel(1000);
  183. let edit_queue = EditDocumentQueue::new(user, rev_manager, delta, receiver);
  184. // We can use tokio::task::spawn_local here by using tokio::spawn_blocking.
  185. // https://github.com/tokio-rs/tokio/issues/2095
  186. // tokio::task::spawn_blocking(move || {
  187. // let rt = tokio::runtime::Handle::current();
  188. // rt.block_on(async {
  189. // let local = tokio::task::LocalSet::new();
  190. // local.run_until(edit_queue.run()).await;
  191. // });
  192. // });
  193. tokio::spawn(edit_queue.run());
  194. sender
  195. }
  /// Extra accessors that are only compiled with the `flowy_unit_test` feature.
  #[cfg(feature = "flowy_unit_test")]
  impl DocumentEditor {
      /// Requests the document's `TextOperations` from the edit queue via
      /// `EditorCommand::ReadOperations`.
      ///
      /// # Errors
      /// Returns an error if the reply channel fails or if the queue replies with an error.
      pub async fn document_operations(&self) -> FlowyResult<TextOperations> {
          let (reply_tx, reply_rx) = oneshot::channel::<CollaborateResult<TextOperations>>();
          let command = EditorCommand::ReadOperations { ret: reply_tx };
          let _ = self.edit_cmd_tx.send(command).await;
          let operations = reply_rx.await.map_err(internal_error)??;
          Ok(operations)
      }

      /// Returns another `Arc` handle to the editor's revision manager.
      pub fn rev_manager(&self) -> Arc<RevisionManager> {
          Arc::clone(&self.rev_manager)
      }
  }
  209. pub struct DocumentRevisionSerde();
  210. impl RevisionObjectDeserializer for DocumentRevisionSerde {
  211. type Output = DocumentPayloadPB;
  212. fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  213. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  214. let mut delta = make_operations_from_revisions(revisions)?;
  215. correct_delta(&mut delta);
  216. Result::<DocumentPayloadPB, FlowyError>::Ok(DocumentPayloadPB {
  217. doc_id: object_id.to_owned(),
  218. content: delta.json_str(),
  219. rev_id,
  220. base_rev_id,
  221. })
  222. }
  223. }
  224. impl RevisionObjectSerializer for DocumentRevisionSerde {
  225. fn serialize_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  226. let operations = make_operations_from_revisions::<AttributeHashMap>(revisions)?;
  227. Ok(operations.json_bytes())
  228. }
  229. }
  230. pub(crate) struct DocumentRevisionCompactor();
  231. impl RevisionCompress for DocumentRevisionCompactor {
  232. fn serialize_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  233. DocumentRevisionSerde::serialize_revisions(revisions)
  234. }
  235. }
  236. // quill-editor requires the delta should end with '\n' and only contains the
  237. // insert operation. The function, correct_delta maybe be removed in the future.
  238. fn correct_delta(delta: &mut TextOperations) {
  239. if let Some(op) = delta.ops.last() {
  240. let op_data = op.get_data();
  241. if !op_data.ends_with('\n') {
  242. tracing::warn!("The document must end with newline. Correcting it by inserting newline op");
  243. delta.ops.push(DeltaOperation::Insert("\n".into()));
  244. }
  245. }
  246. if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
  247. tracing::warn!("The document can only contains insert operations, but found {:?}", op);
  248. delta.ops.retain(|op| op.is_insert());
  249. }
  250. }