doc_controller.rs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. use crate::{
  2. errors::{DocError, DocResult},
  3. module::DocumentUser,
  4. services::{
  5. cache::DocCache,
  6. doc::{
  7. edit::{ClientDocEditor, EditDocWsHandler},
  8. revision::RevisionServer,
  9. },
  10. server::Server,
  11. ws::WsDocumentManager,
  12. },
  13. };
  14. use bytes::Bytes;
  15. use flowy_database::ConnectionPool;
  16. use flowy_document_infra::entities::doc::{Doc, DocDelta, DocIdentifier};
  17. use lib_infra::future::{wrap_future, FnFuture, ResultFuture};
  18. use std::sync::Arc;
  19. use tokio::time::{interval, Duration};
  20. pub(crate) struct DocController {
  21. server: Server,
  22. ws_manager: Arc<WsDocumentManager>,
  23. cache: Arc<DocCache>,
  24. user: Arc<dyn DocumentUser>,
  25. }
  26. impl DocController {
  27. pub(crate) fn new(server: Server, user: Arc<dyn DocumentUser>, ws: Arc<WsDocumentManager>) -> Self {
  28. let cache = Arc::new(DocCache::new());
  29. Self {
  30. server,
  31. user,
  32. ws_manager: ws,
  33. cache,
  34. }
  35. }
  36. pub(crate) fn init(&self) -> DocResult<()> {
  37. self.ws_manager.init();
  38. Ok(())
  39. }
  40. pub(crate) async fn open(
  41. &self,
  42. params: DocIdentifier,
  43. pool: Arc<ConnectionPool>,
  44. ) -> Result<Arc<ClientDocEditor>, DocError> {
  45. if !self.cache.contains(&params.doc_id) {
  46. let edit_ctx = self.make_edit_context(&params.doc_id, pool.clone()).await?;
  47. return Ok(edit_ctx);
  48. }
  49. let edit_doc_ctx = self.cache.get(&params.doc_id)?;
  50. Ok(edit_doc_ctx)
  51. }
  52. pub(crate) fn close(&self, doc_id: &str) -> Result<(), DocError> {
  53. self.cache.remove(doc_id);
  54. self.ws_manager.remove_handler(doc_id);
  55. Ok(())
  56. }
  57. #[tracing::instrument(level = "debug", skip(self), err)]
  58. pub(crate) fn delete(&self, params: DocIdentifier) -> Result<(), DocError> {
  59. let doc_id = &params.doc_id;
  60. self.cache.remove(doc_id);
  61. self.ws_manager.remove_handler(doc_id);
  62. Ok(())
  63. }
  64. // the delta's data that contains attributes with null value will be considered
  65. // as None e.g.
  66. // json : {"retain":7,"attributes":{"bold":null}}
  67. // deserialize delta: [ {retain: 7, attributes: {Bold: AttributeValue(None)}} ]
  68. #[tracing::instrument(level = "debug", skip(self, delta, db_pool), fields(doc_id = %delta.doc_id), err)]
  69. pub(crate) async fn apply_local_delta(
  70. &self,
  71. delta: DocDelta,
  72. db_pool: Arc<ConnectionPool>,
  73. ) -> Result<DocDelta, DocError> {
  74. if !self.cache.contains(&delta.doc_id) {
  75. let doc_identifier: DocIdentifier = delta.doc_id.clone().into();
  76. let _ = self.open(doc_identifier, db_pool).await?;
  77. }
  78. let edit_doc_ctx = self.cache.get(&delta.doc_id)?;
  79. let _ = edit_doc_ctx.composing_local_delta(Bytes::from(delta.data)).await?;
  80. Ok(edit_doc_ctx.delta().await?)
  81. }
  82. }
  83. impl DocController {
  84. async fn make_edit_context(
  85. &self,
  86. doc_id: &str,
  87. pool: Arc<ConnectionPool>,
  88. ) -> Result<Arc<ClientDocEditor>, DocError> {
  89. // Opti: require upgradable_read lock and then upgrade to write lock using
  90. // RwLockUpgradableReadGuard::upgrade(xx) of ws
  91. // let doc = self.read_doc(doc_id, pool.clone()).await?;
  92. let ws = self.ws_manager.ws();
  93. let token = self.user.token()?;
  94. let user = self.user.clone();
  95. let server = Arc::new(RevisionServerImpl {
  96. token,
  97. server: self.server.clone(),
  98. });
  99. let edit_ctx = Arc::new(ClientDocEditor::new(doc_id, pool, ws, server, user).await?);
  100. let ws_handler = Arc::new(EditDocWsHandler(edit_ctx.clone()));
  101. self.ws_manager.register_handler(doc_id, ws_handler);
  102. self.cache.set(edit_ctx.clone());
  103. Ok(edit_ctx)
  104. }
  105. }
  106. struct RevisionServerImpl {
  107. token: String,
  108. server: Server,
  109. }
  110. impl RevisionServer for RevisionServerImpl {
  111. #[tracing::instrument(level = "debug", skip(self))]
  112. fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture<Doc, DocError> {
  113. let params = DocIdentifier {
  114. doc_id: doc_id.to_string(),
  115. };
  116. let server = self.server.clone();
  117. let token = self.token.clone();
  118. ResultFuture::new(async move {
  119. match server.read_doc(&token, params).await? {
  120. None => Err(DocError::record_not_found().context("Remote doesn't have this document")),
  121. Some(doc) => Ok(doc),
  122. }
  123. })
  124. }
  125. }
  126. #[allow(dead_code)]
  127. fn event_loop(_cache: Arc<DocCache>) -> FnFuture<()> {
  128. let mut i = interval(Duration::from_secs(3));
  129. wrap_future(async move {
  130. loop {
  131. // cache.all_docs().iter().for_each(|doc| doc.tick());
  132. i.tick().await;
  133. }
  134. })
  135. }