manager.rs 9.9 KB


  1. use crate::entities::view::ViewDataTypePB;
  2. use crate::entities::ViewLayoutTypePB;
  3. use crate::services::folder_editor::FolderRevisionCompactor;
  4. use crate::{
  5. dart_notification::{send_dart_notification, FolderNotification},
  6. entities::workspace::RepeatedWorkspacePB,
  7. errors::FlowyResult,
  8. event_map::{FolderCouldServiceV1, WorkspaceDatabase, WorkspaceUser},
  9. services::{
  10. folder_editor::FolderEditor, persistence::FolderPersistence, set_current_workspace, AppController,
  11. TrashController, ViewController, WorkspaceController,
  12. },
  13. };
  14. use bytes::Bytes;
  15. use flowy_error::FlowyError;
  16. use flowy_folder_data_model::user_default;
  17. use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
  18. use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence};
  19. use flowy_sync::client_document::default::{initial_quill_delta_string, initial_read_me};
  20. use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData};
  21. use lazy_static::lazy_static;
  22. use lib_infra::future::FutureResult;
  23. use std::{collections::HashMap, convert::TryInto, fmt::Formatter, sync::Arc};
  24. use tokio::sync::RwLock as TokioRwLock;
  // Process-wide map from user id to "folder already initialized" flag.
  // `FolderManager::new` resets the user's entry to `false`, and `FolderManager::initialize`
  // returns early when the entry is `true` and sets it to `true` once it succeeds.
  25. lazy_static! {
  26. static ref INIT_FOLDER_FLAG: TokioRwLock<HashMap<String, bool>> = TokioRwLock::new(HashMap::new());
  27. }
  /// Constant suffix shared by every folder object id.
  const FOLDER_ID: &str = "folder";
  /// Separator placed between the user id and the `FOLDER_ID` suffix.
  const FOLDER_ID_SPLIT: &str = ":";

  /// Identifier of a user's folder object, stored as `<user_id>:folder`.
  ///
  /// The full, user-scoped string is only exposed through `AsRef<str>`.
  /// `Display` and `Debug` both print just the constant `folder` suffix and
  /// ignore any width/alignment flags.
  #[derive(Clone)]
  pub struct FolderId(String);

  impl FolderId {
      /// Builds the folder id that belongs to `user_id`.
      pub fn new(user_id: &str) -> Self {
          Self([user_id, FOLDER_ID_SPLIT, FOLDER_ID].concat())
      }
  }

  impl std::fmt::Display for FolderId {
      /// Writes the constant suffix, not the stored `<user_id>:folder` string.
      fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
          f.write_str(FOLDER_ID)
      }
  }

  impl std::fmt::Debug for FolderId {
      /// Same output as `Display`.
      fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
          std::fmt::Display::fmt(self, f)
      }
  }

  impl AsRef<str> for FolderId {
      /// Returns the full `<user_id>:folder` string.
      fn as_ref(&self) -> &str {
          self.0.as_str()
      }
  }
  /// Holds the folder controllers, the folder persistence layer and the folder editor.
  ///
  /// The editor slot starts out as `None` (see `new`), is filled by `initialize`
  /// and emptied again by `clear`.
  52. pub struct FolderManager {
  /// User handle; `new` reads the user id from it and hands a clone to each controller.
  53. pub user: Arc<dyn WorkspaceUser>,
  /// Cloud service handle; `new` passes a clone to every controller.
  54. pub(crate) cloud_service: Arc<dyn FolderCouldServiceV1>,
  /// Folder persistence; built in `new` from the database handle and the shared editor slot.
  55. pub(crate) persistence: Arc<FolderPersistence>,
  56. pub(crate) workspace_controller: Arc<WorkspaceController>,
  57. pub(crate) app_controller: Arc<AppController>,
  58. pub(crate) view_controller: Arc<ViewController>,
  59. pub(crate) trash_controller: Arc<TrashController>,
  /// Web socket handle that `initialize` passes to `FolderEditor::new`.
  60. web_socket: Arc<dyn RevisionWebSocket>,
  /// Shared slot for the current folder editor; `None` until `initialize` installs one.
  61. folder_editor: Arc<TokioRwLock<Option<Arc<FolderEditor>>>>,
  /// Registered view data processors; also handed to the `ViewController` in `new`.
  62. data_processors: ViewDataProcessorMap,
  63. }
  64. impl FolderManager {
  65. pub async fn new(
  66. user: Arc<dyn WorkspaceUser>,
  67. cloud_service: Arc<dyn FolderCouldServiceV1>,
  68. database: Arc<dyn WorkspaceDatabase>,
  69. data_processors: ViewDataProcessorMap,
  70. web_socket: Arc<dyn RevisionWebSocket>,
  71. ) -> Self {
  72. if let Ok(user_id) = user.user_id() {
  73. // Reset the flag if the folder manager gets initialized, otherwise,
  74. // the folder_editor will not be initialized after flutter hot reload.
  75. INIT_FOLDER_FLAG.write().await.insert(user_id.to_owned(), false);
  76. }
  77. let folder_editor = Arc::new(TokioRwLock::new(None));
  78. let persistence = Arc::new(FolderPersistence::new(database.clone(), folder_editor.clone()));
  79. let trash_controller = Arc::new(TrashController::new(
  80. persistence.clone(),
  81. cloud_service.clone(),
  82. user.clone(),
  83. ));
  84. let view_controller = Arc::new(ViewController::new(
  85. user.clone(),
  86. persistence.clone(),
  87. cloud_service.clone(),
  88. trash_controller.clone(),
  89. data_processors.clone(),
  90. ));
  91. let app_controller = Arc::new(AppController::new(
  92. user.clone(),
  93. persistence.clone(),
  94. trash_controller.clone(),
  95. cloud_service.clone(),
  96. ));
  97. let workspace_controller = Arc::new(WorkspaceController::new(
  98. user.clone(),
  99. persistence.clone(),
  100. trash_controller.clone(),
  101. cloud_service.clone(),
  102. ));
  103. Self {
  104. user,
  105. cloud_service,
  106. persistence,
  107. workspace_controller,
  108. app_controller,
  109. view_controller,
  110. trash_controller,
  111. web_socket,
  112. folder_editor,
  113. data_processors,
  114. }
  115. }
  116. // pub fn network_state_changed(&self, new_type: NetworkType) {
  117. // match new_type {
  118. // NetworkType::UnknownNetworkType => {},
  119. // NetworkType::Wifi => {},
  120. // NetworkType::Cell => {},
  121. // NetworkType::Ethernet => {},
  122. // }
  123. // }
  124. pub async fn did_receive_ws_data(&self, data: Bytes) {
  125. let result: Result<ServerRevisionWSData, protobuf::ProtobufError> = data.try_into();
  126. match result {
  127. Ok(data) => match self.folder_editor.read().await.clone() {
  128. None => {}
  129. Some(editor) => match editor.receive_ws_data(data).await {
  130. Ok(_) => {}
  131. Err(e) => tracing::error!("Folder receive data error: {:?}", e),
  132. },
  133. },
  134. Err(e) => {
  135. tracing::error!("Folder ws data parser failed: {:?}", e);
  136. }
  137. }
  138. }
  139. #[tracing::instrument(level = "trace", skip(self), err)]
  140. pub async fn initialize(&self, user_id: &str, token: &str) -> FlowyResult<()> {
  141. let mut write_guard = INIT_FOLDER_FLAG.write().await;
  142. if let Some(is_init) = write_guard.get(user_id) {
  143. if *is_init {
  144. return Ok(());
  145. }
  146. }
  147. tracing::debug!("Initialize folder editor");
  148. let folder_id = FolderId::new(user_id);
  149. let _ = self.persistence.initialize(user_id, &folder_id).await?;
  150. let pool = self.persistence.db_pool()?;
  151. let object_id = folder_id.as_ref();
  152. let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool.clone());
  153. let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache);
  154. let rev_compactor = FolderRevisionCompactor();
  155. // let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone());
  156. let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(object_id, pool);
  157. let rev_manager = RevisionManager::new(
  158. user_id,
  159. folder_id.as_ref(),
  160. rev_persistence,
  161. rev_compactor,
  162. // history_persistence,
  163. snapshot_persistence,
  164. );
  165. let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?;
  166. *self.folder_editor.write().await = Some(Arc::new(folder_editor));
  167. let _ = self.app_controller.initialize()?;
  168. let _ = self.view_controller.initialize()?;
  169. self.data_processors.iter().for_each(|(_, processor)| {
  170. processor.initialize();
  171. });
  172. write_guard.insert(user_id.to_owned(), true);
  173. Ok(())
  174. }
  175. pub async fn initialize_with_new_user(&self, user_id: &str, token: &str) -> FlowyResult<()> {
  176. DefaultFolderBuilder::build(token, user_id, self.persistence.clone(), self.view_controller.clone()).await?;
  177. self.initialize(user_id, token).await
  178. }
  179. pub async fn clear(&self) {
  180. *self.folder_editor.write().await = None;
  181. }
  182. }
  183. struct DefaultFolderBuilder();
  184. impl DefaultFolderBuilder {
  185. async fn build(
  186. token: &str,
  187. user_id: &str,
  188. persistence: Arc<FolderPersistence>,
  189. view_controller: Arc<ViewController>,
  190. ) -> FlowyResult<()> {
  191. log::debug!("Create user default workspace");
  192. let workspace_rev = user_default::create_default_workspace();
  193. set_current_workspace(&workspace_rev.id);
  194. for app in workspace_rev.apps.iter() {
  195. for (index, view) in app.belongings.iter().enumerate() {
  196. let view_data = if index == 0 {
  197. initial_read_me().json_str()
  198. } else {
  199. initial_quill_delta_string()
  200. };
  201. let _ = view_controller.set_latest_view(&view.id);
  202. let layout_type = ViewLayoutTypePB::from(view.layout.clone());
  203. let _ = view_controller
  204. .create_view(&view.id, ViewDataTypePB::Text, layout_type, Bytes::from(view_data))
  205. .await?;
  206. }
  207. }
  208. let folder = FolderPad::new(vec![workspace_rev.clone()], vec![])?;
  209. let folder_id = FolderId::new(user_id);
  210. let _ = persistence.save_folder(user_id, &folder_id, folder).await?;
  211. let repeated_workspace = RepeatedWorkspacePB {
  212. items: vec![workspace_rev.into()],
  213. };
  214. send_dart_notification(token, FolderNotification::UserCreateWorkspace)
  215. .payload(repeated_workspace)
  216. .send();
  217. Ok(())
  218. }
  219. }
  #[cfg(feature = "flowy_unit_test")]
  impl FolderManager {
      /// Returns a handle to the current folder editor (only built with the
      /// `flowy_unit_test` feature).
      ///
      /// # Panics
      /// Panics when no folder editor is installed.
      pub async fn folder_editor(&self) -> Arc<FolderEditor> {
          let guard = self.folder_editor.read().await;
          let editor: Option<Arc<FolderEditor>> = guard.clone();
          editor.unwrap()
      }
  }
  /// Per-data-type hooks registered in a `ViewDataProcessorMap`.
  ///
  /// Every method except `data_type` returns a `FutureResult`. The notes on
  /// individual methods are inferred from names and signatures only, because no
  /// implementor is visible in this file — confirm against the implementations.
  226. pub trait ViewDataProcessor {
  /// Initialization hook; `FolderManager::initialize` calls it on every registered processor.
  227. fn initialize(&self) -> FutureResult<(), FlowyError>;
  /// Presumably creates the backing container for `view_id` from `delta_data` — TODO confirm.
  228. fn create_container(
  229. &self,
  230. user_id: &str,
  231. view_id: &str,
  232. layout: ViewLayoutTypePB,
  233. delta_data: Bytes,
  234. ) -> FutureResult<(), FlowyError>;
  /// Presumably releases whatever `create_container` set up for `view_id` — TODO confirm.
  235. fn close_container(&self, view_id: &str) -> FutureResult<(), FlowyError>;
  /// Resolves to the data of `view_id` as raw bytes.
  236. fn get_delta_data(&self, view_id: &str) -> FutureResult<Bytes, FlowyError>;
  /// Presumably creates a view with default content and resolves to that content — TODO confirm.
  237. fn create_default_view(
  238. &self,
  239. user_id: &str,
  240. view_id: &str,
  241. layout: ViewLayoutTypePB,
  242. ) -> FutureResult<Bytes, FlowyError>;
  /// Presumably creates a view from existing `data` and resolves to the stored bytes — TODO confirm.
  243. fn create_view_from_delta_data(
  244. &self,
  245. user_id: &str,
  246. view_id: &str,
  247. data: Vec<u8>,
  248. layout: ViewLayoutTypePB,
  249. ) -> FutureResult<Bytes, FlowyError>;
  /// The view data type this processor reports; the map type is keyed by the same enum.
  250. fn data_type(&self) -> ViewDataTypePB;
  251. }
  /// Shared, immutable map from a view data type to the thread-safe processor registered for it.
  252. pub type ViewDataProcessorMap = Arc<HashMap<ViewDataTypePB, Arc<dyn ViewDataProcessor + Send + Sync>>>;