editor.rs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #![allow(unused_attributes)]
  2. #![allow(unused_attributes)]
  3. use crate::old_editor::queue::{EditDocumentQueue, EditorCommand, EditorCommandSender};
  4. use crate::{errors::FlowyError, DocumentEditor, DocumentUser};
  5. use bytes::Bytes;
  6. use flowy_database::ConnectionPool;
  7. use flowy_error::{internal_error, FlowyResult};
  8. use flowy_revision::{
  9. RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
  10. RevisionWebSocket,
  11. };
  12. use flowy_sync::entities::ws_data::ServerRevisionWSData;
  13. use flowy_sync::{
  14. entities::{document::DocumentPayloadPB, revision::Revision},
  15. errors::CollaborateResult,
  16. util::make_operations_from_revisions,
  17. };
  18. use lib_infra::future::FutureResult;
  19. use lib_ot::core::{AttributeEntry, AttributeHashMap};
  20. use lib_ot::{
  21. core::{DeltaOperation, Interval},
  22. text_delta::DeltaTextOperations,
  23. };
  24. use lib_ws::WSConnectState;
  25. use std::any::Any;
  26. use std::sync::Arc;
  27. use tokio::sync::{mpsc, oneshot};
  28. pub struct DeltaDocumentEditor {
  29. pub doc_id: String,
  30. #[allow(dead_code)]
  31. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  32. #[cfg(feature = "sync")]
  33. ws_manager: Arc<flowy_revision::RevisionWebSocketManager>,
  34. edit_cmd_tx: EditorCommandSender,
  35. }
  36. impl DeltaDocumentEditor {
  37. #[allow(unused_variables)]
  38. pub(crate) async fn new(
  39. doc_id: &str,
  40. user: Arc<dyn DocumentUser>,
  41. mut rev_manager: RevisionManager<Arc<ConnectionPool>>,
  42. rev_web_socket: Arc<dyn RevisionWebSocket>,
  43. cloud_service: Arc<dyn RevisionCloudService>,
  44. ) -> FlowyResult<Arc<Self>> {
  45. let document = rev_manager
  46. .load::<DeltaDocumentRevisionSerde>(Some(cloud_service))
  47. .await?;
  48. let operations = DeltaTextOperations::from_bytes(&document.content)?;
  49. let rev_manager = Arc::new(rev_manager);
  50. let doc_id = doc_id.to_string();
  51. let user_id = user.user_id()?;
  52. let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), operations);
  53. #[cfg(feature = "sync")]
  54. let ws_manager = crate::old_editor::web_socket::make_document_ws_manager(
  55. doc_id.clone(),
  56. user_id.clone(),
  57. edit_cmd_tx.clone(),
  58. rev_manager.clone(),
  59. rev_web_socket,
  60. )
  61. .await;
  62. let editor = Arc::new(Self {
  63. doc_id,
  64. rev_manager,
  65. #[cfg(feature = "sync")]
  66. ws_manager,
  67. edit_cmd_tx,
  68. });
  69. Ok(editor)
  70. }
  71. pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
  72. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  73. let msg = EditorCommand::Insert {
  74. index,
  75. data: data.to_string(),
  76. ret,
  77. };
  78. let _ = self.edit_cmd_tx.send(msg).await;
  79. let _ = rx.await.map_err(internal_error)??;
  80. Ok(())
  81. }
  82. pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
  83. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  84. let msg = EditorCommand::Delete { interval, ret };
  85. let _ = self.edit_cmd_tx.send(msg).await;
  86. let _ = rx.await.map_err(internal_error)??;
  87. Ok(())
  88. }
  89. pub async fn format(&self, interval: Interval, attribute: AttributeEntry) -> Result<(), FlowyError> {
  90. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  91. let msg = EditorCommand::Format {
  92. interval,
  93. attribute,
  94. ret,
  95. };
  96. let _ = self.edit_cmd_tx.send(msg).await;
  97. let _ = rx.await.map_err(internal_error)??;
  98. Ok(())
  99. }
  100. pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
  101. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  102. let msg = EditorCommand::Replace {
  103. interval,
  104. data: data.to_string(),
  105. ret,
  106. };
  107. let _ = self.edit_cmd_tx.send(msg).await;
  108. let _ = rx.await.map_err(internal_error)??;
  109. Ok(())
  110. }
  111. pub async fn can_undo(&self) -> bool {
  112. let (ret, rx) = oneshot::channel::<bool>();
  113. let msg = EditorCommand::CanUndo { ret };
  114. let _ = self.edit_cmd_tx.send(msg).await;
  115. rx.await.unwrap_or(false)
  116. }
  117. pub async fn can_redo(&self) -> bool {
  118. let (ret, rx) = oneshot::channel::<bool>();
  119. let msg = EditorCommand::CanRedo { ret };
  120. let _ = self.edit_cmd_tx.send(msg).await;
  121. rx.await.unwrap_or(false)
  122. }
  123. pub async fn undo(&self) -> Result<(), FlowyError> {
  124. let (ret, rx) = oneshot::channel();
  125. let msg = EditorCommand::Undo { ret };
  126. let _ = self.edit_cmd_tx.send(msg).await;
  127. let _ = rx.await.map_err(internal_error)??;
  128. Ok(())
  129. }
  130. pub async fn redo(&self) -> Result<(), FlowyError> {
  131. let (ret, rx) = oneshot::channel();
  132. let msg = EditorCommand::Redo { ret };
  133. let _ = self.edit_cmd_tx.send(msg).await;
  134. let _ = rx.await.map_err(internal_error)??;
  135. Ok(())
  136. }
  137. }
  138. impl DocumentEditor for Arc<DeltaDocumentEditor> {
  139. fn close(&self) {
  140. #[cfg(feature = "sync")]
  141. self.ws_manager.stop();
  142. }
  143. fn export(&self) -> FutureResult<String, FlowyError> {
  144. let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
  145. let msg = EditorCommand::GetOperationsString { ret };
  146. let edit_cmd_tx = self.edit_cmd_tx.clone();
  147. FutureResult::new(async move {
  148. let _ = edit_cmd_tx.send(msg).await;
  149. let json = rx.await.map_err(internal_error)??;
  150. Ok(json)
  151. })
  152. }
  153. fn duplicate(&self) -> FutureResult<String, FlowyError> {
  154. self.export()
  155. }
  156. #[allow(unused_variables)]
  157. fn receive_ws_data(&self, data: ServerRevisionWSData) -> FutureResult<(), FlowyError> {
  158. let cloned_self = self.clone();
  159. FutureResult::new(async move {
  160. #[cfg(feature = "sync")]
  161. let _ = cloned_self.ws_manager.receive_ws_data(data).await?;
  162. Ok(())
  163. })
  164. }
  165. #[allow(unused_variables)]
  166. fn receive_ws_state(&self, state: &WSConnectState) {
  167. #[cfg(feature = "sync")]
  168. self.ws_manager.connect_state_changed(state.clone());
  169. }
  170. fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError> {
  171. let edit_cmd_tx = self.edit_cmd_tx.clone();
  172. FutureResult::new(async move {
  173. let operations = DeltaTextOperations::from_bytes(&data)?;
  174. let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
  175. let msg = EditorCommand::ComposeLocalOperations { operations, ret };
  176. let _ = edit_cmd_tx.send(msg).await;
  177. let _ = rx.await.map_err(internal_error)??;
  178. Ok(())
  179. })
  180. }
  181. fn as_any(&self) -> &dyn Any {
  182. self
  183. }
  184. }
  185. impl std::ops::Drop for DeltaDocumentEditor {
  186. fn drop(&mut self) {
  187. tracing::trace!("{} DocumentEditor was dropped", self.doc_id)
  188. }
  189. }
  190. // The edit queue will exit after the EditorCommandSender was dropped.
  191. fn spawn_edit_queue(
  192. user: Arc<dyn DocumentUser>,
  193. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  194. delta: DeltaTextOperations,
  195. ) -> EditorCommandSender {
  196. let (sender, receiver) = mpsc::channel(1000);
  197. let edit_queue = EditDocumentQueue::new(user, rev_manager, delta, receiver);
  198. // We can use tokio::task::spawn_local here by using tokio::spawn_blocking.
  199. // https://github.com/tokio-rs/tokio/issues/2095
  200. // tokio::task::spawn_blocking(move || {
  201. // let rt = tokio::runtime::Handle::current();
  202. // rt.block_on(async {
  203. // let local = tokio::task::LocalSet::new();
  204. // local.run_until(edit_queue.run()).await;
  205. // });
  206. // });
  207. tokio::spawn(edit_queue.run());
  208. sender
  209. }
  210. #[cfg(feature = "flowy_unit_test")]
  211. impl DeltaDocumentEditor {
  212. pub async fn document_operations(&self) -> FlowyResult<DeltaTextOperations> {
  213. let (ret, rx) = oneshot::channel::<CollaborateResult<DeltaTextOperations>>();
  214. let msg = EditorCommand::GetOperations { ret };
  215. let _ = self.edit_cmd_tx.send(msg).await;
  216. let delta = rx.await.map_err(internal_error)??;
  217. Ok(delta)
  218. }
  219. pub fn rev_manager(&self) -> Arc<RevisionManager<Arc<ConnectionPool>>> {
  220. self.rev_manager.clone()
  221. }
  222. }
  223. pub struct DeltaDocumentRevisionSerde();
  224. impl RevisionObjectDeserializer for DeltaDocumentRevisionSerde {
  225. type Output = DocumentPayloadPB;
  226. fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  227. let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
  228. let mut delta = make_operations_from_revisions(revisions)?;
  229. correct_delta(&mut delta);
  230. Result::<DocumentPayloadPB, FlowyError>::Ok(DocumentPayloadPB {
  231. doc_id: object_id.to_owned(),
  232. content: delta.json_str(),
  233. rev_id,
  234. base_rev_id,
  235. })
  236. }
  237. }
  238. impl RevisionObjectSerializer for DeltaDocumentRevisionSerde {
  239. fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  240. let operations = make_operations_from_revisions::<AttributeHashMap>(revisions)?;
  241. Ok(operations.json_bytes())
  242. }
  243. }
  244. pub(crate) struct DeltaDocumentRevisionCompress();
  245. impl RevisionCompress for DeltaDocumentRevisionCompress {
  246. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  247. DeltaDocumentRevisionSerde::combine_revisions(revisions)
  248. }
  249. }
  250. // quill-editor requires the delta should end with '\n' and only contains the
  251. // insert operation. The function, correct_delta maybe be removed in the future.
  252. fn correct_delta(delta: &mut DeltaTextOperations) {
  253. if let Some(op) = delta.ops.last() {
  254. let op_data = op.get_data();
  255. if !op_data.ends_with('\n') {
  256. tracing::warn!("The document must end with newline. Correcting it by inserting newline op");
  257. delta.ops.push(DeltaOperation::Insert("\n".into()));
  258. }
  259. }
  260. if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
  261. tracing::warn!("The document can only contains insert operations, but found {:?}", op);
  262. delta.ops.retain(|op| op.is_insert());
  263. }
  264. }