queue.rs 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. use crate::editor::document::Document;
  2. use crate::DocumentUser;
  3. use async_stream::stream;
  4. use bytes::Bytes;
  5. use flowy_error::FlowyError;
  6. use flowy_http_model::revision::{RevId, Revision};
  7. use flowy_revision::RevisionManager;
  8. use futures::stream::StreamExt;
  9. use lib_ot::core::Transaction;
  10. use flowy_database::ConnectionPool;
  11. use std::sync::Arc;
  12. use tokio::sync::mpsc::{Receiver, Sender};
  13. use tokio::sync::{oneshot, RwLock};
  14. pub struct DocumentQueue {
  15. #[allow(dead_code)]
  16. user: Arc<dyn DocumentUser>,
  17. document: Arc<RwLock<Document>>,
  18. #[allow(dead_code)]
  19. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  20. receiver: Option<CommandReceiver>,
  21. }
  22. impl DocumentQueue {
  23. pub fn new(
  24. user: Arc<dyn DocumentUser>,
  25. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  26. document: Document,
  27. receiver: CommandReceiver,
  28. ) -> Self {
  29. let document = Arc::new(RwLock::new(document));
  30. Self {
  31. user,
  32. document,
  33. rev_manager,
  34. receiver: Some(receiver),
  35. }
  36. }
  37. pub async fn run(mut self) {
  38. let mut receiver = self.receiver.take().expect("Only take once");
  39. let stream = stream! {
  40. loop {
  41. match receiver.recv().await {
  42. Some(msg) => yield msg,
  43. None => break,
  44. }
  45. }
  46. };
  47. stream
  48. .for_each(|command| async {
  49. match self.handle_command(command).await {
  50. Ok(_) => {}
  51. Err(e) => tracing::debug!("[DocumentQueue]: {}", e),
  52. }
  53. })
  54. .await;
  55. }
  56. async fn handle_command(&self, command: Command) -> Result<(), FlowyError> {
  57. match command {
  58. Command::ComposeTransaction { transaction, ret } => {
  59. self.document.write().await.apply_transaction(transaction.clone())?;
  60. let _ = self
  61. .save_local_operations(transaction, self.document.read().await.document_md5())
  62. .await?;
  63. let _ = ret.send(Ok(()));
  64. }
  65. Command::GetDocumentContent { pretty, ret } => {
  66. let content = self.document.read().await.get_content(pretty)?;
  67. let _ = ret.send(Ok(content));
  68. }
  69. }
  70. Ok(())
  71. }
  72. #[tracing::instrument(level = "trace", skip(self, transaction, md5), err)]
  73. async fn save_local_operations(&self, transaction: Transaction, md5: String) -> Result<RevId, FlowyError> {
  74. let bytes = Bytes::from(transaction.to_bytes()?);
  75. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  76. let revision = Revision::new(&self.rev_manager.object_id, base_rev_id, rev_id, bytes, md5);
  77. let _ = self.rev_manager.add_local_revision(&revision).await?;
  78. Ok(rev_id.into())
  79. }
  80. }
  81. pub(crate) type CommandSender = Sender<Command>;
  82. pub(crate) type CommandReceiver = Receiver<Command>;
  83. pub(crate) type Ret<T> = oneshot::Sender<Result<T, FlowyError>>;
  84. pub enum Command {
  85. ComposeTransaction { transaction: Transaction, ret: Ret<()> },
  86. GetDocumentContent { pretty: bool, ret: Ret<String> },
  87. }