document.rs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. use std::{
  2. ops::{Deref, DerefMut},
  3. sync::Arc,
  4. };
  5. use collab::core::collab::MutexCollab;
  6. use collab_document::{blocks::DocumentData, document::Document};
  7. use futures::StreamExt;
  8. use parking_lot::Mutex;
  9. use flowy_error::FlowyResult;
  10. use crate::entities::{DocEventPB, DocumentSnapshotStatePB, DocumentSyncStatePB};
  11. use crate::notification::{send_notification, DocumentNotification};
  12. /// This struct wrap the document::Document
  13. #[derive(Clone)]
  14. pub struct MutexDocument(Arc<Mutex<Document>>);
  15. impl MutexDocument {
  16. /// Open a document with the given collab.
  17. /// # Arguments
  18. /// * `collab` - the identifier of the collaboration instance
  19. ///
  20. /// # Returns
  21. /// * `Result<Document, FlowyError>` - a Result containing either a new Document object or an Error if the document creation failed
  22. pub fn open(doc_id: &str, collab: Arc<MutexCollab>) -> FlowyResult<Self> {
  23. let document = Document::open(collab.clone()).map(|inner| Self(Arc::new(Mutex::new(inner))))?;
  24. subscribe_document_changed(doc_id, &document);
  25. subscribe_document_snapshot_state(&collab);
  26. subscribe_document_sync_state(&collab);
  27. Ok(document)
  28. }
  29. /// Creates and returns a new Document object with initial data.
  30. /// # Arguments
  31. /// * `collab` - the identifier of the collaboration instance
  32. /// * `data` - the initial data to include in the document
  33. ///
  34. /// # Returns
  35. /// * `Result<Document, FlowyError>` - a Result containing either a new Document object or an Error if the document creation failed
  36. pub fn create_with_data(collab: Arc<MutexCollab>, data: DocumentData) -> FlowyResult<Self> {
  37. let document =
  38. Document::create_with_data(collab, data).map(|inner| Self(Arc::new(Mutex::new(inner))))?;
  39. Ok(document)
  40. }
  41. }
  42. fn subscribe_document_changed(doc_id: &str, document: &MutexDocument) {
  43. let doc_id = doc_id.to_string();
  44. document
  45. .lock()
  46. .subscribe_block_changed(move |events, is_remote| {
  47. // send notification to the client.
  48. send_notification(&doc_id, DocumentNotification::DidReceiveUpdate)
  49. .payload::<DocEventPB>((events, is_remote).into())
  50. .send();
  51. });
  52. }
  53. fn subscribe_document_snapshot_state(collab: &Arc<MutexCollab>) {
  54. let document_id = collab.lock().object_id.clone();
  55. let mut snapshot_state = collab.lock().subscribe_snapshot_state();
  56. tokio::spawn(async move {
  57. while let Some(snapshot_state) = snapshot_state.next().await {
  58. if let Some(new_snapshot_id) = snapshot_state.snapshot_id() {
  59. tracing::debug!("Did create document remote snapshot: {}", new_snapshot_id);
  60. send_notification(
  61. &document_id,
  62. DocumentNotification::DidUpdateDocumentSnapshotState,
  63. )
  64. .payload(DocumentSnapshotStatePB { new_snapshot_id })
  65. .send();
  66. }
  67. }
  68. });
  69. }
  70. fn subscribe_document_sync_state(collab: &Arc<MutexCollab>) {
  71. let document_id = collab.lock().object_id.clone();
  72. let mut sync_state_stream = collab.lock().subscribe_sync_state();
  73. tokio::spawn(async move {
  74. while let Some(sync_state) = sync_state_stream.next().await {
  75. send_notification(
  76. &document_id,
  77. DocumentNotification::DidUpdateDocumentSyncState,
  78. )
  79. .payload(DocumentSyncStatePB::from(sync_state))
  80. .send();
  81. }
  82. });
  83. }
  84. unsafe impl Sync for MutexDocument {}
  85. unsafe impl Send for MutexDocument {}
  86. impl Deref for MutexDocument {
  87. type Target = Arc<Mutex<Document>>;
  88. fn deref(&self) -> &Self::Target {
  89. &self.0
  90. }
  91. }
  92. impl DerefMut for MutexDocument {
  93. fn deref_mut(&mut self) -> &mut Self::Target {
  94. &mut self.0
  95. }
  96. }