Explorar o código

refactor workspace sql with transaction

appflowy %!s(int64=3) %!d(string=hai) anos
pai
achega
9a724d6881

+ 0 - 1
backend/src/middleware/cors_middleware.rs

@@ -1,6 +1,5 @@
 use actix_cors::Cors;
 use actix_web::http;
-use flowy_net::config::HEADER_TOKEN;
 
 // https://javascript.info/fetch-crossorigin#cors-for-safe-requests
 // https://docs.rs/actix-cors/0.5.4/actix_cors/index.html

+ 3 - 2
rust-lib/flowy-database/src/lib.rs

@@ -15,9 +15,10 @@ pub use diesel_derives::*;
 extern crate diesel_migrations;
 
 pub use flowy_sqlite::{DBConnection, Database};
+pub type Error = diesel::result::Error;
 
 use diesel_migrations::*;
-use flowy_sqlite::{Error, PoolConfig};
+use flowy_sqlite::PoolConfig;
 use std::{fmt::Debug, io, path::Path};
 
 pub mod prelude {
@@ -41,7 +42,7 @@ pub fn init(storage_path: &str) -> Result<Database, io::Error> {
 
 fn as_io_error<E>(e: E) -> io::Error
 where
-    E: Into<Error> + Debug,
+    E: Into<flowy_sqlite::Error> + Debug,
 {
     let msg = format!("{:?}", e);
     io::Error::new(io::ErrorKind::NotConnected, msg)

+ 3 - 13
rust-lib/flowy-dispatch/src/errors/errors.rs

@@ -17,11 +17,7 @@ pub trait Error: fmt::Debug + DynClone + Send + Sync {
 dyn_clone::clone_trait_object!(Error);
 
 impl<T: Error + 'static> From<T> for DispatchError {
-    fn from(err: T) -> DispatchError {
-        DispatchError {
-            inner: Box::new(err),
-        }
-    }
+    fn from(err: T) -> DispatchError { DispatchError { inner: Box::new(err) } }
 }
 
 #[derive(Clone)]
@@ -48,9 +44,7 @@ impl std::error::Error for DispatchError {
 }
 
 impl From<SendError<EventRequest>> for DispatchError {
-    fn from(err: SendError<EventRequest>) -> Self {
-        InternalError::Other(format!("{}", err)).into()
-    }
+    fn from(err: SendError<EventRequest>) -> Self { InternalError::Other(format!("{}", err)).into() }
 }
 
 impl From<String> for DispatchError {
@@ -59,9 +53,7 @@ impl From<String> for DispatchError {
 
 #[cfg(feature = "use_protobuf")]
 impl From<protobuf::ProtobufError> for DispatchError {
-    fn from(e: protobuf::ProtobufError) -> Self {
-        InternalError::ProtobufError(format!("{:?}", e)).into()
-    }
+    fn from(e: protobuf::ProtobufError) -> Self { InternalError::ProtobufError(format!("{:?}", e)).into() }
 }
 
 impl FromBytes for DispatchError {
@@ -90,7 +82,6 @@ pub(crate) enum InternalError {
     UnexpectedNone(String),
     DeserializeFromBytes(String),
     JoinError(String),
-    Lock(String),
     ServiceNotFound(String),
     HandleNotFound(String),
     Other(String),
@@ -103,7 +94,6 @@ impl fmt::Display for InternalError {
             InternalError::UnexpectedNone(s) => fmt::Display::fmt(&s, f),
             InternalError::DeserializeFromBytes(s) => fmt::Display::fmt(&s, f),
             InternalError::JoinError(s) => fmt::Display::fmt(&s, f),
-            InternalError::Lock(s) => fmt::Display::fmt(&s, f),
             InternalError::ServiceNotFound(s) => fmt::Display::fmt(&s, f),
             InternalError::HandleNotFound(s) => fmt::Display::fmt(&s, f),
             InternalError::Other(s) => fmt::Display::fmt(&s, f),

+ 0 - 2
rust-lib/flowy-dispatch/src/response/response.rs

@@ -47,8 +47,6 @@ impl EventResponse {
             },
         }
     }
-
-    pub(crate) fn is_success(&self) -> bool { self.status_code == StatusCode::Ok }
 }
 
 impl std::fmt::Display for EventResponse {

+ 4 - 17
rust-lib/flowy-document/src/errors.rs

@@ -15,12 +15,7 @@ pub struct DocError {
 }
 
 impl DocError {
-    fn new(code: DocErrorCode, msg: &str) -> Self {
-        Self {
-            code,
-            msg: msg.to_owned(),
-        }
-    }
+    fn new(code: DocErrorCode, msg: &str) -> Self { Self { code, msg: msg.to_owned() } }
 }
 
 #[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)]
@@ -57,20 +52,12 @@ impl std::default::Default for DocErrorCode {
     fn default() -> Self { DocErrorCode::Unknown }
 }
 
-impl std::convert::From<flowy_database::result::Error> for DocError {
-    fn from(error: flowy_database::result::Error) -> Self {
-        ErrorBuilder::new(DocErrorCode::EditorDBInternalError)
-            .error(error)
-            .build()
-    }
+impl std::convert::From<flowy_database::Error> for DocError {
+    fn from(error: flowy_database::Error) -> Self { ErrorBuilder::new(DocErrorCode::EditorDBInternalError).error(error).build() }
 }
 
 impl std::convert::From<FileError> for DocError {
-    fn from(error: FileError) -> Self {
-        ErrorBuilder::new(DocErrorCode::DocOpenFileError)
-            .error(error)
-            .build()
-    }
+    fn from(error: FileError) -> Self { ErrorBuilder::new(DocErrorCode::DocOpenFileError).error(error).build() }
 }
 
 impl flowy_dispatch::Error for DocError {

+ 2 - 2
rust-lib/flowy-user/src/errors.rs

@@ -98,8 +98,8 @@ impl std::default::Default for ErrorCode {
     fn default() -> Self { ErrorCode::Unknown }
 }
 
-impl std::convert::From<flowy_database::result::Error> for UserError {
-    fn from(error: flowy_database::result::Error) -> Self { ErrorBuilder::new(ErrorCode::UserDatabaseInternalError).error(error).build() }
+impl std::convert::From<flowy_database::Error> for UserError {
+    fn from(error: flowy_database::Error) -> Self { ErrorBuilder::new(ErrorCode::UserDatabaseInternalError).error(error).build() }
 }
 
 impl std::convert::From<::r2d2::Error> for UserError {

+ 2 - 2
rust-lib/flowy-workspace/src/errors.rs

@@ -91,8 +91,8 @@ impl std::convert::From<flowy_net::errors::ServerError> for WorkspaceError {
     }
 }
 
-impl std::convert::From<flowy_database::result::Error> for WorkspaceError {
-    fn from(error: flowy_database::result::Error) -> Self { ErrorBuilder::new(ErrorCode::WorkspaceDatabaseError).error(error).build() }
+impl std::convert::From<flowy_database::Error> for WorkspaceError {
+    fn from(error: flowy_database::Error) -> Self { ErrorBuilder::new(ErrorCode::WorkspaceDatabaseError).error(error).build() }
 }
 
 impl flowy_dispatch::Error for WorkspaceError {

+ 10 - 13
rust-lib/flowy-workspace/src/handlers/app_handler.rs

@@ -1,17 +1,14 @@
 use crate::{
-    entities::{
-        app::{
-            App,
-            CreateAppParams,
-            CreateAppRequest,
-            DeleteAppParams,
-            DeleteAppRequest,
-            QueryAppParams,
-            QueryAppRequest,
-            UpdateAppParams,
-            UpdateAppRequest,
-        },
-        view::RepeatedView,
+    entities::app::{
+        App,
+        CreateAppParams,
+        CreateAppRequest,
+        DeleteAppParams,
+        DeleteAppRequest,
+        QueryAppParams,
+        QueryAppRequest,
+        UpdateAppParams,
+        UpdateAppRequest,
     },
     errors::WorkspaceError,
     services::{AppController, ViewController},

+ 0 - 1
rust-lib/flowy-workspace/src/handlers/view_handler.rs

@@ -6,7 +6,6 @@ use crate::{
         DeleteViewRequest,
         QueryViewParams,
         QueryViewRequest,
-        RepeatedView,
         UpdateViewParams,
         UpdateViewRequest,
         View,

+ 1 - 6
rust-lib/flowy-workspace/src/module.rs

@@ -28,12 +28,7 @@ pub fn create(user: Arc<dyn WorkspaceUser>, database: Arc<dyn WorkspaceDatabase>
     let server = construct_workspace_server();
     let view_controller = Arc::new(ViewController::new(user.clone(), database.clone(), server.clone()));
 
-    let app_controller = Arc::new(AppController::new(
-        user.clone(),
-        database.clone(),
-        view_controller.clone(),
-        server.clone(),
-    ));
+    let app_controller = Arc::new(AppController::new(user.clone(), database.clone(), server.clone()));
 
     let workspace_controller = Arc::new(WorkspaceController::new(
         user.clone(),

+ 24 - 25
rust-lib/flowy-workspace/src/services/app_controller.rs

@@ -3,33 +3,27 @@ use crate::{
     errors::*,
     module::{WorkspaceDatabase, WorkspaceUser},
     observable::*,
-    services::{helper::spawn, server::Server, ViewController},
+    services::{helper::spawn, server::Server},
     sql_tables::app::{AppTable, AppTableChangeset, AppTableSql},
 };
 
-use crate::entities::view::RepeatedView;
+use flowy_database::SqliteConnection;
 use std::sync::Arc;
 
 pub(crate) struct AppController {
     user: Arc<dyn WorkspaceUser>,
     sql: Arc<AppTableSql>,
-    #[allow(dead_code)]
-    view_controller: Arc<ViewController>,
+    database: Arc<dyn WorkspaceDatabase>,
     server: Server,
 }
 
 impl AppController {
-    pub(crate) fn new(
-        user: Arc<dyn WorkspaceUser>,
-        database: Arc<dyn WorkspaceDatabase>,
-        view_controller: Arc<ViewController>,
-        server: Server,
-    ) -> Self {
-        let sql = Arc::new(AppTableSql::new(database));
+    pub(crate) fn new(user: Arc<dyn WorkspaceUser>, database: Arc<dyn WorkspaceDatabase>, server: Server) -> Self {
+        let sql = Arc::new(AppTableSql {});
         Self {
             user,
             sql,
-            view_controller,
+            database,
             server,
         }
     }
@@ -38,10 +32,11 @@ impl AppController {
     pub(crate) async fn create_app(&self, params: CreateAppParams) -> Result<App, WorkspaceError> {
         let app = self.create_app_on_server(params).await?;
         let app_table = AppTable::new(app.clone());
-        let _ = self.sql.create_app(app_table)?;
+        let conn = self.database.db_connection()?;
+        let _ = self.sql.create_app(app_table, &*conn)?;
 
         // Opti: transaction
-        let apps = self.read_local_apps(&app.workspace_id)?;
+        let apps = self.read_local_apps(&app.workspace_id, &*conn)?;
         ObservableBuilder::new(&app.workspace_id, WorkspaceObservable::WorkspaceCreateApp)
             .payload(apps)
             .build();
@@ -49,36 +44,40 @@ impl AppController {
     }
 
     pub(crate) async fn read_app(&self, params: QueryAppParams) -> Result<App, WorkspaceError> {
-        let app_table = self.sql.read_app(&params.app_id, params.is_trash)?;
+        let app_table = self
+            .sql
+            .read_app(&params.app_id, params.is_trash, &*self.database.db_connection()?)?;
         let _ = self.read_app_on_server(params).await?;
         Ok(app_table.into())
     }
 
     pub(crate) async fn delete_app(&self, app_id: &str) -> Result<(), WorkspaceError> {
-        let app = self.sql.delete_app(app_id)?;
-        let _ = self.delete_app_on_server(app_id).await?;
+        let _ = self.delete_app_on_server(app_id);
 
+        let conn = self.database.db_connection()?;
+        let app = self.sql.delete_app(app_id, &*conn)?;
         // Opti: transaction
-        let apps = self.read_local_apps(&app.workspace_id)?;
+        let apps = self.read_local_apps(&app.workspace_id, &*conn)?;
         ObservableBuilder::new(&app.workspace_id, WorkspaceObservable::WorkspaceDeleteApp)
             .payload(apps)
             .build();
         Ok(())
     }
 
-    fn read_local_apps(&self, workspace_id: &str) -> Result<RepeatedApp, WorkspaceError> {
-        let app_tables = self.sql.read_apps(workspace_id, false)?;
+    fn read_local_apps(&self, workspace_id: &str, conn: &SqliteConnection) -> Result<RepeatedApp, WorkspaceError> {
+        let app_tables = self.sql.read_apps(workspace_id, false, conn)?;
         let apps = app_tables.into_iter().map(|table| table.into()).collect::<Vec<App>>();
         Ok(RepeatedApp { items: apps })
     }
 
     pub(crate) async fn update_app(&self, params: UpdateAppParams) -> Result<(), WorkspaceError> {
-        let changeset = AppTableChangeset::new(params.clone());
-        let app_id = changeset.id.clone();
-        let _ = self.sql.update_app(changeset)?;
-        let _ = self.update_app_on_server(params).await?;
+        let _ = self.update_app_on_server(params.clone()).await?;
 
-        let app: App = self.sql.read_app(&app_id, false)?.into();
+        let changeset = AppTableChangeset::new(params);
+        let app_id = changeset.id.clone();
+        let conn = self.database.db_connection()?;
+        let _ = self.sql.update_app(changeset, &*conn)?;
+        let app: App = self.sql.read_app(&app_id, false, &*conn)?.into();
         ObservableBuilder::new(&app_id, WorkspaceObservable::AppUpdated)
             .payload(app)
             .build();

+ 51 - 31
rust-lib/flowy-workspace/src/services/view_controller.rs

@@ -12,73 +12,93 @@ use crate::{
     module::WorkspaceUser,
     observable::ObservableBuilder,
 };
+use flowy_database::SqliteConnection;
 use std::sync::Arc;
 
 pub(crate) struct ViewController {
     user: Arc<dyn WorkspaceUser>,
     sql: Arc<ViewTableSql>,
     server: Server,
+    database: Arc<dyn WorkspaceDatabase>,
 }
 
 impl ViewController {
     pub(crate) fn new(user: Arc<dyn WorkspaceUser>, database: Arc<dyn WorkspaceDatabase>, server: Server) -> Self {
-        let sql = Arc::new(ViewTableSql { database });
-        Self { user, sql, server }
+        let sql = Arc::new(ViewTableSql {});
+        Self {
+            user,
+            sql,
+            server,
+            database,
+        }
     }
 
     pub(crate) async fn create_view(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
         let view = self.create_view_on_server(params).await?;
+        let conn = &*self.database.db_connection()?;
         let view_table = ViewTable::new(view.clone());
-        let _ = self.sql.create_view(view_table)?;
 
-        let repeated_view = self.read_local_views_belong_to(&view.belong_to_id)?;
-        ObservableBuilder::new(&view.belong_to_id, WorkspaceObservable::AppCreateView)
-            .payload(repeated_view)
-            .build();
+        (conn).immediate_transaction::<_, WorkspaceError, _>(|| {
+            let _ = self.sql.create_view(view_table, conn)?;
+            let repeated_view = self.read_local_views_belong_to(&view.belong_to_id, conn)?;
+            ObservableBuilder::new(&view.belong_to_id, WorkspaceObservable::AppCreateView)
+                .payload(repeated_view)
+                .build();
+            Ok(())
+        })?;
+
         Ok(view)
     }
 
     pub(crate) async fn read_view(&self, params: QueryViewParams) -> Result<View, WorkspaceError> {
-        let view_table = self.sql.read_view(&params.view_id, params.is_trash)?;
+        let conn = self.database.db_connection()?;
+        let view_table = self.sql.read_view(&params.view_id, Some(params.is_trash), &*conn)?;
         let view: View = view_table.into();
-        let _ = self.read_view_on_server(params).await?;
+        let _ = self.read_view_on_server(params);
         Ok(view)
     }
 
     pub(crate) async fn delete_view(&self, view_id: &str) -> Result<(), WorkspaceError> {
-        let view_table = self.sql.delete_view(view_id)?;
-        let _ = self.delete_view_on_server(view_id).await?;
+        let conn = &*self.database.db_connection()?;
+
+        (conn).immediate_transaction::<_, WorkspaceError, _>(|| {
+            let view_table = self.sql.delete_view(view_id, conn)?;
+            let repeated_view = self.read_local_views_belong_to(&view_table.belong_to_id, conn)?;
+            ObservableBuilder::new(&view_table.belong_to_id, WorkspaceObservable::AppDeleteView)
+                .payload(repeated_view)
+                .build();
+            Ok(())
+        })?;
+
+        let _ = self.delete_view_on_server(view_id);
 
-        let repeated_view = self.read_local_views_belong_to(&view_table.belong_to_id)?;
-        ObservableBuilder::new(&view_table.belong_to_id, WorkspaceObservable::AppDeleteView)
-            .payload(repeated_view)
-            .build();
         Ok(())
     }
 
     // belong_to_id will be the app_id or view_id.
+    #[tracing::instrument(level = "debug", skip(self), err)]
     pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, WorkspaceError> {
         // TODO: read from server
-        let views = self
-            .sql
-            .read_views_belong_to(belong_to_id)?
-            .into_iter()
-            .map(|view_table| view_table.into())
-            .collect::<Vec<View>>();
-
-        Ok(RepeatedView { items: views })
+        let conn = self.database.db_connection()?;
+        let repeated_view = self.read_local_views_belong_to(belong_to_id, &*conn)?;
+        Ok(repeated_view)
     }
 
     pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<(), WorkspaceError> {
+        let conn = &*self.database.db_connection()?;
         let changeset = ViewTableChangeset::new(params.clone());
         let view_id = changeset.id.clone();
-        let _ = self.sql.update_view(changeset)?;
-        let _ = self.update_view_on_server(params).await?;
 
-        let view: View = self.sql.read_view(&view_id, false)?.into();
-        ObservableBuilder::new(&view_id, WorkspaceObservable::ViewUpdated)
-            .payload(view)
-            .build();
+        (conn).immediate_transaction::<_, WorkspaceError, _>(|| {
+            let _ = self.sql.update_view(changeset, conn)?;
+            let view: View = self.sql.read_view(&view_id, None, conn)?.into();
+            ObservableBuilder::new(&view_id, WorkspaceObservable::ViewUpdated)
+                .payload(view)
+                .build();
+            Ok(())
+        })?;
+
+        let _ = self.update_view_on_server(params);
         Ok(())
     }
 }
@@ -143,10 +163,10 @@ impl ViewController {
     }
 
     // belong_to_id will be the app_id or view_id.
-    fn read_local_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, WorkspaceError> {
+    fn read_local_views_belong_to(&self, belong_to_id: &str, conn: &SqliteConnection) -> Result<RepeatedView, WorkspaceError> {
         let views = self
             .sql
-            .read_views_belong_to(belong_to_id)?
+            .read_views_belong_to(belong_to_id, conn)?
             .into_iter()
             .map(|view_table| view_table.into())
             .collect::<Vec<View>>();

+ 106 - 48
rust-lib/flowy-workspace/src/services/workspace_controller.rs

@@ -4,17 +4,20 @@ use crate::{
     module::{WorkspaceDatabase, WorkspaceUser},
     observable::WorkspaceObservable,
     services::{helper::spawn, server::Server, AppController},
-    sql_tables::workspace::{WorkspaceSql, WorkspaceTable, WorkspaceTableChangeset},
+    sql_tables::workspace::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql},
 };
 
 use flowy_infra::kv::KV;
 
 use crate::{entities::app::RepeatedApp, observable::ObservableBuilder};
+use flowy_database::SqliteConnection;
 use std::sync::Arc;
 
 pub(crate) struct WorkspaceController {
     pub user: Arc<dyn WorkspaceUser>,
-    pub sql: Arc<WorkspaceSql>,
+    pub workspace_sql: Arc<WorkspaceTableSql>,
+    // pub app_sql: Arc<AppTableSql>,
+    pub database: Arc<dyn WorkspaceDatabase>,
     pub app_controller: Arc<AppController>,
     server: Server,
 }
@@ -26,10 +29,11 @@ impl WorkspaceController {
         app_controller: Arc<AppController>,
         server: Server,
     ) -> Self {
-        let sql = Arc::new(WorkspaceSql::new(database));
+        let sql = Arc::new(WorkspaceTableSql {});
         Self {
             user,
-            sql,
+            workspace_sql: sql,
+            database,
             app_controller,
             server,
         }
@@ -39,48 +43,73 @@ impl WorkspaceController {
         let workspace = self.create_workspace_on_server(params.clone()).await?;
         let user_id = self.user.user_id()?;
         let workspace_table = WorkspaceTable::new(workspace.clone(), &user_id);
-        let _ = self.sql.create_workspace(workspace_table)?;
+        let conn = &*self.database.db_connection()?;
+        //[[immediate_transaction]]
+        // https://sqlite.org/lang_transaction.html
+        // IMMEDIATE cause the database connection to start a new write immediately,
+        // without waiting for a write statement. The BEGIN IMMEDIATE might fail
+        // with SQLITE_BUSY if another write transaction is already active on another
+        // database connection.
+        //
+        // EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started
+        // immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in
+        // other journaling modes, EXCLUSIVE prevents other database connections from
+        // reading the database while the transaction is underway.
+        (conn).immediate_transaction::<_, WorkspaceError, _>(|| {
+            self.workspace_sql.create_workspace(workspace_table, conn)?;
+            let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
+            ObservableBuilder::new(&user_id, WorkspaceObservable::UserCreateWorkspace)
+                .payload(repeated_workspace)
+                .build();
+
+            Ok(())
+        })?;
 
-        // Opti: read all local workspaces may cause performance issues
-        let repeated_workspace = self.read_local_workspaces(None, &user_id)?;
-        ObservableBuilder::new(&user_id, WorkspaceObservable::UserCreateWorkspace)
-            .payload(repeated_workspace)
-            .build();
         Ok(workspace)
     }
 
     pub(crate) async fn update_workspace(&self, params: UpdateWorkspaceParams) -> Result<(), WorkspaceError> {
-        let changeset = WorkspaceTableChangeset::new(params.clone());
+        let _ = self.update_workspace_on_server(params.clone()).await?;
+
+        let changeset = WorkspaceTableChangeset::new(params);
         let workspace_id = changeset.id.clone();
-        let _ = self.sql.update_workspace(changeset)?;
-        let _ = self.update_workspace_on_server(params).await?;
+        let conn = &*self.database.db_connection()?;
+        (conn).immediate_transaction::<_, WorkspaceError, _>(|| {
+            let _ = self.workspace_sql.update_workspace(changeset, conn)?;
+            let user_id = self.user.user_id()?;
+            let workspace = self.read_local_workspace(workspace_id.clone(), &user_id, conn)?;
+            ObservableBuilder::new(&workspace_id, WorkspaceObservable::WorkspaceUpdated)
+                .payload(workspace)
+                .build();
+
+            Ok(())
+        })?;
 
-        // Opti: transaction
-        let user_id = self.user.user_id()?;
-        let workspace = self.read_local_workspace(workspace_id.clone(), &user_id)?;
-        ObservableBuilder::new(&workspace_id, WorkspaceObservable::WorkspaceUpdated)
-            .payload(workspace)
-            .build();
         Ok(())
     }
 
     pub(crate) async fn delete_workspace(&self, workspace_id: &str) -> Result<(), WorkspaceError> {
         let user_id = self.user.user_id()?;
-        let _ = self.sql.delete_workspace(workspace_id)?;
         let _ = self.delete_workspace_on_server(workspace_id).await?;
+        let conn = &*self.database.db_connection()?;
+        (conn).immediate_transaction::<_, WorkspaceError, _>(|| {
+            let _ = self.workspace_sql.delete_workspace(workspace_id, conn)?;
+            let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
+            ObservableBuilder::new(&user_id, WorkspaceObservable::UserDeleteWorkspace)
+                .payload(repeated_workspace)
+                .build();
+
+            Ok(())
+        })?;
 
-        // Opti: read all local workspaces may cause performance issues
-        let repeated_workspace = self.read_local_workspaces(None, &user_id)?;
-        ObservableBuilder::new(&user_id, WorkspaceObservable::UserDeleteWorkspace)
-            .payload(repeated_workspace)
-            .build();
         Ok(())
     }
 
     pub(crate) async fn open_workspace(&self, params: QueryWorkspaceParams) -> Result<Workspace, WorkspaceError> {
         let user_id = self.user.user_id()?;
+        let conn = self.database.db_connection()?;
         if let Some(workspace_id) = params.workspace_id.clone() {
-            let workspace = self.read_local_workspace(workspace_id, &user_id)?;
+            let workspace = self.read_local_workspace(workspace_id, &user_id, &*conn)?;
             set_current_workspace(&workspace.id);
             Ok(workspace)
         } else {
@@ -92,34 +121,47 @@ impl WorkspaceController {
 
     pub(crate) async fn read_workspaces(&self, params: QueryWorkspaceParams) -> Result<RepeatedWorkspace, WorkspaceError> {
         let user_id = self.user.user_id()?;
-        let workspaces = self.read_local_workspaces(params.workspace_id.clone(), &user_id)?;
-        let _ = self.read_workspaces_on_server(user_id, params).await?;
+        let _ = self.read_workspaces_on_server(user_id.clone(), params.clone()).await;
+
+        let conn = self.database.db_connection()?;
+        let workspaces = self.read_local_workspaces(params.workspace_id.clone(), &user_id, &*conn)?;
         Ok(workspaces)
     }
 
     pub(crate) async fn read_cur_workspace(&self) -> Result<Workspace, WorkspaceError> {
         let workspace_id = get_current_workspace()?;
         let user_id = self.user.user_id()?;
-        let workspace = self.read_local_workspace(workspace_id, &user_id)?;
+        let params = QueryWorkspaceParams {
+            workspace_id: Some(workspace_id.clone()),
+        };
+        let _ = self.read_workspaces_on_server(user_id.clone(), params).await?;
+
+        let conn = self.database.db_connection()?;
+        let workspace = self.read_local_workspace(workspace_id, &user_id, &*conn)?;
         Ok(workspace)
     }
 
     pub(crate) async fn read_workspace_apps(&self) -> Result<RepeatedApp, WorkspaceError> {
         let workspace_id = get_current_workspace()?;
-        let apps = self.read_local_apps(&workspace_id)?;
+        let conn = self.database.db_connection()?;
+        let apps = self.read_local_apps(&workspace_id, &*conn)?;
         // TODO: read from server
         Ok(RepeatedApp { items: apps })
     }
 
-    #[tracing::instrument(level = "debug", skip(self), err)]
-    fn read_local_workspaces(&self, workspace_id: Option<String>, user_id: &str) -> Result<RepeatedWorkspace, WorkspaceError> {
-        let sql = self.sql.clone();
+    #[tracing::instrument(level = "debug", skip(self, conn), err)]
+    fn read_local_workspaces(
+        &self,
+        workspace_id: Option<String>,
+        user_id: &str,
+        conn: &SqliteConnection,
+    ) -> Result<RepeatedWorkspace, WorkspaceError> {
         let workspace_id = workspace_id.to_owned();
-        let workspace_tables = sql.read_workspaces(workspace_id, user_id)?;
+        let workspace_tables = self.workspace_sql.read_workspaces(workspace_id, user_id, conn)?;
 
         let mut workspaces = vec![];
         for table in workspace_tables {
-            let apps = self.read_local_apps(&table.id)?;
+            let apps = self.read_local_apps(&table.id, conn)?;
             let mut workspace: Workspace = table.into();
             workspace.apps.items = apps;
             workspaces.push(workspace);
@@ -127,9 +169,9 @@ impl WorkspaceController {
         Ok(RepeatedWorkspace { items: workspaces })
     }
 
-    fn read_local_workspace(&self, workspace_id: String, user_id: &str) -> Result<Workspace, WorkspaceError> {
+    fn read_local_workspace(&self, workspace_id: String, user_id: &str, conn: &SqliteConnection) -> Result<Workspace, WorkspaceError> {
         // Opti: fetch single workspace from local db
-        let mut repeated_workspace = self.read_local_workspaces(Some(workspace_id), user_id)?;
+        let mut repeated_workspace = self.read_local_workspaces(Some(workspace_id), user_id, conn)?;
         if repeated_workspace.is_empty() {
             return Err(ErrorBuilder::new(ErrorCode::RecordNotFound).build());
         }
@@ -139,11 +181,11 @@ impl WorkspaceController {
         Ok(workspace)
     }
 
-    #[tracing::instrument(level = "debug", skip(self), err)]
-    fn read_local_apps(&self, workspace_id: &str) -> Result<Vec<App>, WorkspaceError> {
+    #[tracing::instrument(level = "debug", skip(self, conn), err)]
+    fn read_local_apps(&self, workspace_id: &str, conn: &SqliteConnection) -> Result<Vec<App>, WorkspaceError> {
         let apps = self
-            .sql
-            .read_apps_belong_to_workspace(workspace_id)?
+            .workspace_sql
+            .read_apps_belong_to_workspace(workspace_id, conn)?
             .into_iter()
             .map(|app_table| app_table.into())
             .collect::<Vec<App>>();
@@ -202,20 +244,36 @@ impl WorkspaceController {
     #[tracing::instrument(skip(self), err)]
     async fn read_workspaces_on_server(&self, user_id: String, params: QueryWorkspaceParams) -> Result<(), WorkspaceError> {
         let (token, server) = self.token_with_server()?;
-        let sql = self.sql.clone();
-        let conn = self.sql.get_db_conn()?;
+        let sql = self.workspace_sql.clone();
+        let conn = self.database.db_connection()?;
         spawn(async move {
             // Opti: retry?
             let workspaces = server.read_workspace(&token, params).await?;
             let _ = (&*conn).immediate_transaction::<_, WorkspaceError, _>(|| {
                 for workspace in &workspaces.items {
                     let mut m_workspace = workspace.clone();
-                    let repeated_app = m_workspace.apps.take_items();
+                    let apps = m_workspace.apps.take_items();
                     let workspace_table = WorkspaceTable::new(m_workspace, &user_id);
-                    log::debug!("Save workspace: {} to disk", &workspace.id);
-                    let _ = sql.create_workspace_with(workspace_table, &*conn)?;
-                    log::debug!("Save workspace: {} apps to disk", &workspace.id);
-                    let _ = sql.create_apps(repeated_app, &*conn)?;
+                    log::debug!("Save workspace");
+                    let _ = sql.create_workspace(workspace_table, &*conn)?;
+                    log::debug!("Save apps");
+
+                    for mut app in apps {
+                        let views = app.belongings.take_items();
+                        // let _ = sql.create_apps(vec![app], &*conn)?;
+
+                        // pub(crate) fn create_apps(&self, apps: Vec<App>, conn: &SqliteConnection) ->
+                        // Result<(), WorkspaceError> {     for app in apps {
+                        //         let _ = self.app_sql.create_app_with(AppTable::new(app), conn)?;
+                        //     }
+                        //     Ok(())
+                        // }
+
+                        log::debug!("Save views");
+                        for _view in views {
+                            //
+                        }
+                    }
                 }
                 Ok(())
             })?;

+ 10 - 29
rust-lib/flowy-workspace/src/sql_tables/app/app_sql.rs

@@ -1,6 +1,5 @@
 use crate::{
     errors::WorkspaceError,
-    module::WorkspaceDatabase,
     sql_tables::app::{AppTable, AppTableChangeset},
 };
 use flowy_database::{
@@ -8,24 +7,11 @@ use flowy_database::{
     schema::{app_table, app_table::dsl},
     SqliteConnection,
 };
-use std::sync::Arc;
 
-pub struct AppTableSql {
-    database: Arc<dyn WorkspaceDatabase>,
-}
+pub struct AppTableSql {}
 
 impl AppTableSql {
-    pub fn new(database: Arc<dyn WorkspaceDatabase>) -> Self { Self { database } }
-}
-
-impl AppTableSql {
-    pub(crate) fn create_app(&self, app_table: AppTable) -> Result<(), WorkspaceError> {
-        let conn = self.database.db_connection()?;
-        let _ = self.create_app_with(app_table, &*conn)?;
-        Ok(())
-    }
-
-    pub(crate) fn create_app_with(&self, app_table: AppTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
+    pub(crate) fn create_app(&self, app_table: AppTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
         match diesel_record_count!(app_table, &app_table.id, conn) {
             0 => diesel_insert_table!(app_table, &app_table, conn),
             _ => {
@@ -36,36 +22,31 @@ impl AppTableSql {
         Ok(())
     }
 
-    pub(crate) fn update_app(&self, changeset: AppTableChangeset) -> Result<(), WorkspaceError> {
-        let conn = self.database.db_connection()?;
-        diesel_update_table!(app_table, changeset, &*conn);
+    pub(crate) fn update_app(&self, changeset: AppTableChangeset, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
+        diesel_update_table!(app_table, changeset, conn);
         Ok(())
     }
 
-    pub(crate) fn read_app(&self, app_id: &str, is_trash: bool) -> Result<AppTable, WorkspaceError> {
+    pub(crate) fn read_app(&self, app_id: &str, is_trash: bool, conn: &SqliteConnection) -> Result<AppTable, WorkspaceError> {
         let app_table = dsl::app_table
             .filter(app_table::id.eq(app_id))
             .filter(app_table::is_trash.eq(is_trash))
-            .first::<AppTable>(&*(self.database.db_connection()?))?;
+            .first::<AppTable>(conn)?;
 
         Ok(app_table)
     }
 
-    pub(crate) fn read_apps(&self, workspace_id: &str, is_trash: bool) -> Result<Vec<AppTable>, WorkspaceError> {
+    pub(crate) fn read_apps(&self, workspace_id: &str, is_trash: bool, conn: &SqliteConnection) -> Result<Vec<AppTable>, WorkspaceError> {
         let app_table = dsl::app_table
             .filter(app_table::workspace_id.eq(workspace_id))
             .filter(app_table::is_trash.eq(is_trash))
-            .load::<AppTable>(&*(self.database.db_connection()?))?;
+            .load::<AppTable>(conn)?;
 
         Ok(app_table)
     }
 
-    pub(crate) fn delete_app(&self, app_id: &str) -> Result<AppTable, WorkspaceError> {
-        let conn = self.database.db_connection()?;
-        // TODO: group into sql transaction
-        let app_table = dsl::app_table
-            .filter(app_table::id.eq(app_id))
-            .first::<AppTable>(&*(self.database.db_connection()?))?;
+    pub(crate) fn delete_app(&self, app_id: &str, conn: &SqliteConnection) -> Result<AppTable, WorkspaceError> {
+        let app_table = dsl::app_table.filter(app_table::id.eq(app_id)).first::<AppTable>(conn)?;
         diesel_delete_table!(app_table, app_id, conn);
         Ok(app_table)
     }

+ 2 - 2
rust-lib/flowy-workspace/src/sql_tables/app/mod.rs

@@ -1,5 +1,5 @@
 mod app_sql;
 mod app_table;
 
-pub use app_sql::*;
-pub use app_table::*;
+pub(crate) use app_sql::*;
+pub(crate) use app_table::*;

+ 17 - 27
rust-lib/flowy-workspace/src/sql_tables/view/view_sql.rs

@@ -1,56 +1,46 @@
 use crate::{
     errors::WorkspaceError,
-    module::WorkspaceDatabase,
     sql_tables::view::{ViewTable, ViewTableChangeset},
 };
 use flowy_database::{
     prelude::*,
     schema::{view_table, view_table::dsl},
+    SqliteConnection,
 };
-use std::sync::Arc;
 
-pub struct ViewTableSql {
-    pub database: Arc<dyn WorkspaceDatabase>,
-}
+pub struct ViewTableSql {}
 
 impl ViewTableSql {
-    pub(crate) fn create_view(&self, view_table: ViewTable) -> Result<(), WorkspaceError> {
-        let conn = self.database.db_connection()?;
-        let _ = diesel::insert_into(view_table::table).values(view_table).execute(&*conn)?;
+    pub(crate) fn create_view(&self, view_table: ViewTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
+        let _ = diesel::insert_into(view_table::table).values(view_table).execute(conn)?;
         Ok(())
     }
 
-    pub(crate) fn read_view(&self, view_id: &str, is_trash: bool) -> Result<ViewTable, WorkspaceError> {
-        let view_table = dsl::view_table
-            .filter(view_table::id.eq(view_id))
-            .filter(view_table::is_trash.eq(is_trash))
-            .first::<ViewTable>(&*(self.database.db_connection()?))?;
-
+    pub(crate) fn read_view(&self, view_id: &str, is_trash: Option<bool>, conn: &SqliteConnection) -> Result<ViewTable, WorkspaceError> {
+        // https://docs.diesel.rs/diesel/query_builder/struct.UpdateStatement.html
+        let mut filter = dsl::view_table.filter(view_table::id.eq(view_id)).into_boxed();
+        if let Some(is_trash) = is_trash {
+            filter = filter.filter(view_table::is_trash.eq(is_trash));
+        }
+        let view_table = filter.first::<ViewTable>(conn)?;
         Ok(view_table)
     }
 
-    pub(crate) fn read_views_belong_to(&self, belong_to_id: &str) -> Result<Vec<ViewTable>, WorkspaceError> {
+    pub(crate) fn read_views_belong_to(&self, belong_to_id: &str, conn: &SqliteConnection) -> Result<Vec<ViewTable>, WorkspaceError> {
         let view_tables = dsl::view_table
             .filter(view_table::belong_to_id.eq(belong_to_id))
-            .load::<ViewTable>(&*(self.database.db_connection()?))?;
+            .load::<ViewTable>(conn)?;
 
         Ok(view_tables)
     }
 
-    pub(crate) fn update_view(&self, changeset: ViewTableChangeset) -> Result<(), WorkspaceError> {
-        let conn = self.database.db_connection()?;
-        diesel_update_table!(view_table, changeset, &*conn);
+    pub(crate) fn update_view(&self, changeset: ViewTableChangeset, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
+        diesel_update_table!(view_table, changeset, conn);
         Ok(())
     }
 
-    pub(crate) fn delete_view(&self, view_id: &str) -> Result<ViewTable, WorkspaceError> {
-        let conn = self.database.db_connection()?;
-
-        // TODO: group into transaction
-        let view_table = dsl::view_table
-            .filter(view_table::id.eq(view_id))
-            .first::<ViewTable>(&*(self.database.db_connection()?))?;
-
+    pub(crate) fn delete_view(&self, view_id: &str, conn: &SqliteConnection) -> Result<ViewTable, WorkspaceError> {
+        let view_table = dsl::view_table.filter(view_table::id.eq(view_id)).first::<ViewTable>(conn)?;
         diesel_delete_table!(view_table, view_id, conn);
         Ok(view_table)
     }

+ 2 - 2
rust-lib/flowy-workspace/src/sql_tables/workspace/mod.rs

@@ -1,5 +1,5 @@
 mod workspace_sql;
 mod workspace_table;
 
-pub use workspace_sql::*;
-pub use workspace_table::*;
+pub(crate) use workspace_sql::*;
+pub(crate) use workspace_table::*;

+ 22 - 62
rust-lib/flowy-workspace/src/sql_tables/workspace/workspace_sql.rs

@@ -1,53 +1,20 @@
 use crate::{
-    entities::app::App,
     errors::WorkspaceError,
-    module::WorkspaceDatabase,
     sql_tables::{
-        app::{AppTable, AppTableSql},
+        app::AppTable,
         workspace::{WorkspaceTable, WorkspaceTableChangeset},
     },
 };
 use diesel::SqliteConnection;
 use flowy_database::{
-    macros::*,
     prelude::*,
     schema::{workspace_table, workspace_table::dsl},
-    DBConnection,
 };
-use std::sync::Arc;
 
-pub(crate) struct WorkspaceSql {
-    database: Arc<dyn WorkspaceDatabase>,
-    app_sql: Arc<AppTableSql>,
-}
-
-impl WorkspaceSql {
-    pub fn new(database: Arc<dyn WorkspaceDatabase>) -> Self {
-        Self {
-            database: database.clone(),
-            app_sql: Arc::new(AppTableSql::new(database.clone())),
-        }
-    }
-}
-
-impl WorkspaceSql {
-    pub(crate) fn create_workspace(&self, table: WorkspaceTable) -> Result<(), WorkspaceError> {
-        let conn = &*self.database.db_connection()?;
-        //[[immediate_transaction]]
-        // https://sqlite.org/lang_transaction.html
-        // IMMEDIATE cause the database connection to start a new write immediately,
-        // without waiting for a write statement. The BEGIN IMMEDIATE might fail
-        // with SQLITE_BUSY if another write transaction is already active on another
-        // database connection.
-        //
-        // EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started
-        // immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in
-        // other journaling modes, EXCLUSIVE prevents other database connections from
-        // reading the database while the transaction is underway.
-        (conn).immediate_transaction::<_, WorkspaceError, _>(|| self.create_workspace_with(table, conn))
-    }
+pub(crate) struct WorkspaceTableSql {}
 
-    pub(crate) fn create_workspace_with(&self, table: WorkspaceTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
+impl WorkspaceTableSql {
+    pub(crate) fn create_workspace(&self, table: WorkspaceTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
         match diesel_record_count!(workspace_table, &table.id, conn) {
             0 => diesel_insert_table!(workspace_table, &table, conn),
             _ => {
@@ -58,55 +25,48 @@ impl WorkspaceSql {
         Ok(())
     }
 
-    pub(crate) fn create_apps(&self, apps: Vec<App>, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
-        for app in apps {
-            let _ = self.app_sql.create_app_with(AppTable::new(app), conn)?;
-        }
-        Ok(())
-    }
-
-    pub(crate) fn read_workspaces(&self, workspace_id: Option<String>, user_id: &str) -> Result<Vec<WorkspaceTable>, WorkspaceError> {
+    pub(crate) fn read_workspaces(
+        &self,
+        workspace_id: Option<String>,
+        user_id: &str,
+        conn: &SqliteConnection,
+    ) -> Result<Vec<WorkspaceTable>, WorkspaceError> {
         let workspaces = match workspace_id {
             None => dsl::workspace_table
                 .filter(workspace_table::user_id.eq(user_id))
-                .load::<WorkspaceTable>(&*(self.database.db_connection()?))?,
+                .load::<WorkspaceTable>(conn)?,
             Some(workspace_id) => dsl::workspace_table
                 .filter(workspace_table::user_id.eq(user_id))
                 .filter(workspace_table::id.eq(&workspace_id))
-                .load::<WorkspaceTable>(&*(self.database.db_connection()?))?,
+                .load::<WorkspaceTable>(conn)?,
         };
 
         Ok(workspaces)
     }
 
-    pub(crate) fn update_workspace(&self, changeset: WorkspaceTableChangeset) -> Result<(), WorkspaceError> {
-        let conn = self.database.db_connection()?;
-        diesel_update_table!(workspace_table, changeset, &*conn);
+    pub(crate) fn update_workspace(&self, changeset: WorkspaceTableChangeset, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
+        diesel_update_table!(workspace_table, changeset, conn);
         Ok(())
     }
 
-    pub(crate) fn delete_workspace(&self, workspace_id: &str) -> Result<(), WorkspaceError> {
-        let conn = self.database.db_connection()?;
+    pub(crate) fn delete_workspace(&self, workspace_id: &str, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
         diesel_delete_table!(workspace_table, workspace_id, conn);
         Ok(())
     }
 
-    pub(crate) fn read_apps_belong_to_workspace(&self, workspace_id: &str) -> Result<Vec<AppTable>, WorkspaceError> {
-        let conn = self.database.db_connection()?;
-
+    pub(crate) fn read_apps_belong_to_workspace(
+        &self,
+        workspace_id: &str,
+        conn: &SqliteConnection,
+    ) -> Result<Vec<AppTable>, WorkspaceError> {
         let apps = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
             let workspace_table: WorkspaceTable = dsl::workspace_table
                 .filter(workspace_table::id.eq(workspace_id))
-                .first::<WorkspaceTable>(&*(conn))?;
-            let apps = AppTable::belonging_to(&workspace_table).load::<AppTable>(&*conn)?;
+                .first::<WorkspaceTable>(conn)?;
+            let apps = AppTable::belonging_to(&workspace_table).load::<AppTable>(conn)?;
             Ok(apps)
         })?;
 
         Ok(apps)
     }
-
-    pub(crate) fn get_db_conn(&self) -> Result<DBConnection, WorkspaceError> {
-        let db = self.database.db_connection()?;
-        Ok(db)
-    }
 }