doc_controller.rs 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. use std::sync::Arc;
  2. use bytes::Bytes;
  3. use parking_lot::RwLock;
  4. use tokio::time::{interval, Duration};
  5. use flowy_database::{ConnectionPool, SqliteConnection};
  6. use flowy_infra::future::{wrap_future, FnFuture, ResultFuture};
  7. use crate::{
  8. entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams},
  9. errors::{internal_error, DocError},
  10. module::DocumentUser,
  11. services::{
  12. cache::DocCache,
  13. doc::{
  14. edit::ClientEditDoc,
  15. revision::{DocRevision, RevisionServer},
  16. },
  17. server::Server,
  18. ws::WsDocumentManager,
  19. },
  20. sql_tables::doc::{DocTable, DocTableSql},
  21. };
  22. use flowy_ot::core::Delta;
  23. pub(crate) struct DocController {
  24. server: Server,
  25. doc_sql: Arc<DocTableSql>,
  26. ws_manager: Arc<WsDocumentManager>,
  27. cache: Arc<DocCache>,
  28. user: Arc<dyn DocumentUser>,
  29. }
  30. impl DocController {
  31. pub(crate) fn new(server: Server, user: Arc<dyn DocumentUser>, ws: Arc<WsDocumentManager>) -> Self {
  32. let doc_sql = Arc::new(DocTableSql {});
  33. let cache = Arc::new(DocCache::new());
  34. let controller = Self {
  35. server,
  36. doc_sql,
  37. user,
  38. ws_manager: ws,
  39. cache: cache.clone(),
  40. };
  41. controller
  42. }
  43. #[tracing::instrument(skip(self, conn), err)]
  44. pub(crate) fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
  45. let doc = Doc {
  46. id: params.id,
  47. data: params.data,
  48. rev_id: 0,
  49. };
  50. let _ = self.doc_sql.create_doc_table(DocTable::new(doc), conn)?;
  51. Ok(())
  52. }
  53. #[tracing::instrument(level = "debug", skip(self, pool), err)]
  54. pub(crate) async fn open(
  55. &self,
  56. params: QueryDocParams,
  57. pool: Arc<ConnectionPool>,
  58. ) -> Result<Arc<ClientEditDoc>, DocError> {
  59. if self.cache.is_opened(&params.doc_id) == false {
  60. let edit_ctx = self.make_edit_context(&params.doc_id, pool.clone()).await?;
  61. return Ok(edit_ctx);
  62. }
  63. let edit_doc_ctx = self.cache.get(&params.doc_id)?;
  64. Ok(edit_doc_ctx)
  65. }
  66. pub(crate) fn close(&self, doc_id: &str) -> Result<(), DocError> {
  67. self.cache.remove(doc_id);
  68. self.ws_manager.remove_handler(doc_id);
  69. Ok(())
  70. }
  71. #[tracing::instrument(level = "debug", skip(self, conn), err)]
  72. pub(crate) fn delete(&self, params: QueryDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
  73. let doc_id = &params.doc_id;
  74. let _ = self.doc_sql.delete_doc(doc_id, &*conn)?;
  75. self.cache.remove(doc_id);
  76. self.ws_manager.remove_handler(doc_id);
  77. let _ = self.delete_doc_on_server(params)?;
  78. Ok(())
  79. }
  80. #[tracing::instrument(level = "debug", skip(self, delta), err)]
  81. pub(crate) async fn edit_doc(&self, delta: DocDelta) -> Result<Doc, DocError> {
  82. let edit_doc_ctx = self.cache.get(&delta.doc_id)?;
  83. let _ = edit_doc_ctx.compose_local_delta(Bytes::from(delta.data)).await?;
  84. Ok(edit_doc_ctx.doc().await?)
  85. }
  86. }
  87. impl DocController {
  88. #[tracing::instrument(level = "debug", skip(self), err)]
  89. fn delete_doc_on_server(&self, params: QueryDocParams) -> Result<(), DocError> {
  90. let token = self.user.token()?;
  91. let server = self.server.clone();
  92. tokio::spawn(async move {
  93. match server.delete_doc(&token, params).await {
  94. Ok(_) => {},
  95. Err(e) => {
  96. // TODO: retry?
  97. log::error!("Delete doc failed: {:?}", e);
  98. },
  99. }
  100. });
  101. Ok(())
  102. }
  103. async fn make_edit_context(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<Arc<ClientEditDoc>, DocError> {
  104. // Opti: require upgradable_read lock and then upgrade to write lock using
  105. // RwLockUpgradableReadGuard::upgrade(xx) of ws
  106. // let doc = self.read_doc(doc_id, pool.clone()).await?;
  107. let ws = self.ws_manager.ws();
  108. let token = self.user.token()?;
  109. let user = self.user.clone();
  110. let server = Arc::new(RevisionServerImpl {
  111. token,
  112. server: self.server.clone(),
  113. });
  114. let edit_ctx = Arc::new(ClientEditDoc::new(doc_id, pool, ws, server, user).await?);
  115. self.ws_manager.register_handler(doc_id, edit_ctx.clone());
  116. self.cache.set(edit_ctx.clone());
  117. Ok(edit_ctx)
  118. }
  119. #[allow(dead_code)]
  120. #[tracing::instrument(level = "debug", skip(self, pool), err)]
  121. async fn read_doc(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
  122. match self.doc_sql.read_doc_table(doc_id, pool.clone()) {
  123. Ok(doc_table) => Ok(doc_table.into()),
  124. Err(error) => {
  125. if error.is_record_not_found() {
  126. let token = self.user.token()?;
  127. let params = QueryDocParams {
  128. doc_id: doc_id.to_string(),
  129. };
  130. match self.server.read_doc(&token, params).await? {
  131. None => Err(DocError::not_found()),
  132. Some(doc) => {
  133. let conn = &*pool.get().map_err(internal_error)?;
  134. let _ = self.doc_sql.create_doc_table(doc.clone().into(), conn)?;
  135. Ok(doc)
  136. },
  137. }
  138. } else {
  139. return Err(error);
  140. }
  141. },
  142. }
  143. }
  144. }
  145. struct RevisionServerImpl {
  146. token: String,
  147. server: Server,
  148. }
  149. impl RevisionServer for RevisionServerImpl {
  150. fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture<DocRevision, DocError> {
  151. let params = QueryDocParams {
  152. doc_id: doc_id.to_string(),
  153. };
  154. let server = self.server.clone();
  155. let token = self.token.clone();
  156. ResultFuture::new(async move {
  157. match server.read_doc(&token, params).await? {
  158. None => Err(DocError::not_found()),
  159. Some(doc) => {
  160. let delta = Delta::from_bytes(doc.data)?;
  161. Ok(DocRevision {
  162. rev_id: doc.rev_id.into(),
  163. delta,
  164. })
  165. },
  166. }
  167. })
  168. }
  169. }
  170. #[allow(dead_code)]
  171. fn event_loop(_cache: Arc<DocCache>) -> FnFuture<()> {
  172. let mut i = interval(Duration::from_secs(3));
  173. wrap_future(async move {
  174. loop {
  175. // cache.all_docs().iter().for_each(|doc| doc.tick());
  176. i.tick().await;
  177. }
  178. })
  179. }