document.rs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. use std::{
  2. ops::{Deref, DerefMut},
  3. sync::Arc,
  4. };
  5. use collab::core::collab::MutexCollab;
  6. use collab_document::{blocks::DocumentData, document::Document};
  7. use futures::StreamExt;
  8. use parking_lot::Mutex;
  9. use tokio_stream::wrappers::WatchStream;
  10. use flowy_error::FlowyResult;
  11. use crate::entities::{DocEventPB, DocumentSnapshotStatePB, DocumentSyncStatePB};
  12. use crate::notification::{send_notification, DocumentNotification};
  13. /// This struct wrap the document::Document
  14. #[derive(Clone)]
  15. pub struct MutexDocument(Arc<Mutex<Document>>);
  16. impl MutexDocument {
  17. /// Open a document with the given collab.
  18. /// # Arguments
  19. /// * `collab` - the identifier of the collaboration instance
  20. ///
  21. /// # Returns
  22. /// * `Result<Document, FlowyError>` - a Result containing either a new Document object or an Error if the document creation failed
  23. pub fn open(doc_id: &str, collab: Arc<MutexCollab>) -> FlowyResult<Self> {
  24. let document = Document::open(collab.clone()).map(|inner| Self(Arc::new(Mutex::new(inner))))?;
  25. subscribe_document_changed(doc_id, &document);
  26. subscribe_document_snapshot_state(&collab);
  27. subscribe_document_sync_state(&collab);
  28. Ok(document)
  29. }
  30. /// Creates and returns a new Document object with initial data.
  31. /// # Arguments
  32. /// * `collab` - the identifier of the collaboration instance
  33. /// * `data` - the initial data to include in the document
  34. ///
  35. /// # Returns
  36. /// * `Result<Document, FlowyError>` - a Result containing either a new Document object or an Error if the document creation failed
  37. pub fn create_with_data(collab: Arc<MutexCollab>, data: DocumentData) -> FlowyResult<Self> {
  38. let document =
  39. Document::create_with_data(collab, data).map(|inner| Self(Arc::new(Mutex::new(inner))))?;
  40. Ok(document)
  41. }
  42. }
  43. fn subscribe_document_changed(doc_id: &str, document: &MutexDocument) {
  44. let doc_id = doc_id.to_string();
  45. document
  46. .lock()
  47. .subscribe_block_changed(move |events, is_remote| {
  48. tracing::trace!(
  49. "document changed: {:?}, from remote: {}",
  50. &events,
  51. is_remote
  52. );
  53. // send notification to the client.
  54. send_notification(&doc_id, DocumentNotification::DidReceiveUpdate)
  55. .payload::<DocEventPB>((events, is_remote).into())
  56. .send();
  57. });
  58. }
  59. fn subscribe_document_snapshot_state(collab: &Arc<MutexCollab>) {
  60. let document_id = collab.lock().object_id.clone();
  61. let mut snapshot_state = WatchStream::new(collab.lock().subscribe_snapshot_state());
  62. tokio::spawn(async move {
  63. while let Some(snapshot_state) = snapshot_state.next().await {
  64. if let Some(new_snapshot_id) = snapshot_state.snapshot_id() {
  65. tracing::debug!("Did create document snapshot: {}", new_snapshot_id);
  66. send_notification(
  67. &document_id,
  68. DocumentNotification::DidUpdateDocumentSnapshotState,
  69. )
  70. .payload(DocumentSnapshotStatePB { new_snapshot_id })
  71. .send();
  72. }
  73. }
  74. });
  75. }
  76. fn subscribe_document_sync_state(collab: &Arc<MutexCollab>) {
  77. let document_id = collab.lock().object_id.clone();
  78. let mut sync_state_stream = WatchStream::new(collab.lock().subscribe_sync_state());
  79. tokio::spawn(async move {
  80. while let Some(sync_state) = sync_state_stream.next().await {
  81. send_notification(
  82. &document_id,
  83. DocumentNotification::DidUpdateDocumentSyncState,
  84. )
  85. .payload(DocumentSyncStatePB::from(sync_state))
  86. .send();
  87. }
  88. });
  89. }
  90. unsafe impl Sync for MutexDocument {}
  91. unsafe impl Send for MutexDocument {}
  92. impl Deref for MutexDocument {
  93. type Target = Arc<Mutex<Document>>;
  94. fn deref(&self) -> &Self::Target {
  95. &self.0
  96. }
  97. }
  98. impl DerefMut for MutexDocument {
  99. fn deref_mut(&mut self) -> &mut Self::Target {
  100. &mut self.0
  101. }
  102. }