manager.rs 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. use crate::{
  2. dart_notification::{send_dart_notification, FolderNotification},
  3. entities::workspace::RepeatedWorkspace,
  4. errors::FlowyResult,
  5. event_map::{FolderCouldServiceV1, WorkspaceDatabase, WorkspaceUser},
  6. services::{
  7. folder_editor::FolderEditor, persistence::FolderPersistence, set_current_workspace, AppController,
  8. TrashController, ViewController, WorkspaceController,
  9. },
  10. };
  11. use bytes::Bytes;
  12. use flowy_error::FlowyError;
  13. use flowy_folder_data_model::entities::view::ViewDataType;
  14. use flowy_folder_data_model::user_default;
  15. use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
  16. use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket};
  17. use flowy_sync::client_document::default::{initial_quill_delta_string, initial_read_me};
  18. use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData};
  19. use lazy_static::lazy_static;
  20. use lib_infra::future::FutureResult;
  21. use std::{collections::HashMap, convert::TryInto, fmt::Formatter, sync::Arc};
  22. use tokio::sync::RwLock as TokioRwLock;
  23. lazy_static! {
  24. static ref INIT_FOLDER_FLAG: TokioRwLock<HashMap<String, bool>> = TokioRwLock::new(HashMap::new());
  25. }
  26. const FOLDER_ID: &str = "folder";
  27. const FOLDER_ID_SPLIT: &str = ":";
  28. #[derive(Clone)]
  29. pub struct FolderId(String);
  30. impl FolderId {
  31. pub fn new(user_id: &str) -> Self {
  32. Self(format!("{}{}{}", user_id, FOLDER_ID_SPLIT, FOLDER_ID))
  33. }
  34. }
  35. impl std::fmt::Display for FolderId {
  36. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  37. f.write_str(FOLDER_ID)
  38. }
  39. }
  40. impl std::fmt::Debug for FolderId {
  41. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  42. f.write_str(FOLDER_ID)
  43. }
  44. }
  45. impl AsRef<str> for FolderId {
  46. fn as_ref(&self) -> &str {
  47. &self.0
  48. }
  49. }
  50. pub struct FolderManager {
  51. pub user: Arc<dyn WorkspaceUser>,
  52. pub(crate) cloud_service: Arc<dyn FolderCouldServiceV1>,
  53. pub(crate) persistence: Arc<FolderPersistence>,
  54. pub(crate) workspace_controller: Arc<WorkspaceController>,
  55. pub(crate) app_controller: Arc<AppController>,
  56. pub(crate) view_controller: Arc<ViewController>,
  57. pub(crate) trash_controller: Arc<TrashController>,
  58. web_socket: Arc<dyn RevisionWebSocket>,
  59. folder_editor: Arc<TokioRwLock<Option<Arc<FolderEditor>>>>,
  60. data_processors: ViewDataProcessorMap,
  61. }
  62. impl FolderManager {
  63. pub async fn new(
  64. user: Arc<dyn WorkspaceUser>,
  65. cloud_service: Arc<dyn FolderCouldServiceV1>,
  66. database: Arc<dyn WorkspaceDatabase>,
  67. data_processors: ViewDataProcessorMap,
  68. web_socket: Arc<dyn RevisionWebSocket>,
  69. ) -> Self {
  70. if let Ok(user_id) = user.user_id() {
  71. // Reset the flag if the folder manager gets initialized, otherwise,
  72. // the folder_editor will not be initialized after flutter hot reload.
  73. INIT_FOLDER_FLAG.write().await.insert(user_id.to_owned(), false);
  74. }
  75. let folder_editor = Arc::new(TokioRwLock::new(None));
  76. let persistence = Arc::new(FolderPersistence::new(database.clone(), folder_editor.clone()));
  77. let trash_controller = Arc::new(TrashController::new(
  78. persistence.clone(),
  79. cloud_service.clone(),
  80. user.clone(),
  81. ));
  82. let view_controller = Arc::new(ViewController::new(
  83. user.clone(),
  84. persistence.clone(),
  85. cloud_service.clone(),
  86. trash_controller.clone(),
  87. data_processors.clone(),
  88. ));
  89. let app_controller = Arc::new(AppController::new(
  90. user.clone(),
  91. persistence.clone(),
  92. trash_controller.clone(),
  93. cloud_service.clone(),
  94. ));
  95. let workspace_controller = Arc::new(WorkspaceController::new(
  96. user.clone(),
  97. persistence.clone(),
  98. trash_controller.clone(),
  99. cloud_service.clone(),
  100. ));
  101. Self {
  102. user,
  103. cloud_service,
  104. persistence,
  105. workspace_controller,
  106. app_controller,
  107. view_controller,
  108. trash_controller,
  109. web_socket,
  110. folder_editor,
  111. data_processors,
  112. }
  113. }
  114. // pub fn network_state_changed(&self, new_type: NetworkType) {
  115. // match new_type {
  116. // NetworkType::UnknownNetworkType => {},
  117. // NetworkType::Wifi => {},
  118. // NetworkType::Cell => {},
  119. // NetworkType::Ethernet => {},
  120. // }
  121. // }
  122. pub async fn did_receive_ws_data(&self, data: Bytes) {
  123. let result: Result<ServerRevisionWSData, protobuf::ProtobufError> = data.try_into();
  124. match result {
  125. Ok(data) => match self.folder_editor.read().await.clone() {
  126. None => {}
  127. Some(editor) => match editor.receive_ws_data(data).await {
  128. Ok(_) => {}
  129. Err(e) => tracing::error!("Folder receive data error: {:?}", e),
  130. },
  131. },
  132. Err(e) => {
  133. tracing::error!("Folder ws data parser failed: {:?}", e);
  134. }
  135. }
  136. }
  137. #[tracing::instrument(level = "trace", skip(self), err)]
  138. pub async fn initialize(&self, user_id: &str, token: &str) -> FlowyResult<()> {
  139. let mut write_guard = INIT_FOLDER_FLAG.write().await;
  140. if let Some(is_init) = write_guard.get(user_id) {
  141. if *is_init {
  142. return Ok(());
  143. }
  144. }
  145. tracing::debug!("Initialize folder editor");
  146. let folder_id = FolderId::new(user_id);
  147. let _ = self.persistence.initialize(user_id, &folder_id).await?;
  148. let pool = self.persistence.db_pool()?;
  149. let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool));
  150. let rev_persistence = Arc::new(RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache));
  151. let rev_manager = RevisionManager::new(user_id, folder_id.as_ref(), rev_persistence);
  152. let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?;
  153. *self.folder_editor.write().await = Some(Arc::new(folder_editor));
  154. let _ = self.app_controller.initialize()?;
  155. let _ = self.view_controller.initialize()?;
  156. self.data_processors.iter().for_each(|(_, processor)| {
  157. processor.initialize();
  158. });
  159. write_guard.insert(user_id.to_owned(), true);
  160. Ok(())
  161. }
  162. pub async fn initialize_with_new_user(&self, user_id: &str, token: &str) -> FlowyResult<()> {
  163. DefaultFolderBuilder::build(token, user_id, self.persistence.clone(), self.view_controller.clone()).await?;
  164. self.initialize(user_id, token).await
  165. }
  166. pub async fn clear(&self) {
  167. *self.folder_editor.write().await = None;
  168. }
  169. }
  170. struct DefaultFolderBuilder();
  171. impl DefaultFolderBuilder {
  172. async fn build(
  173. token: &str,
  174. user_id: &str,
  175. persistence: Arc<FolderPersistence>,
  176. view_controller: Arc<ViewController>,
  177. ) -> FlowyResult<()> {
  178. log::debug!("Create user default workspace");
  179. let workspace_rev = user_default::create_default_workspace();
  180. set_current_workspace(&workspace_rev.id);
  181. for app in workspace_rev.apps.iter() {
  182. for (index, view) in app.belongings.iter().enumerate() {
  183. let view_data = if index == 0 {
  184. initial_read_me().to_delta_str()
  185. } else {
  186. initial_quill_delta_string()
  187. };
  188. let _ = view_controller.set_latest_view(&view.id);
  189. let _ = view_controller
  190. .create_view(&view.id, ViewDataType::TextBlock, Bytes::from(view_data))
  191. .await?;
  192. }
  193. }
  194. let folder = FolderPad::new(vec![workspace_rev.clone()], vec![])?;
  195. let folder_id = FolderId::new(user_id);
  196. let _ = persistence.save_folder(user_id, &folder_id, folder).await?;
  197. let repeated_workspace = RepeatedWorkspace {
  198. items: vec![workspace_rev.into()],
  199. };
  200. send_dart_notification(token, FolderNotification::UserCreateWorkspace)
  201. .payload(repeated_workspace)
  202. .send();
  203. Ok(())
  204. }
  205. }
  206. #[cfg(feature = "flowy_unit_test")]
  207. impl FolderManager {
  208. pub async fn folder_editor(&self) -> Arc<FolderEditor> {
  209. self.folder_editor.read().await.clone().unwrap()
  210. }
  211. }
  212. pub trait ViewDataProcessor {
  213. fn initialize(&self) -> FutureResult<(), FlowyError>;
  214. fn create_container(&self, user_id: &str, view_id: &str, delta_data: Bytes) -> FutureResult<(), FlowyError>;
  215. fn delete_container(&self, view_id: &str) -> FutureResult<(), FlowyError>;
  216. fn close_container(&self, view_id: &str) -> FutureResult<(), FlowyError>;
  217. fn get_delta_data(&self, view_id: &str) -> FutureResult<Bytes, FlowyError>;
  218. fn create_default_view(&self, user_id: &str, view_id: &str) -> FutureResult<Bytes, FlowyError>;
  219. fn create_view_from_delta_data(
  220. &self,
  221. user_id: &str,
  222. view_id: &str,
  223. data: Vec<u8>,
  224. ) -> FutureResult<Bytes, FlowyError>;
  225. fn data_type(&self) -> ViewDataType;
  226. }
  227. pub type ViewDataProcessorMap = Arc<HashMap<ViewDataType, Arc<dyn ViewDataProcessor + Send + Sync>>>;