doc_controller.rs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. use crate::{
  2. entities::doc::{CreateDocParams, Doc, QueryDocParams, SaveDocParams},
  3. errors::DocError,
  4. module::DocumentUser,
  5. services::server::Server,
  6. sql_tables::doc::{DocTable, DocTableChangeset, DocTableSql},
  7. };
  8. use flowy_database::{ConnectionPool, SqliteConnection};
  9. use crate::{errors::internal_error, services::open_doc::OpenedDocPersistence};
  10. use std::sync::Arc;
  11. use tokio::task::JoinHandle;
  12. pub(crate) struct DocController {
  13. server: Server,
  14. sql: Arc<DocTableSql>,
  15. user: Arc<dyn DocumentUser>,
  16. }
  17. impl DocController {
  18. pub(crate) fn new(server: Server, user: Arc<dyn DocumentUser>) -> Self {
  19. let sql = Arc::new(DocTableSql {});
  20. Self { sql, server, user }
  21. }
  22. #[tracing::instrument(skip(self, conn), err)]
  23. pub(crate) fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
  24. let doc = Doc {
  25. id: params.id,
  26. data: params.data,
  27. };
  28. let _ = self.sql.create_doc_table(DocTable::new(doc), conn)?;
  29. Ok(())
  30. }
  31. #[tracing::instrument(level = "debug", skip(self, pool), err)]
  32. pub(crate) async fn open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
  33. match self._open(params.clone(), pool.clone()) {
  34. Ok(doc_table) => Ok(doc_table.into()),
  35. Err(error) => self.try_read_on_server(params, pool.clone(), error).await,
  36. }
  37. }
  38. #[tracing::instrument(level = "debug", skip(self, conn), err)]
  39. pub(crate) fn delete(&self, params: QueryDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
  40. let _ = self.sql.delete_doc(&params.doc_id, &*conn)?;
  41. let _ = self.delete_doc_on_server(params)?;
  42. Ok(())
  43. }
  44. }
  45. impl DocController {
  46. #[tracing::instrument(level = "debug", skip(self, params), err)]
  47. fn update_doc_on_server(&self, params: SaveDocParams) -> Result<(), DocError> {
  48. let token = self.user.token()?;
  49. let server = self.server.clone();
  50. tokio::spawn(async move {
  51. match server.update_doc(&token, params).await {
  52. Ok(_) => {},
  53. Err(e) => {
  54. // TODO: retry?
  55. log::error!("Update doc failed: {}", e);
  56. },
  57. }
  58. });
  59. Ok(())
  60. }
  61. #[tracing::instrument(level = "debug", skip(self, pool), err)]
  62. fn read_doc_from_server(
  63. &self,
  64. params: QueryDocParams,
  65. pool: Arc<ConnectionPool>,
  66. ) -> Result<JoinHandle<Result<Doc, DocError>>, DocError> {
  67. let token = self.user.token()?;
  68. let server = self.server.clone();
  69. let sql = self.sql.clone();
  70. Ok(tokio::spawn(async move {
  71. match server.read_doc(&token, params).await? {
  72. None => Err(DocError::not_found()),
  73. Some(doc) => {
  74. let doc_table = DocTable::new(doc.clone());
  75. let _ = sql.create_doc_table(doc_table, &*(pool.get().map_err(internal_error)?))?;
  76. // TODO: notify
  77. Ok(doc)
  78. },
  79. }
  80. }))
  81. }
  82. #[tracing::instrument(level = "debug", skip(self), err)]
  83. async fn sync_read_doc_from_server(&self, params: QueryDocParams) -> Result<Doc, DocError> {
  84. let token = self.user.token()?;
  85. match self.server.read_doc(&token, params).await? {
  86. None => Err(DocError::not_found()),
  87. Some(doc) => Ok(doc),
  88. }
  89. }
  90. #[tracing::instrument(level = "debug", skip(self), err)]
  91. fn delete_doc_on_server(&self, params: QueryDocParams) -> Result<(), DocError> {
  92. let token = self.user.token()?;
  93. let server = self.server.clone();
  94. tokio::spawn(async move {
  95. match server.delete_doc(&token, params).await {
  96. Ok(_) => {},
  97. Err(e) => {
  98. // TODO: retry?
  99. log::error!("Delete doc failed: {:?}", e);
  100. },
  101. }
  102. });
  103. Ok(())
  104. }
  105. fn _open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
  106. let doc_table = self.sql.read_doc_table(&params.doc_id, &*(pool.get().map_err(internal_error)?))?;
  107. let doc: Doc = doc_table.into();
  108. let _ = self.read_doc_from_server(params, pool.clone())?;
  109. Ok(doc)
  110. }
  111. async fn try_read_on_server(&self, params: QueryDocParams, pool: Arc<ConnectionPool>, error: DocError) -> Result<Doc, DocError> {
  112. if error.is_record_not_found() {
  113. log::debug!("Doc:{} don't exist, reading from server", params.doc_id);
  114. self.read_doc_from_server(params, pool)?.await.map_err(internal_error)?
  115. } else {
  116. Err(error)
  117. }
  118. }
  119. }
  120. impl OpenedDocPersistence for DocController {
  121. fn save(&self, params: SaveDocParams, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
  122. let changeset = DocTableChangeset::new(params.clone());
  123. let _ = self.sql.update_doc_table(changeset, &*(pool.get().map_err(internal_error)?))?;
  124. Ok(())
  125. }
  126. }