document.rs 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. use std::{
  2. ops::{Deref, DerefMut},
  3. sync::Arc,
  4. };
  5. use collab::core::collab::MutexCollab;
  6. use collab_document::{blocks::DocumentData, document::Document};
  7. use futures::StreamExt;
  8. use parking_lot::Mutex;
  9. use tokio_stream::wrappers::WatchStream;
  10. use flowy_error::FlowyResult;
  11. use crate::entities::{DocEventPB, DocumentSnapshotStatePB, DocumentSyncStatePB};
  12. use crate::notification::{send_notification, DocumentNotification};
  13. /// This struct wrap the document::Document
  14. #[derive(Clone)]
  15. pub struct MutexDocument(Arc<Mutex<Document>>);
  16. impl MutexDocument {
  17. /// Open a document with the given collab.
  18. /// # Arguments
  19. /// * `collab` - the identifier of the collaboration instance
  20. ///
  21. /// # Returns
  22. /// * `Result<Document, FlowyError>` - a Result containing either a new Document object or an Error if the document creation failed
  23. pub fn open(doc_id: &str, collab: Arc<MutexCollab>) -> FlowyResult<Self> {
  24. let document = Document::open(collab.clone()).map(|inner| Self(Arc::new(Mutex::new(inner))))?;
  25. subscribe_document_changed(doc_id, &document);
  26. subscribe_document_snapshot_state(&collab);
  27. subscribe_document_sync_state(&collab);
  28. Ok(document)
  29. }
  30. /// Creates and returns a new Document object with initial data.
  31. /// # Arguments
  32. /// * `collab` - the identifier of the collaboration instance
  33. /// * `data` - the initial data to include in the document
  34. ///
  35. /// # Returns
  36. /// * `Result<Document, FlowyError>` - a Result containing either a new Document object or an Error if the document creation failed
  37. pub fn create_with_data(collab: Arc<MutexCollab>, data: DocumentData) -> FlowyResult<Self> {
  38. let document =
  39. Document::create_with_data(collab, data).map(|inner| Self(Arc::new(Mutex::new(inner))))?;
  40. Ok(document)
  41. }
  42. }
  43. fn subscribe_document_changed(doc_id: &str, document: &MutexDocument) {
  44. let doc_id = doc_id.to_string();
  45. document
  46. .lock()
  47. .subscribe_block_changed(move |events, is_remote| {
  48. // send notification to the client.
  49. send_notification(&doc_id, DocumentNotification::DidReceiveUpdate)
  50. .payload::<DocEventPB>((events, is_remote).into())
  51. .send();
  52. });
  53. }
  54. fn subscribe_document_snapshot_state(collab: &Arc<MutexCollab>) {
  55. let document_id = collab.lock().object_id.clone();
  56. let mut snapshot_state = WatchStream::new(collab.lock().subscribe_snapshot_state());
  57. tokio::spawn(async move {
  58. while let Some(snapshot_state) = snapshot_state.next().await {
  59. if let Some(new_snapshot_id) = snapshot_state.snapshot_id() {
  60. tracing::debug!("Did create document remote snapshot: {}", new_snapshot_id);
  61. send_notification(
  62. &document_id,
  63. DocumentNotification::DidUpdateDocumentSnapshotState,
  64. )
  65. .payload(DocumentSnapshotStatePB { new_snapshot_id })
  66. .send();
  67. }
  68. }
  69. });
  70. }
  71. fn subscribe_document_sync_state(collab: &Arc<MutexCollab>) {
  72. let document_id = collab.lock().object_id.clone();
  73. let mut sync_state_stream = WatchStream::new(collab.lock().subscribe_sync_state());
  74. tokio::spawn(async move {
  75. while let Some(sync_state) = sync_state_stream.next().await {
  76. send_notification(
  77. &document_id,
  78. DocumentNotification::DidUpdateDocumentSyncState,
  79. )
  80. .payload(DocumentSyncStatePB::from(sync_state))
  81. .send();
  82. }
  83. });
  84. }
  85. unsafe impl Sync for MutexDocument {}
  86. unsafe impl Send for MutexDocument {}
  87. impl Deref for MutexDocument {
  88. type Target = Arc<Mutex<Document>>;
  89. fn deref(&self) -> &Self::Target {
  90. &self.0
  91. }
  92. }
  93. impl DerefMut for MutexDocument {
  94. fn deref_mut(&mut self) -> &mut Self::Target {
  95. &mut self.0
  96. }
  97. }