// doc_controller.rs (5.0 KB)

  1. use crate::{
  2. entities::doc::{CreateDocParams, Doc, QueryDocParams, SaveDocParams},
  3. errors::DocError,
  4. module::DocumentUser,
  5. services::server::Server,
  6. sql_tables::doc::{DocTable, DocTableChangeset, DocTableSql},
  7. };
  8. use flowy_database::{ConnectionPool, SqliteConnection};
  9. use crate::errors::internal_error;
  10. use std::sync::Arc;
  11. use tokio::task::JoinHandle;
  12. pub(crate) struct DocController {
  13. server: Server,
  14. sql: Arc<DocTableSql>,
  15. user: Arc<dyn DocumentUser>,
  16. }
  17. impl DocController {
  18. pub(crate) fn new(server: Server, user: Arc<dyn DocumentUser>) -> Self {
  19. let sql = Arc::new(DocTableSql {});
  20. Self { sql, server, user }
  21. }
  22. #[tracing::instrument(skip(self, conn), err)]
  23. pub(crate) fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
  24. let doc = Doc {
  25. id: params.id,
  26. data: params.data,
  27. };
  28. let _ = self.sql.create_doc_table(DocTable::new(doc), conn)?;
  29. Ok(())
  30. }
  31. #[tracing::instrument(level = "debug", skip(self, conn, params), err)]
  32. pub(crate) fn update(&self, params: SaveDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
  33. let changeset = DocTableChangeset::new(params.clone());
  34. let _ = self.sql.update_doc_table(changeset, &*conn)?;
  35. let _ = self.update_doc_on_server(params)?;
  36. Ok(())
  37. }
  38. #[tracing::instrument(level = "debug", skip(self, pool), err)]
  39. pub(crate) async fn open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
  40. match self._open(params.clone(), pool.clone()) {
  41. Ok(doc_table) => Ok(doc_table.into()),
  42. Err(error) => self.try_read_on_server(params, pool.clone(), error).await,
  43. }
  44. }
  45. #[tracing::instrument(level = "debug", skip(self, conn), err)]
  46. pub(crate) fn delete(&self, params: QueryDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
  47. let _ = self.sql.delete_doc(&params.doc_id, &*conn)?;
  48. let _ = self.delete_doc_on_server(params)?;
  49. Ok(())
  50. }
  51. }
  52. impl DocController {
  53. #[tracing::instrument(level = "debug", skip(self, params), err)]
  54. fn update_doc_on_server(&self, params: SaveDocParams) -> Result<(), DocError> {
  55. let token = self.user.token()?;
  56. let server = self.server.clone();
  57. tokio::spawn(async move {
  58. match server.update_doc(&token, params).await {
  59. Ok(_) => {},
  60. Err(e) => {
  61. // TODO: retry?
  62. log::error!("Update doc failed: {}", e);
  63. },
  64. }
  65. });
  66. Ok(())
  67. }
  68. #[tracing::instrument(level = "debug", skip(self, pool), err)]
  69. fn read_doc_from_server(
  70. &self,
  71. params: QueryDocParams,
  72. pool: Arc<ConnectionPool>,
  73. ) -> Result<JoinHandle<Result<Doc, DocError>>, DocError> {
  74. let token = self.user.token()?;
  75. let server = self.server.clone();
  76. let sql = self.sql.clone();
  77. Ok(tokio::spawn(async move {
  78. match server.read_doc(&token, params).await? {
  79. None => Err(DocError::not_found()),
  80. Some(doc) => {
  81. let doc_table = DocTable::new(doc.clone());
  82. let _ = sql.create_doc_table(doc_table, &*(pool.get().map_err(internal_error)?))?;
  83. // TODO: notify
  84. Ok(doc)
  85. },
  86. }
  87. }))
  88. }
  89. #[tracing::instrument(level = "debug", skip(self), err)]
  90. async fn sync_read_doc_from_server(&self, params: QueryDocParams) -> Result<Doc, DocError> {
  91. let token = self.user.token()?;
  92. match self.server.read_doc(&token, params).await? {
  93. None => Err(DocError::not_found()),
  94. Some(doc) => Ok(doc),
  95. }
  96. }
  97. #[tracing::instrument(level = "debug", skip(self), err)]
  98. fn delete_doc_on_server(&self, params: QueryDocParams) -> Result<(), DocError> {
  99. let token = self.user.token()?;
  100. let server = self.server.clone();
  101. tokio::spawn(async move {
  102. match server.delete_doc(&token, params).await {
  103. Ok(_) => {},
  104. Err(e) => {
  105. // TODO: retry?
  106. log::error!("Delete doc failed: {:?}", e);
  107. },
  108. }
  109. });
  110. Ok(())
  111. }
  112. fn _open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
  113. let doc_table = self.sql.read_doc_table(&params.doc_id, &*(pool.get().map_err(internal_error)?))?;
  114. let doc: Doc = doc_table.into();
  115. let _ = self.read_doc_from_server(params, pool.clone())?;
  116. Ok(doc)
  117. }
  118. async fn try_read_on_server(&self, params: QueryDocParams, pool: Arc<ConnectionPool>, error: DocError) -> Result<Doc, DocError> {
  119. if error.is_record_not_found() {
  120. log::debug!("Doc:{} don't exist, reading from server", params.doc_id);
  121. self.read_doc_from_server(params, pool)?.await.map_err(internal_error)?
  122. } else {
  123. Err(error)
  124. }
  125. }
  126. }