manager.rs 10 KB


  1. use crate::entities::view::ViewDataFormatPB;
  2. use crate::entities::{ViewLayoutTypePB, ViewPB};
  3. use crate::services::folder_editor::FolderRevisionMergeable;
  4. use crate::{
  5. dart_notification::{send_dart_notification, FolderNotification},
  6. entities::workspace::RepeatedWorkspacePB,
  7. errors::FlowyResult,
  8. event_map::{FolderCouldServiceV1, WorkspaceDatabase, WorkspaceUser},
  9. services::{
  10. folder_editor::FolderEditor, persistence::FolderPersistence, set_current_workspace, AppController,
  11. TrashController, ViewController, WorkspaceController,
  12. },
  13. };
  14. use bytes::Bytes;
  15. use flowy_document::editor::initial_read_me;
  16. use flowy_error::FlowyError;
  17. use flowy_revision::{RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, RevisionWebSocket};
  18. use folder_rev_model::user_default;
  19. use lazy_static::lazy_static;
  20. use lib_infra::future::FutureResult;
  21. use crate::services::clear_current_workspace;
  22. use crate::services::persistence::rev_sqlite::{
  23. SQLiteFolderRevisionPersistence, SQLiteFolderRevisionSnapshotPersistence,
  24. };
  25. use flowy_http_model::ws_data::ServerRevisionWSData;
  26. use flowy_sync::client_folder::FolderPad;
  27. use std::convert::TryFrom;
  28. use std::{collections::HashMap, fmt::Formatter, sync::Arc};
  29. use tokio::sync::RwLock as TokioRwLock;
  30. lazy_static! {
  31. static ref INIT_FOLDER_FLAG: TokioRwLock<HashMap<String, bool>> = TokioRwLock::new(HashMap::new());
  32. }
  33. const FOLDER_ID: &str = "folder";
  34. const FOLDER_ID_SPLIT: &str = ":";
  35. #[derive(Clone)]
  36. pub struct FolderId(String);
  37. impl FolderId {
  38. pub fn new(user_id: &str) -> Self {
  39. Self(format!("{}{}{}", user_id, FOLDER_ID_SPLIT, FOLDER_ID))
  40. }
  41. }
  42. impl std::fmt::Display for FolderId {
  43. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  44. f.write_str(FOLDER_ID)
  45. }
  46. }
  47. impl std::fmt::Debug for FolderId {
  48. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  49. f.write_str(FOLDER_ID)
  50. }
  51. }
  52. impl AsRef<str> for FolderId {
  53. fn as_ref(&self) -> &str {
  54. &self.0
  55. }
  56. }
  57. pub struct FolderManager {
  58. pub user: Arc<dyn WorkspaceUser>,
  59. pub(crate) cloud_service: Arc<dyn FolderCouldServiceV1>,
  60. pub(crate) persistence: Arc<FolderPersistence>,
  61. pub(crate) workspace_controller: Arc<WorkspaceController>,
  62. pub(crate) app_controller: Arc<AppController>,
  63. pub(crate) view_controller: Arc<ViewController>,
  64. pub(crate) trash_controller: Arc<TrashController>,
  65. web_socket: Arc<dyn RevisionWebSocket>,
  66. folder_editor: Arc<TokioRwLock<Option<Arc<FolderEditor>>>>,
  67. }
  68. impl FolderManager {
  69. pub async fn new(
  70. user: Arc<dyn WorkspaceUser>,
  71. cloud_service: Arc<dyn FolderCouldServiceV1>,
  72. database: Arc<dyn WorkspaceDatabase>,
  73. data_processors: ViewDataProcessorMap,
  74. web_socket: Arc<dyn RevisionWebSocket>,
  75. ) -> Self {
  76. if let Ok(user_id) = user.user_id() {
  77. // Reset the flag if the folder manager gets initialized, otherwise,
  78. // the folder_editor will not be initialized after flutter hot reload.
  79. INIT_FOLDER_FLAG.write().await.insert(user_id.to_owned(), false);
  80. }
  81. let folder_editor = Arc::new(TokioRwLock::new(None));
  82. let persistence = Arc::new(FolderPersistence::new(database.clone(), folder_editor.clone()));
  83. let trash_controller = Arc::new(TrashController::new(
  84. persistence.clone(),
  85. cloud_service.clone(),
  86. user.clone(),
  87. ));
  88. let view_controller = Arc::new(ViewController::new(
  89. user.clone(),
  90. persistence.clone(),
  91. cloud_service.clone(),
  92. trash_controller.clone(),
  93. data_processors,
  94. ));
  95. let app_controller = Arc::new(AppController::new(
  96. user.clone(),
  97. persistence.clone(),
  98. trash_controller.clone(),
  99. cloud_service.clone(),
  100. ));
  101. let workspace_controller = Arc::new(WorkspaceController::new(
  102. user.clone(),
  103. persistence.clone(),
  104. trash_controller.clone(),
  105. cloud_service.clone(),
  106. ));
  107. Self {
  108. user,
  109. cloud_service,
  110. persistence,
  111. workspace_controller,
  112. app_controller,
  113. view_controller,
  114. trash_controller,
  115. web_socket,
  116. folder_editor,
  117. }
  118. }
  119. // pub fn network_state_changed(&self, new_type: NetworkType) {
  120. // match new_type {
  121. // NetworkType::UnknownNetworkType => {},
  122. // NetworkType::Wifi => {},
  123. // NetworkType::Cell => {},
  124. // NetworkType::Ethernet => {},
  125. // }
  126. // }
  127. pub async fn did_receive_ws_data(&self, data: Bytes) {
  128. let result = ServerRevisionWSData::try_from(data);
  129. match result {
  130. Ok(data) => match self.folder_editor.read().await.clone() {
  131. None => {}
  132. Some(editor) => match editor.receive_ws_data(data).await {
  133. Ok(_) => {}
  134. Err(e) => tracing::error!("Folder receive data error: {:?}", e),
  135. },
  136. },
  137. Err(e) => {
  138. tracing::error!("Folder ws data parser failed: {:?}", e);
  139. }
  140. }
  141. }
  142. /// Called immediately after the application launched with the user sign in/sign up.
  143. #[tracing::instrument(level = "trace", skip(self), err)]
  144. pub async fn initialize(&self, user_id: &str, token: &str) -> FlowyResult<()> {
  145. let mut write_guard = INIT_FOLDER_FLAG.write().await;
  146. if let Some(is_init) = write_guard.get(user_id) {
  147. if *is_init {
  148. return Ok(());
  149. }
  150. }
  151. tracing::debug!("Initialize folder editor");
  152. let folder_id = FolderId::new(user_id);
  153. self.persistence.initialize(user_id, &folder_id).await?;
  154. let pool = self.persistence.db_pool()?;
  155. let object_id = folder_id.as_ref();
  156. let disk_cache = SQLiteFolderRevisionPersistence::new(user_id, pool.clone());
  157. let configuration = RevisionPersistenceConfiguration::new(200, false);
  158. let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache, configuration);
  159. let rev_compactor = FolderRevisionMergeable();
  160. let snapshot_object_id = format!("folder:{}", object_id);
  161. let snapshot_persistence = SQLiteFolderRevisionSnapshotPersistence::new(&snapshot_object_id, pool);
  162. let rev_manager = RevisionManager::new(
  163. user_id,
  164. folder_id.as_ref(),
  165. rev_persistence,
  166. rev_compactor,
  167. snapshot_persistence,
  168. );
  169. let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?;
  170. *self.folder_editor.write().await = Some(Arc::new(folder_editor));
  171. self.app_controller.initialize()?;
  172. self.view_controller.initialize()?;
  173. write_guard.insert(user_id.to_owned(), true);
  174. Ok(())
  175. }
  176. pub async fn initialize_with_new_user(
  177. &self,
  178. user_id: &str,
  179. token: &str,
  180. view_data_format: ViewDataFormatPB,
  181. ) -> FlowyResult<()> {
  182. DefaultFolderBuilder::build(
  183. token,
  184. user_id,
  185. self.persistence.clone(),
  186. self.view_controller.clone(),
  187. || (view_data_format.clone(), Bytes::from(initial_read_me())),
  188. )
  189. .await?;
  190. self.initialize(user_id, token).await
  191. }
  192. /// Called when the current user logout
  193. ///
  194. pub async fn clear(&self, user_id: &str) {
  195. self.view_controller.clear_latest_view();
  196. clear_current_workspace(user_id);
  197. *self.folder_editor.write().await = None;
  198. }
  199. }
  200. struct DefaultFolderBuilder();
  201. impl DefaultFolderBuilder {
  202. async fn build<F: Fn() -> (ViewDataFormatPB, Bytes)>(
  203. token: &str,
  204. user_id: &str,
  205. persistence: Arc<FolderPersistence>,
  206. view_controller: Arc<ViewController>,
  207. create_view_fn: F,
  208. ) -> FlowyResult<()> {
  209. let workspace_rev = user_default::create_default_workspace();
  210. tracing::debug!("Create user:{} default workspace:{}", user_id, workspace_rev.id);
  211. set_current_workspace(user_id, &workspace_rev.id);
  212. for app in workspace_rev.apps.iter() {
  213. for (index, view) in app.belongings.iter().enumerate() {
  214. let (view_data_type, view_data) = create_view_fn();
  215. if index == 0 {
  216. let _ = view_controller.set_latest_view(&view.id);
  217. let layout_type = ViewLayoutTypePB::from(view.layout.clone());
  218. view_controller
  219. .create_view(&view.id, view_data_type, layout_type, view_data)
  220. .await?;
  221. }
  222. }
  223. }
  224. let folder = FolderPad::new(vec![workspace_rev.clone()], vec![])?;
  225. let folder_id = FolderId::new(user_id);
  226. persistence.save_folder(user_id, &folder_id, folder).await?;
  227. let repeated_workspace = RepeatedWorkspacePB {
  228. items: vec![workspace_rev.into()],
  229. };
  230. send_dart_notification(token, FolderNotification::UserCreateWorkspace)
  231. .payload(repeated_workspace)
  232. .send();
  233. Ok(())
  234. }
  235. }
  236. #[cfg(feature = "flowy_unit_test")]
  237. impl FolderManager {
  238. pub async fn folder_editor(&self) -> Arc<FolderEditor> {
  239. self.folder_editor.read().await.clone().unwrap()
  240. }
  241. }
  242. pub trait ViewDataProcessor {
  243. fn create_view(
  244. &self,
  245. user_id: &str,
  246. view_id: &str,
  247. layout: ViewLayoutTypePB,
  248. view_data: Bytes,
  249. ) -> FutureResult<(), FlowyError>;
  250. fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError>;
  251. fn get_view_data(&self, view: &ViewPB) -> FutureResult<Bytes, FlowyError>;
  252. fn create_default_view(
  253. &self,
  254. user_id: &str,
  255. view_id: &str,
  256. layout: ViewLayoutTypePB,
  257. data_format: ViewDataFormatPB,
  258. ) -> FutureResult<Bytes, FlowyError>;
  259. fn create_view_from_delta_data(
  260. &self,
  261. user_id: &str,
  262. view_id: &str,
  263. data: Vec<u8>,
  264. layout: ViewLayoutTypePB,
  265. ) -> FutureResult<Bytes, FlowyError>;
  266. fn data_types(&self) -> Vec<ViewDataFormatPB>;
  267. }
  268. pub type ViewDataProcessorMap = Arc<HashMap<ViewDataFormatPB, Arc<dyn ViewDataProcessor + Send + Sync>>>;