editor.rs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. use crate::editor::document::{Document, DocumentRevisionSerde};
  2. use crate::editor::document_serde::DocumentTransaction;
  3. use crate::editor::make_transaction_from_revisions;
  4. use crate::editor::queue::{Command, CommandSender, DocumentQueue};
  5. use crate::{DocumentEditor, DocumentUser};
  6. use bytes::Bytes;
  7. use flowy_error::{internal_error, FlowyError, FlowyResult};
  8. use flowy_revision::{RevisionCloudService, RevisionManager};
  9. use flowy_sync::entities::ws_data::ServerRevisionWSData;
  10. use lib_infra::future::FutureResult;
  11. use lib_ot::core::Transaction;
  12. use lib_ws::WSConnectState;
  13. use std::any::Any;
  14. use std::sync::Arc;
  15. use tokio::sync::{mpsc, oneshot};
  16. pub struct AppFlowyDocumentEditor {
  17. #[allow(dead_code)]
  18. doc_id: String,
  19. command_sender: CommandSender,
  20. rev_manager: Arc<RevisionManager>,
  21. }
  22. impl AppFlowyDocumentEditor {
  23. pub async fn new(
  24. doc_id: &str,
  25. user: Arc<dyn DocumentUser>,
  26. mut rev_manager: RevisionManager,
  27. cloud_service: Arc<dyn RevisionCloudService>,
  28. ) -> FlowyResult<Arc<Self>> {
  29. let document = rev_manager.load::<DocumentRevisionSerde>(Some(cloud_service)).await?;
  30. let rev_manager = Arc::new(rev_manager);
  31. let command_sender = spawn_edit_queue(user, rev_manager.clone(), document);
  32. let doc_id = doc_id.to_string();
  33. let editor = Arc::new(Self {
  34. doc_id,
  35. command_sender,
  36. rev_manager,
  37. });
  38. Ok(editor)
  39. }
  40. pub async fn apply_transaction(&self, transaction: Transaction) -> FlowyResult<()> {
  41. let (ret, rx) = oneshot::channel::<FlowyResult<()>>();
  42. let _ = self
  43. .command_sender
  44. .send(Command::ComposeTransaction { transaction, ret })
  45. .await;
  46. let _ = rx.await.map_err(internal_error)??;
  47. Ok(())
  48. }
  49. pub async fn get_content(&self, pretty: bool) -> FlowyResult<String> {
  50. let (ret, rx) = oneshot::channel::<FlowyResult<String>>();
  51. let _ = self
  52. .command_sender
  53. .send(Command::GetDocumentContent { pretty, ret })
  54. .await;
  55. let content = rx.await.map_err(internal_error)??;
  56. Ok(content)
  57. }
  58. pub async fn duplicate_document(&self) -> FlowyResult<String> {
  59. let revisions = self.rev_manager.load_revisions().await?;
  60. let transaction = make_transaction_from_revisions(&revisions)?;
  61. let json = transaction.to_json()?;
  62. Ok(json)
  63. }
  64. }
  65. fn spawn_edit_queue(
  66. user: Arc<dyn DocumentUser>,
  67. rev_manager: Arc<RevisionManager>,
  68. document: Document,
  69. ) -> CommandSender {
  70. let (sender, receiver) = mpsc::channel(1000);
  71. let queue = DocumentQueue::new(user, rev_manager, document, receiver);
  72. tokio::spawn(queue.run());
  73. sender
  74. }
  75. impl DocumentEditor for Arc<AppFlowyDocumentEditor> {
  76. fn close(&self) {}
  77. fn export(&self) -> FutureResult<String, FlowyError> {
  78. let this = self.clone();
  79. FutureResult::new(async move { this.get_content(false).await })
  80. }
  81. fn duplicate(&self) -> FutureResult<String, FlowyError> {
  82. let this = self.clone();
  83. FutureResult::new(async move { this.duplicate_document().await })
  84. }
  85. fn receive_ws_data(&self, _data: ServerRevisionWSData) -> FutureResult<(), FlowyError> {
  86. FutureResult::new(async move { Ok(()) })
  87. }
  88. fn receive_ws_state(&self, _state: &WSConnectState) {}
  89. fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError> {
  90. let this = self.clone();
  91. FutureResult::new(async move {
  92. let transaction = DocumentTransaction::from_bytes(data)?;
  93. let _ = this.apply_transaction(transaction.into()).await?;
  94. Ok(())
  95. })
  96. }
  97. fn as_any(&self) -> &dyn Any {
  98. self
  99. }
  100. }