|
@@ -2,18 +2,11 @@ use std::collections::HashMap;
|
|
|
use std::fmt::{Display, Formatter};
|
|
|
use std::sync::{Arc, Weak};
|
|
|
|
|
|
-use appflowy_integrate::collab_builder::{CollabStorageProvider, CollabStorageType};
|
|
|
-use appflowy_integrate::{RemoteCollabStorage, YrsDocAction};
|
|
|
-use bytes::Bytes;
|
|
|
-use collab_define::{CollabObject, CollabType};
|
|
|
use parking_lot::RwLock;
|
|
|
use serde_repr::*;
|
|
|
|
|
|
-use flowy_database_deps::cloud::*;
|
|
|
-use flowy_document2::deps::DocumentData;
|
|
|
-use flowy_document_deps::cloud::{DocumentCloudService, DocumentSnapshot};
|
|
|
+use collab_integrate::YrsDocAction;
|
|
|
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
|
|
|
-use flowy_folder_deps::cloud::*;
|
|
|
use flowy_server::af_cloud::configuration::appflowy_cloud_server_configuration;
|
|
|
use flowy_server::af_cloud::AFCloudServer;
|
|
|
use flowy_server::local_server::{LocalServer, LocalServerDB};
|
|
@@ -21,22 +14,19 @@ use flowy_server::supabase::SupabaseServer;
|
|
|
use flowy_server::{AppFlowyEncryption, AppFlowyServer, EncryptionImpl};
|
|
|
use flowy_server_config::supabase_config::SupabaseConfiguration;
|
|
|
use flowy_sqlite::kv::StorePreferences;
|
|
|
-use flowy_storage::{FileStorageService, StorageObject};
|
|
|
-use flowy_user::event_map::UserCloudServiceProvider;
|
|
|
use flowy_user::services::database::{
|
|
|
get_user_profile, get_user_workspace, open_collab_db, open_user_db,
|
|
|
};
|
|
|
use flowy_user_deps::cloud::UserCloudService;
|
|
|
use flowy_user_deps::entities::*;
|
|
|
-use lib_infra::future::FutureResult;
|
|
|
|
|
|
use crate::AppFlowyCoreConfig;
|
|
|
|
|
|
-const SERVER_PROVIDER_TYPE_KEY: &str = "server_provider_type";
|
|
|
+pub(crate) const SERVER_PROVIDER_TYPE_KEY: &str = "server_provider_type";
|
|
|
|
|
|
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize_repr, Deserialize_repr)]
|
|
|
#[repr(u8)]
|
|
|
-pub enum ServerProviderType {
|
|
|
+pub enum ServerType {
|
|
|
/// Local server provider.
|
|
|
/// Offline mode, no user authentication and the data is stored locally.
|
|
|
Local = 0,
|
|
@@ -45,48 +35,48 @@ pub enum ServerProviderType {
|
|
|
/// progress.
|
|
|
AppFlowyCloud = 1,
|
|
|
/// Supabase server provider.
|
|
|
- /// It uses supabase's postgresql database to store data and user authentication.
|
|
|
+ /// It uses supabase postgresql database to store data and user authentication.
|
|
|
Supabase = 2,
|
|
|
}
|
|
|
|
|
|
-impl Display for ServerProviderType {
|
|
|
+impl Display for ServerType {
|
|
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
|
|
match self {
|
|
|
- ServerProviderType::Local => write!(f, "Local"),
|
|
|
- ServerProviderType::AppFlowyCloud => write!(f, "AppFlowyCloud"),
|
|
|
- ServerProviderType::Supabase => write!(f, "Supabase"),
|
|
|
+ ServerType::Local => write!(f, "Local"),
|
|
|
+ ServerType::AppFlowyCloud => write!(f, "AppFlowyCloud"),
|
|
|
+ ServerType::Supabase => write!(f, "Supabase"),
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/// The [AppFlowyServerProvider] provides list of [AppFlowyServer] base on the [AuthType]. Using
|
|
|
-/// the auth type, the [AppFlowyServerProvider] will create a new [AppFlowyServer] if it doesn't
|
|
|
+/// The [ServerProvider] provides list of [AppFlowyServer] base on the [AuthType]. Using
|
|
|
+/// the auth type, the [ServerProvider] will create a new [AppFlowyServer] if it doesn't
|
|
|
/// exist.
|
|
|
/// Each server implements the [AppFlowyServer] trait, which provides the [UserCloudService], etc.
|
|
|
-pub struct AppFlowyServerProvider {
|
|
|
+pub struct ServerProvider {
|
|
|
config: AppFlowyCoreConfig,
|
|
|
- provider_type: RwLock<ServerProviderType>,
|
|
|
- providers: RwLock<HashMap<ServerProviderType, Arc<dyn AppFlowyServer>>>,
|
|
|
- encryption: RwLock<Arc<dyn AppFlowyEncryption>>,
|
|
|
- store_preferences: Weak<StorePreferences>,
|
|
|
- cache_user_service: RwLock<HashMap<ServerProviderType, Arc<dyn UserCloudService>>>,
|
|
|
-
|
|
|
- device_id: Arc<RwLock<String>>,
|
|
|
- enable_sync: RwLock<bool>,
|
|
|
- uid: Arc<RwLock<Option<i64>>>,
|
|
|
+ server_type: RwLock<ServerType>,
|
|
|
+ providers: RwLock<HashMap<ServerType, Arc<dyn AppFlowyServer>>>,
|
|
|
+ pub(crate) encryption: RwLock<Arc<dyn AppFlowyEncryption>>,
|
|
|
+ pub(crate) store_preferences: Weak<StorePreferences>,
|
|
|
+ pub(crate) cache_user_service: RwLock<HashMap<ServerType, Arc<dyn UserCloudService>>>,
|
|
|
+
|
|
|
+ pub(crate) device_id: Arc<RwLock<String>>,
|
|
|
+ pub(crate) enable_sync: RwLock<bool>,
|
|
|
+ pub(crate) uid: Arc<RwLock<Option<i64>>>,
|
|
|
}
|
|
|
|
|
|
-impl AppFlowyServerProvider {
|
|
|
+impl ServerProvider {
|
|
|
pub fn new(
|
|
|
config: AppFlowyCoreConfig,
|
|
|
- provider_type: ServerProviderType,
|
|
|
+ provider_type: ServerType,
|
|
|
store_preferences: Weak<StorePreferences>,
|
|
|
) -> Self {
|
|
|
let encryption = EncryptionImpl::new(None);
|
|
|
Self {
|
|
|
config,
|
|
|
- provider_type: RwLock::new(provider_type),
|
|
|
- device_id: Default::default(),
|
|
|
+ server_type: RwLock::new(provider_type),
|
|
|
+ device_id: Arc::new(RwLock::new(uuid::Uuid::new_v4().to_string())),
|
|
|
providers: RwLock::new(HashMap::new()),
|
|
|
enable_sync: RwLock::new(true),
|
|
|
encryption: RwLock::new(Arc::new(encryption)),
|
|
@@ -96,42 +86,51 @@ impl AppFlowyServerProvider {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub fn provider_type(&self) -> ServerProviderType {
|
|
|
- self.provider_type.read().clone()
|
|
|
+ pub fn get_server_type(&self) -> ServerType {
|
|
|
+ self.server_type.read().clone()
|
|
|
+ }
|
|
|
+
|
|
|
+ pub fn set_server_type(&self, server_type: ServerType) {
|
|
|
+ *self.server_type.write() = server_type;
|
|
|
}
|
|
|
|
|
|
/// Returns a [AppFlowyServer] trait implementation base on the provider_type.
|
|
|
- fn get_provider(
|
|
|
+ pub(crate) fn get_server(
|
|
|
&self,
|
|
|
- provider_type: &ServerProviderType,
|
|
|
+ server_type: &ServerType,
|
|
|
) -> FlowyResult<Arc<dyn AppFlowyServer>> {
|
|
|
- if let Some(provider) = self.providers.read().get(provider_type) {
|
|
|
+ if let Some(provider) = self.providers.read().get(server_type) {
|
|
|
return Ok(provider.clone());
|
|
|
}
|
|
|
|
|
|
- let server = match provider_type {
|
|
|
- ServerProviderType::Local => {
|
|
|
+ let server = match server_type {
|
|
|
+ ServerType::Local => {
|
|
|
let local_db = Arc::new(LocalServerDBImpl {
|
|
|
storage_path: self.config.storage_path.clone(),
|
|
|
});
|
|
|
let server = Arc::new(LocalServer::new(local_db));
|
|
|
-
|
|
|
Ok::<Arc<dyn AppFlowyServer>, FlowyError>(server)
|
|
|
},
|
|
|
- ServerProviderType::AppFlowyCloud => {
|
|
|
+ ServerType::AppFlowyCloud => {
|
|
|
let config = appflowy_cloud_server_configuration().map_err(|e| {
|
|
|
FlowyError::new(
|
|
|
ErrorCode::InvalidAuthConfig,
|
|
|
format!(
|
|
|
"Missing self host config: {:?}. Error: {:?}",
|
|
|
- provider_type, e
|
|
|
+ server_type, e
|
|
|
),
|
|
|
)
|
|
|
})?;
|
|
|
- let server = Arc::new(AFCloudServer::new(config));
|
|
|
+ tracing::trace!("🔑AppFlowy cloud config: {:?}", config);
|
|
|
+ let server = Arc::new(AFCloudServer::new(
|
|
|
+ config,
|
|
|
+ *self.enable_sync.read(),
|
|
|
+ self.device_id.clone(),
|
|
|
+ ));
|
|
|
+
|
|
|
Ok::<Arc<dyn AppFlowyServer>, FlowyError>(server)
|
|
|
},
|
|
|
- ServerProviderType::Supabase => {
|
|
|
+ ServerType::Supabase => {
|
|
|
let config = match SupabaseConfiguration::from_env() {
|
|
|
Ok(config) => config,
|
|
|
Err(e) => {
|
|
@@ -155,287 +154,30 @@ impl AppFlowyServerProvider {
|
|
|
self
|
|
|
.providers
|
|
|
.write()
|
|
|
- .insert(provider_type.clone(), server.clone());
|
|
|
+ .insert(server_type.clone(), server.clone());
|
|
|
Ok(server)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl FileStorageService for AppFlowyServerProvider {
|
|
|
- fn create_object(&self, object: StorageObject) -> FutureResult<String, FlowyError> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- FutureResult::new(async move {
|
|
|
- let storage = server?.file_storage().ok_or(FlowyError::internal())?;
|
|
|
- storage.create_object(object).await
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- fn delete_object_by_url(&self, object_url: String) -> FutureResult<(), FlowyError> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- FutureResult::new(async move {
|
|
|
- let storage = server?.file_storage().ok_or(FlowyError::internal())?;
|
|
|
- storage.delete_object_by_url(object_url).await
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- fn get_object_by_url(&self, object_url: String) -> FutureResult<Bytes, FlowyError> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- FutureResult::new(async move {
|
|
|
- let storage = server?.file_storage().ok_or(FlowyError::internal())?;
|
|
|
- storage.get_object_by_url(object_url).await
|
|
|
- })
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl UserCloudServiceProvider for AppFlowyServerProvider {
|
|
|
- fn set_enable_sync(&self, uid: i64, enable_sync: bool) {
|
|
|
- match self.get_provider(&self.provider_type.read()) {
|
|
|
- Ok(server) => {
|
|
|
- server.set_enable_sync(uid, enable_sync);
|
|
|
- *self.enable_sync.write() = enable_sync;
|
|
|
- *self.uid.write() = Some(uid);
|
|
|
- },
|
|
|
- Err(e) => tracing::error!("🔴Failed to enable sync: {:?}", e),
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- fn set_encrypt_secret(&self, secret: String) {
|
|
|
- tracing::info!("🔑Set encrypt secret");
|
|
|
- self.encryption.write().set_secret(secret);
|
|
|
- }
|
|
|
-
|
|
|
- /// When user login, the provider type is set by the [AuthType] and save to disk for next use.
|
|
|
- ///
|
|
|
- /// Each [AuthType] has a corresponding [ServerProviderType]. The [ServerProviderType] is used
|
|
|
- /// to create a new [AppFlowyServer] if it doesn't exist. Once the [ServerProviderType] is set,
|
|
|
- /// it will be used when user open the app again.
|
|
|
- ///
|
|
|
- fn set_auth_type(&self, auth_type: AuthType) {
|
|
|
- let provider_type: ServerProviderType = auth_type.into();
|
|
|
- *self.provider_type.write() = provider_type.clone();
|
|
|
-
|
|
|
- match self.store_preferences.upgrade() {
|
|
|
- None => tracing::error!("🔴Failed to update server provider type: store preferences is drop"),
|
|
|
- Some(store_preferences) => {
|
|
|
- match store_preferences.set_object(SERVER_PROVIDER_TYPE_KEY, provider_type.clone()) {
|
|
|
- Ok(_) => tracing::trace!("Update server provider type to: {:?}", provider_type),
|
|
|
- Err(e) => {
|
|
|
- tracing::error!("🔴Failed to update server provider type: {:?}", e);
|
|
|
- },
|
|
|
- }
|
|
|
- },
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- fn set_device_id(&self, device_id: &str) {
|
|
|
- *self.device_id.write() = device_id.to_string();
|
|
|
- }
|
|
|
-
|
|
|
- /// Returns the [UserCloudService] base on the current [ServerProviderType].
|
|
|
- /// Creates a new [AppFlowyServer] if it doesn't exist.
|
|
|
- fn get_user_service(&self) -> Result<Arc<dyn UserCloudService>, FlowyError> {
|
|
|
- if let Some(user_service) = self
|
|
|
- .cache_user_service
|
|
|
- .read()
|
|
|
- .get(&self.provider_type.read())
|
|
|
- {
|
|
|
- return Ok(user_service.clone());
|
|
|
- }
|
|
|
-
|
|
|
- let provider_type = self.provider_type.read().clone();
|
|
|
- let user_service = self.get_provider(&provider_type)?.user_service();
|
|
|
- self
|
|
|
- .cache_user_service
|
|
|
- .write()
|
|
|
- .insert(provider_type, user_service.clone());
|
|
|
- Ok(user_service)
|
|
|
- }
|
|
|
-
|
|
|
- fn service_name(&self) -> String {
|
|
|
- self.provider_type.read().to_string()
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl FolderCloudService for AppFlowyServerProvider {
|
|
|
- fn create_workspace(&self, uid: i64, name: &str) -> FutureResult<Workspace, Error> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- let name = name.to_string();
|
|
|
- FutureResult::new(async move { server?.folder_service().create_workspace(uid, &name).await })
|
|
|
- }
|
|
|
-
|
|
|
- fn get_folder_data(&self, workspace_id: &str) -> FutureResult<Option<FolderData>, Error> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- let workspace_id = workspace_id.to_string();
|
|
|
- FutureResult::new(async move {
|
|
|
- server?
|
|
|
- .folder_service()
|
|
|
- .get_folder_data(&workspace_id)
|
|
|
- .await
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- fn get_folder_snapshots(
|
|
|
- &self,
|
|
|
- workspace_id: &str,
|
|
|
- limit: usize,
|
|
|
- ) -> FutureResult<Vec<FolderSnapshot>, Error> {
|
|
|
- let workspace_id = workspace_id.to_string();
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- FutureResult::new(async move {
|
|
|
- server?
|
|
|
- .folder_service()
|
|
|
- .get_folder_snapshots(&workspace_id, limit)
|
|
|
- .await
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- fn get_folder_updates(&self, workspace_id: &str, uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
|
|
|
- let workspace_id = workspace_id.to_string();
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- FutureResult::new(async move {
|
|
|
- server?
|
|
|
- .folder_service()
|
|
|
- .get_folder_updates(&workspace_id, uid)
|
|
|
- .await
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- fn service_name(&self) -> String {
|
|
|
- self
|
|
|
- .get_provider(&self.provider_type.read())
|
|
|
- .map(|provider| provider.folder_service().service_name())
|
|
|
- .unwrap_or_default()
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl DatabaseCloudService for AppFlowyServerProvider {
|
|
|
- fn get_collab_update(
|
|
|
- &self,
|
|
|
- object_id: &str,
|
|
|
- object_ty: CollabType,
|
|
|
- ) -> FutureResult<CollabObjectUpdate, Error> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- let database_id = object_id.to_string();
|
|
|
- FutureResult::new(async move {
|
|
|
- server?
|
|
|
- .database_service()
|
|
|
- .get_collab_update(&database_id, object_ty)
|
|
|
- .await
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- fn batch_get_collab_updates(
|
|
|
- &self,
|
|
|
- object_ids: Vec<String>,
|
|
|
- object_ty: CollabType,
|
|
|
- ) -> FutureResult<CollabObjectUpdateByOid, Error> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- FutureResult::new(async move {
|
|
|
- server?
|
|
|
- .database_service()
|
|
|
- .batch_get_collab_updates(object_ids, object_ty)
|
|
|
- .await
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- fn get_collab_snapshots(
|
|
|
- &self,
|
|
|
- object_id: &str,
|
|
|
- limit: usize,
|
|
|
- ) -> FutureResult<Vec<DatabaseSnapshot>, Error> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- let database_id = object_id.to_string();
|
|
|
- FutureResult::new(async move {
|
|
|
- server?
|
|
|
- .database_service()
|
|
|
- .get_collab_snapshots(&database_id, limit)
|
|
|
- .await
|
|
|
- })
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl DocumentCloudService for AppFlowyServerProvider {
|
|
|
- fn get_document_updates(&self, document_id: &str) -> FutureResult<Vec<Vec<u8>>, Error> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- let document_id = document_id.to_string();
|
|
|
- FutureResult::new(async move {
|
|
|
- server?
|
|
|
- .document_service()
|
|
|
- .get_document_updates(&document_id)
|
|
|
- .await
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- fn get_document_snapshots(
|
|
|
- &self,
|
|
|
- document_id: &str,
|
|
|
- limit: usize,
|
|
|
- ) -> FutureResult<Vec<DocumentSnapshot>, Error> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- let document_id = document_id.to_string();
|
|
|
- FutureResult::new(async move {
|
|
|
- server?
|
|
|
- .document_service()
|
|
|
- .get_document_snapshots(&document_id, limit)
|
|
|
- .await
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- fn get_document_data(&self, document_id: &str) -> FutureResult<Option<DocumentData>, Error> {
|
|
|
- let server = self.get_provider(&self.provider_type.read());
|
|
|
- let document_id = document_id.to_string();
|
|
|
- FutureResult::new(async move {
|
|
|
- server?
|
|
|
- .document_service()
|
|
|
- .get_document_data(&document_id)
|
|
|
- .await
|
|
|
- })
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl CollabStorageProvider for AppFlowyServerProvider {
|
|
|
- fn storage_type(&self) -> CollabStorageType {
|
|
|
- self.provider_type().into()
|
|
|
- }
|
|
|
-
|
|
|
- fn get_storage(
|
|
|
- &self,
|
|
|
- collab_object: &CollabObject,
|
|
|
- storage_type: &CollabStorageType,
|
|
|
- ) -> Option<Arc<dyn RemoteCollabStorage>> {
|
|
|
- match storage_type {
|
|
|
- CollabStorageType::Local => None,
|
|
|
- CollabStorageType::AWS => None,
|
|
|
- CollabStorageType::Supabase => self
|
|
|
- .get_provider(&ServerProviderType::Supabase)
|
|
|
- .ok()
|
|
|
- .and_then(|provider| provider.collab_storage(collab_object)),
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- fn is_sync_enabled(&self) -> bool {
|
|
|
- *self.enable_sync.read()
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl From<AuthType> for ServerProviderType {
|
|
|
+impl From<AuthType> for ServerType {
|
|
|
fn from(auth_provider: AuthType) -> Self {
|
|
|
match auth_provider {
|
|
|
- AuthType::Local => ServerProviderType::Local,
|
|
|
- AuthType::SelfHosted => ServerProviderType::AppFlowyCloud,
|
|
|
- AuthType::Supabase => ServerProviderType::Supabase,
|
|
|
+ AuthType::Local => ServerType::Local,
|
|
|
+ AuthType::SelfHosted => ServerType::AppFlowyCloud,
|
|
|
+ AuthType::Supabase => ServerType::Supabase,
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl From<&AuthType> for ServerProviderType {
|
|
|
+impl From<&AuthType> for ServerType {
|
|
|
fn from(auth_provider: &AuthType) -> Self {
|
|
|
Self::from(auth_provider.clone())
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-pub fn current_server_provider(store_preferences: &Arc<StorePreferences>) -> ServerProviderType {
|
|
|
- match store_preferences.get_object::<ServerProviderType>(SERVER_PROVIDER_TYPE_KEY) {
|
|
|
- None => ServerProviderType::Local,
|
|
|
+pub fn current_server_provider(store_preferences: &Arc<StorePreferences>) -> ServerType {
|
|
|
+ match store_preferences.get_object::<ServerType>(SERVER_PROVIDER_TYPE_KEY) {
|
|
|
+ None => ServerType::Local,
|
|
|
Some(provider_type) => provider_type,
|
|
|
}
|
|
|
}
|