controller.rs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. use crate::{
  2. context::DocumentUser,
  3. core::{
  4. edit::ClientDocumentEditor,
  5. revision::{DocumentRevisionCache, DocumentRevisionManager, RevisionServer},
  6. DocumentWSReceivers,
  7. DocumentWebSocket,
  8. WSStateReceiver,
  9. },
  10. errors::FlowyError,
  11. server::Server,
  12. };
  13. use bytes::Bytes;
  14. use dashmap::DashMap;
  15. use flowy_collaboration::entities::{
  16. doc::{DocumentDelta, DocumentId, DocumentInfo},
  17. revision::RepeatedRevision,
  18. };
  19. use flowy_database::ConnectionPool;
  20. use flowy_error::FlowyResult;
  21. use lib_infra::future::FutureResult;
  22. use std::sync::Arc;
  23. pub struct DocumentController {
  24. server: Server,
  25. ws_receivers: Arc<DocumentWSReceivers>,
  26. ws_sender: Arc<dyn DocumentWebSocket>,
  27. open_cache: Arc<OpenDocCache>,
  28. user: Arc<dyn DocumentUser>,
  29. }
  30. impl DocumentController {
  31. pub(crate) fn new(
  32. server: Server,
  33. user: Arc<dyn DocumentUser>,
  34. ws_receivers: Arc<DocumentWSReceivers>,
  35. ws_sender: Arc<dyn DocumentWebSocket>,
  36. ) -> Self {
  37. let open_cache = Arc::new(OpenDocCache::new());
  38. Self {
  39. server,
  40. ws_receivers,
  41. ws_sender,
  42. open_cache,
  43. user,
  44. }
  45. }
  46. pub(crate) fn init(&self) -> FlowyResult<()> {
  47. let notify = self.ws_sender.subscribe_state_changed();
  48. listen_ws_state_changed(notify, self.ws_receivers.clone());
  49. Ok(())
  50. }
  51. #[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)]
  52. pub async fn open_document<T: AsRef<str>>(&self, doc_id: T) -> Result<Arc<ClientDocumentEditor>, FlowyError> {
  53. let doc_id = doc_id.as_ref();
  54. tracing::Span::current().record("doc_id", &doc_id);
  55. self.get_editor(doc_id).await
  56. }
  57. #[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)]
  58. pub fn close_document<T: AsRef<str>>(&self, doc_id: T) -> Result<(), FlowyError> {
  59. let doc_id = doc_id.as_ref();
  60. tracing::Span::current().record("doc_id", &doc_id);
  61. self.open_cache.remove(doc_id);
  62. self.ws_receivers.remove(doc_id);
  63. Ok(())
  64. }
  65. #[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)]
  66. pub fn delete<T: AsRef<str>>(&self, doc_id: T) -> Result<(), FlowyError> {
  67. let doc_id = doc_id.as_ref();
  68. tracing::Span::current().record("doc_id", &doc_id);
  69. self.open_cache.remove(doc_id);
  70. self.ws_receivers.remove(doc_id);
  71. Ok(())
  72. }
  73. #[tracing::instrument(level = "debug", skip(self, delta), fields(doc_id = %delta.doc_id), err)]
  74. pub async fn receive_local_delta(&self, delta: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
  75. let editor = self.get_editor(&delta.doc_id).await?;
  76. let _ = editor.compose_local_delta(Bytes::from(delta.delta_json)).await?;
  77. let document_json = editor.document_json().await?;
  78. Ok(DocumentDelta {
  79. doc_id: delta.doc_id.clone(),
  80. delta_json: document_json,
  81. })
  82. }
  83. pub async fn save_document<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
  84. let doc_id = doc_id.as_ref().to_owned();
  85. let db_pool = self.user.db_pool()?;
  86. let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;
  87. let _ = rev_manager.reset_document(revisions).await?;
  88. Ok(())
  89. }
  90. async fn get_editor(&self, doc_id: &str) -> FlowyResult<Arc<ClientDocumentEditor>> {
  91. match self.open_cache.get(doc_id) {
  92. None => {
  93. let db_pool = self.user.db_pool()?;
  94. self.make_editor(&doc_id, db_pool).await
  95. },
  96. Some(editor) => Ok(editor),
  97. }
  98. }
  99. }
  100. impl DocumentController {
  101. async fn make_editor(
  102. &self,
  103. doc_id: &str,
  104. pool: Arc<ConnectionPool>,
  105. ) -> Result<Arc<ClientDocumentEditor>, FlowyError> {
  106. let user = self.user.clone();
  107. let token = self.user.token()?;
  108. let rev_manager = self.make_rev_manager(doc_id, pool.clone())?;
  109. let server = Arc::new(RevisionServerImpl {
  110. token,
  111. server: self.server.clone(),
  112. });
  113. let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.ws_sender.clone(), server).await?;
  114. self.ws_receivers.add(doc_id, doc_editor.ws_handler());
  115. self.open_cache.insert(&doc_id, &doc_editor);
  116. Ok(doc_editor)
  117. }
  118. fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<DocumentRevisionManager, FlowyError> {
  119. let user_id = self.user.user_id()?;
  120. let cache = Arc::new(DocumentRevisionCache::new(&user_id, doc_id, pool));
  121. Ok(DocumentRevisionManager::new(&user_id, doc_id, cache))
  122. }
  123. }
  124. struct RevisionServerImpl {
  125. token: String,
  126. server: Server,
  127. }
  128. impl RevisionServer for RevisionServerImpl {
  129. #[tracing::instrument(level = "debug", skip(self))]
  130. fn fetch_document(&self, doc_id: &str) -> FutureResult<DocumentInfo, FlowyError> {
  131. let params = DocumentId {
  132. doc_id: doc_id.to_string(),
  133. };
  134. let server = self.server.clone();
  135. let token = self.token.clone();
  136. FutureResult::new(async move {
  137. match server.read_doc(&token, params).await? {
  138. None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")),
  139. Some(doc) => Ok(doc),
  140. }
  141. })
  142. }
  143. }
  144. pub struct OpenDocCache {
  145. inner: DashMap<String, Arc<ClientDocumentEditor>>,
  146. }
  147. impl OpenDocCache {
  148. fn new() -> Self { Self { inner: DashMap::new() } }
  149. pub(crate) fn insert(&self, doc_id: &str, doc: &Arc<ClientDocumentEditor>) {
  150. if self.inner.contains_key(doc_id) {
  151. log::warn!("Doc:{} already exists in cache", doc_id);
  152. }
  153. self.inner.insert(doc_id.to_string(), doc.clone());
  154. }
  155. pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() }
  156. pub(crate) fn get(&self, doc_id: &str) -> Option<Arc<ClientDocumentEditor>> {
  157. if !self.contains(&doc_id) {
  158. return None;
  159. }
  160. let opened_doc = self.inner.get(doc_id).unwrap();
  161. Some(opened_doc.clone())
  162. }
  163. pub(crate) fn remove(&self, id: &str) {
  164. let doc_id = id.to_string();
  165. if let Some(editor) = self.get(id) {
  166. editor.stop()
  167. }
  168. self.inner.remove(&doc_id);
  169. }
  170. }
  171. #[tracing::instrument(level = "debug", skip(state_receiver, receivers))]
  172. fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: Arc<DocumentWSReceivers>) {
  173. tokio::spawn(async move {
  174. while let Ok(state) = state_receiver.recv().await {
  175. receivers.ws_connect_state_changed(&state);
  176. }
  177. });
  178. }