manager.rs 19 KB


  1. use std::collections::{HashMap, HashSet};
  2. use std::ops::Deref;
  3. use std::sync::Arc;
  4. use collab::preclude::CollabBuilder;
  5. use collab_folder::core::{
  6. Folder as InnerFolder, FolderContext, TrashChange, TrashChangeReceiver, TrashInfo, TrashRecord,
  7. View, ViewChange, ViewChangeReceiver, ViewLayout, Workspace,
  8. };
  9. use collab_persistence::kv::rocks_kv::RocksCollabDB;
  10. use collab_plugins::disk_plugin::rocksdb::RocksdbDiskPlugin;
  11. use parking_lot::Mutex;
  12. use tracing::{event, Level};
  13. use flowy_error::{FlowyError, FlowyResult};
  14. use lib_infra::util::timestamp;
  15. use crate::entities::{
  16. CreateViewParams, CreateWorkspaceParams, RepeatedTrashPB, RepeatedViewPB, RepeatedWorkspacePB,
  17. UpdateViewParams, ViewPB,
  18. };
  19. use crate::notification::{
  20. send_notification, send_workspace_notification, send_workspace_setting_notification,
  21. FolderNotification,
  22. };
  23. use crate::user_default::{gen_workspace_id, DefaultFolderBuilder};
  24. use crate::view_ext::{
  25. gen_view_id, view_from_create_view_params, ViewDataProcessor, ViewDataProcessorMap,
  26. };
  27. pub trait FolderUser: Send + Sync {
  28. fn user_id(&self) -> Result<i64, FlowyError>;
  29. fn token(&self) -> Result<String, FlowyError>;
  30. fn kv_db(&self) -> Result<Arc<RocksCollabDB>, FlowyError>;
  31. }
  32. pub struct Folder2Manager {
  33. folder: Folder,
  34. user: Arc<dyn FolderUser>,
  35. view_processors: ViewDataProcessorMap,
  36. }
  37. unsafe impl Send for Folder2Manager {}
  38. unsafe impl Sync for Folder2Manager {}
  39. impl Folder2Manager {
  40. pub async fn new(
  41. user: Arc<dyn FolderUser>,
  42. view_processors: ViewDataProcessorMap,
  43. ) -> FlowyResult<Self> {
  44. // let folder = make_user_folder(user.clone())?;
  45. let folder = Folder::default();
  46. let manager = Self {
  47. user,
  48. folder,
  49. view_processors,
  50. };
  51. Ok(manager)
  52. }
  53. pub async fn get_current_workspace(&self) -> FlowyResult<Workspace> {
  54. match self.with_folder(None, |folder| folder.get_current_workspace()) {
  55. None => Err(FlowyError::record_not_found().context("Can not find the workspace")),
  56. Some(workspace) => Ok(workspace),
  57. }
  58. }
  59. pub async fn get_current_workspace_views(&self) -> FlowyResult<Vec<ViewPB>> {
  60. let workspace_id = self
  61. .folder
  62. .lock()
  63. .as_ref()
  64. .map(|folder| folder.get_current_workspace_id());
  65. if let Some(Some(workspace_id)) = workspace_id {
  66. self.get_workspace_views(&workspace_id).await
  67. } else {
  68. Ok(vec![])
  69. }
  70. }
  71. pub async fn get_workspace_views(&self, workspace_id: &str) -> FlowyResult<Vec<ViewPB>> {
  72. let views = self.with_folder(vec![], |folder| {
  73. get_workspace_view_pbs(workspace_id, folder)
  74. });
  75. Ok(views)
  76. }
  77. /// Called immediately after the application launched fi the user already sign in/sign up.
  78. #[tracing::instrument(level = "trace", skip(self), err)]
  79. pub async fn initialize(&self, user_id: i64) -> FlowyResult<()> {
  80. if let Ok(uid) = self.user.user_id() {
  81. let folder_id = FolderId::new(uid);
  82. if let Ok(kv_db) = self.user.kv_db() {
  83. let mut collab = CollabBuilder::new(uid, folder_id).build();
  84. let disk_plugin = Arc::new(
  85. RocksdbDiskPlugin::new(uid, kv_db).map_err(|err| FlowyError::internal().context(err))?,
  86. );
  87. collab.add_plugin(disk_plugin);
  88. collab.initial();
  89. let (view_tx, view_rx) = tokio::sync::broadcast::channel(100);
  90. let (trash_tx, trash_rx) = tokio::sync::broadcast::channel(100);
  91. let folder_context = FolderContext {
  92. view_change_tx: Some(view_tx),
  93. trash_change_tx: Some(trash_tx),
  94. };
  95. *self.folder.lock() = Some(InnerFolder::get_or_create(collab, folder_context));
  96. listen_on_trash_change(trash_rx, self.folder.clone());
  97. listen_on_view_change(view_rx, self.folder.clone());
  98. }
  99. }
  100. Ok(())
  101. }
  102. /// Called after the user sign up / sign in
  103. pub async fn initialize_with_new_user(&self, user_id: i64, token: &str) -> FlowyResult<()> {
  104. self.initialize(user_id).await?;
  105. let (folder_data, workspace_pb) =
  106. DefaultFolderBuilder::build(self.user.user_id()?, &self.view_processors).await;
  107. self.with_folder((), |folder| {
  108. folder.create_with_data(folder_data);
  109. });
  110. send_notification(token, FolderNotification::DidCreateWorkspace)
  111. .payload(RepeatedWorkspacePB {
  112. items: vec![workspace_pb],
  113. })
  114. .send();
  115. Ok(())
  116. }
  117. /// Called when the current user logout
  118. ///
  119. pub async fn clear(&self, _user_id: i64) {}
  120. pub async fn create_workspace(&self, params: CreateWorkspaceParams) -> FlowyResult<Workspace> {
  121. let workspace = Workspace {
  122. id: gen_workspace_id(),
  123. name: params.name,
  124. belongings: Default::default(),
  125. created_at: timestamp(),
  126. };
  127. self.with_folder((), |folder| {
  128. folder.workspaces.create_workspace(workspace.clone());
  129. folder.set_current_workspace(&workspace.id);
  130. });
  131. let repeated_workspace = RepeatedWorkspacePB {
  132. items: vec![workspace.clone().into()],
  133. };
  134. send_workspace_notification(FolderNotification::DidCreateWorkspace, repeated_workspace);
  135. Ok(workspace)
  136. }
  137. pub async fn open_workspace(&self, workspace_id: &str) -> FlowyResult<Workspace> {
  138. self.with_folder(Err(FlowyError::internal()), |folder| {
  139. let workspace = folder
  140. .workspaces
  141. .get_workspace(workspace_id)
  142. .ok_or_else(|| {
  143. FlowyError::record_not_found().context("Can't open not existing workspace")
  144. })?;
  145. folder.set_current_workspace(workspace_id);
  146. Ok::<Workspace, FlowyError>(workspace)
  147. })
  148. }
  149. pub async fn get_workspace(&self, workspace_id: &str) -> Option<Workspace> {
  150. self.with_folder(None, |folder| folder.workspaces.get_workspace(workspace_id))
  151. }
  152. fn with_folder<F, Output>(&self, default_value: Output, f: F) -> Output
  153. where
  154. F: FnOnce(&InnerFolder) -> Output,
  155. {
  156. let folder = self.folder.lock();
  157. match &*folder {
  158. None => default_value,
  159. Some(folder) => f(folder),
  160. }
  161. }
  162. pub async fn get_all_workspaces(&self) -> Vec<Workspace> {
  163. self.with_folder(vec![], |folder| folder.workspaces.get_all_workspaces())
  164. }
  165. pub async fn create_view_with_params(&self, params: CreateViewParams) -> FlowyResult<View> {
  166. let view_layout: ViewLayout = params.layout.clone().into();
  167. let processor = self.get_data_processor(&view_layout)?;
  168. let user_id = self.user.user_id()?;
  169. let ext = params.ext.clone();
  170. match params.initial_data.is_empty() {
  171. true => {
  172. tracing::trace!("Create view with build-in data");
  173. processor
  174. .create_view_with_build_in_data(
  175. user_id,
  176. &params.view_id,
  177. &params.name,
  178. view_layout.clone(),
  179. ext,
  180. )
  181. .await?;
  182. },
  183. false => {
  184. tracing::trace!("Create view with view data");
  185. processor
  186. .create_view_with_custom_data(
  187. user_id,
  188. &params.view_id,
  189. &params.name,
  190. params.initial_data.clone(),
  191. view_layout.clone(),
  192. ext,
  193. )
  194. .await?;
  195. },
  196. }
  197. let view = view_from_create_view_params(params, view_layout);
  198. self.with_folder((), |folder| {
  199. folder.insert_view(view.clone());
  200. });
  201. notify_parent_view_did_change(self.folder.clone(), vec![view.bid.clone()]);
  202. Ok(view)
  203. }
  204. #[tracing::instrument(level = "debug", skip(self), err)]
  205. pub(crate) async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
  206. let view = self
  207. .with_folder(None, |folder| folder.views.get_view(view_id))
  208. .ok_or_else(|| {
  209. FlowyError::record_not_found().context("Can't find the view when closing the view")
  210. })?;
  211. let processor = self.get_data_processor(&view.layout)?;
  212. processor.close_view(view_id).await?;
  213. Ok(())
  214. }
  215. pub async fn create_view_data(
  216. &self,
  217. view_id: &str,
  218. name: &str,
  219. view_layout: ViewLayout,
  220. data: Vec<u8>,
  221. ) -> FlowyResult<()> {
  222. let user_id = self.user.user_id()?;
  223. let processor = self.get_data_processor(&view_layout)?;
  224. processor
  225. .create_view_with_custom_data(
  226. user_id,
  227. view_id,
  228. name,
  229. data,
  230. view_layout,
  231. HashMap::default(),
  232. )
  233. .await?;
  234. Ok(())
  235. }
  236. #[tracing::instrument(level = "debug", skip(self, view_id), err)]
  237. pub async fn get_view(&self, view_id: &str) -> FlowyResult<ViewPB> {
  238. let view_id = view_id.to_string();
  239. let folder = self.folder.lock();
  240. let folder = folder.as_ref().ok_or_else(folder_not_init_error)?;
  241. let trash_ids = folder
  242. .trash
  243. .get_all_trash()
  244. .into_iter()
  245. .map(|trash| trash.id)
  246. .collect::<Vec<String>>();
  247. if trash_ids.contains(&view_id) {
  248. return Err(FlowyError::record_not_found());
  249. }
  250. match folder.views.get_view(&view_id) {
  251. None => Err(FlowyError::record_not_found()),
  252. Some(mut view) => {
  253. view.belongings.retain(|b| !trash_ids.contains(&b.id));
  254. let mut view_pb: ViewPB = view.into();
  255. view_pb.belongings = folder
  256. .views
  257. .get_views_belong_to(&view_pb.id)
  258. .into_iter()
  259. .filter(|view| !trash_ids.contains(&view.id))
  260. .map(|view| view.into())
  261. .collect::<Vec<ViewPB>>();
  262. Ok(view_pb)
  263. },
  264. }
  265. }
  266. #[tracing::instrument(level = "debug", skip(self, view_id), err)]
  267. pub async fn delete_view(&self, view_id: &str) -> FlowyResult<()> {
  268. self.with_folder((), |folder| folder.views.delete_views(vec![view_id]));
  269. Ok(())
  270. }
  271. #[tracing::instrument(level = "debug", skip(self), err)]
  272. pub async fn move_view_to_trash(&self, view_id: &str) -> FlowyResult<()> {
  273. self.with_folder((), |folder| {
  274. folder.trash.add_trash(vec![TrashRecord {
  275. id: view_id.to_string(),
  276. created_at: timestamp(),
  277. }]);
  278. if let Some(view) = folder.get_current_view() {
  279. if view == view_id {
  280. folder.set_current_view("");
  281. }
  282. }
  283. });
  284. Ok(())
  285. }
  286. #[tracing::instrument(level = "debug", skip(self), err)]
  287. pub async fn move_view(&self, view_id: &str, from: usize, to: usize) -> FlowyResult<()> {
  288. let view = self.with_folder(None, |folder| {
  289. folder.move_view(view_id, from as u32, to as u32)
  290. });
  291. match view {
  292. None => tracing::error!("Couldn't find the view. It should not be empty"),
  293. Some(view) => {
  294. notify_parent_view_did_change(self.folder.clone(), vec![view.bid]);
  295. },
  296. }
  297. Ok(())
  298. }
  299. #[tracing::instrument(level = "debug", skip(self, bid), err)]
  300. pub async fn get_views_belong_to(&self, bid: &str) -> FlowyResult<Vec<View>> {
  301. let views = self.with_folder(vec![], |folder| folder.views.get_views_belong_to(bid));
  302. Ok(views)
  303. }
  304. #[tracing::instrument(level = "trace", skip(self), err)]
  305. pub async fn update_view_with_params(&self, params: UpdateViewParams) -> FlowyResult<View> {
  306. let view = self
  307. .folder
  308. .lock()
  309. .as_ref()
  310. .ok_or_else(folder_not_init_error)?
  311. .views
  312. .update_view(&params.view_id, |update| {
  313. update
  314. .set_name_if_not_none(params.name)
  315. .set_desc_if_not_none(params.desc)
  316. .done()
  317. });
  318. match view {
  319. None => Err(FlowyError::record_not_found()),
  320. Some(view) => {
  321. let view_pb: ViewPB = view.clone().into();
  322. send_notification(&view.id, FolderNotification::DidUpdateView)
  323. .payload(view_pb)
  324. .send();
  325. notify_parent_view_did_change(self.folder.clone(), vec![view.bid.clone()]);
  326. Ok(view)
  327. },
  328. }
  329. }
  330. #[tracing::instrument(level = "debug", skip(self), err)]
  331. pub(crate) async fn duplicate_view(&self, view_id: &str) -> Result<(), FlowyError> {
  332. let view = self
  333. .with_folder(None, |folder| folder.views.get_view(view_id))
  334. .ok_or_else(|| FlowyError::record_not_found().context("Can't duplicate the view"))?;
  335. let processor = self.get_data_processor(&view.layout)?;
  336. let view_data = processor.get_view_data(&view.id).await?;
  337. let mut ext = HashMap::new();
  338. if let Some(database_id) = view.database_id {
  339. ext.insert("database_id".to_string(), database_id);
  340. }
  341. let duplicate_params = CreateViewParams {
  342. belong_to_id: view.bid.clone(),
  343. name: format!("{} (copy)", &view.name),
  344. desc: view.desc,
  345. layout: view.layout.into(),
  346. initial_data: view_data.to_vec(),
  347. view_id: gen_view_id(),
  348. ext,
  349. };
  350. let _ = self.create_view_with_params(duplicate_params).await?;
  351. Ok(())
  352. }
  353. #[tracing::instrument(level = "trace", skip(self), err)]
  354. pub(crate) async fn set_current_view(&self, view_id: &str) -> Result<(), FlowyError> {
  355. let folder = self.folder.lock();
  356. let folder = folder.as_ref().ok_or_else(folder_not_init_error)?;
  357. folder.set_current_view(view_id);
  358. let workspace = folder.get_current_workspace();
  359. let view = folder
  360. .get_current_view()
  361. .and_then(|view_id| folder.views.get_view(&view_id));
  362. send_workspace_setting_notification(workspace, view);
  363. Ok(())
  364. }
  365. #[tracing::instrument(level = "trace", skip(self))]
  366. pub(crate) async fn get_current_view(&self) -> Option<ViewPB> {
  367. let view_id = self.with_folder(None, |folder| folder.get_current_view())?;
  368. self.get_view(&view_id).await.ok()
  369. }
  370. #[tracing::instrument(level = "trace", skip(self))]
  371. pub(crate) async fn get_all_trash(&self) -> Vec<TrashInfo> {
  372. self.with_folder(vec![], |folder| folder.trash.get_all_trash())
  373. }
  374. #[tracing::instrument(level = "trace", skip(self))]
  375. pub(crate) async fn restore_all_trash(&self) {
  376. self.with_folder((), |folder| {
  377. folder.trash.clear();
  378. });
  379. send_notification("trash", FolderNotification::DidUpdateTrash)
  380. .payload(RepeatedTrashPB { items: vec![] })
  381. .send();
  382. }
  383. #[tracing::instrument(level = "trace", skip(self))]
  384. pub(crate) async fn restore_trash(&self, trash_id: &str) {
  385. self.with_folder((), |folder| {
  386. folder.trash.delete_trash(vec![trash_id]);
  387. });
  388. }
  389. #[tracing::instrument(level = "trace", skip(self))]
  390. pub(crate) async fn delete_trash(&self, trash_id: &str) {
  391. self.with_folder((), |folder| {
  392. folder.trash.delete_trash(vec![trash_id]);
  393. folder.views.delete_views(vec![trash_id]);
  394. })
  395. }
  396. #[tracing::instrument(level = "trace", skip(self))]
  397. pub(crate) async fn delete_all_trash(&self) {
  398. self.with_folder((), |folder| {
  399. let trash = folder.trash.get_all_trash();
  400. folder.trash.clear();
  401. folder.views.delete_views(trash);
  402. });
  403. send_notification("trash", FolderNotification::DidUpdateTrash)
  404. .payload(RepeatedTrashPB { items: vec![] })
  405. .send();
  406. }
  407. fn get_data_processor(
  408. &self,
  409. view_layout: &ViewLayout,
  410. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  411. match self.view_processors.get(view_layout) {
  412. None => Err(FlowyError::internal().context(format!(
  413. "Get data processor failed. Unknown layout type: {:?}",
  414. view_layout
  415. ))),
  416. Some(processor) => Ok(processor.clone()),
  417. }
  418. }
  419. }
  420. /// Listen on the [ViewChange] after create/delete/update events happened
  421. fn listen_on_view_change(mut rx: ViewChangeReceiver, folder: Folder) {
  422. tokio::spawn(async move {
  423. while let Ok(value) = rx.recv().await {
  424. match value {
  425. ViewChange::DidCreateView { view } => {
  426. notify_parent_view_did_change(folder.clone(), vec![view.bid]);
  427. },
  428. ViewChange::DidDeleteView { views: _ } => {},
  429. ViewChange::DidUpdate { view } => {
  430. notify_parent_view_did_change(folder.clone(), vec![view.bid]);
  431. },
  432. };
  433. }
  434. });
  435. }
  436. /// Listen on the [TrashChange]s and notify the frontend some views were changed.
  437. fn listen_on_trash_change(mut rx: TrashChangeReceiver, folder: Folder) {
  438. tokio::spawn(async move {
  439. while let Ok(value) = rx.recv().await {
  440. let mut unique_ids = HashSet::new();
  441. tracing::trace!("Did receive trash change: {:?}", value);
  442. let ids = match value {
  443. TrashChange::DidCreateTrash { ids } => ids,
  444. TrashChange::DidDeleteTrash { ids } => ids,
  445. };
  446. if let Some(folder) = folder.lock().as_ref() {
  447. let views = folder.views.get_views(&ids);
  448. for view in views {
  449. unique_ids.insert(view.bid);
  450. }
  451. let repeated_trash: RepeatedTrashPB = folder.trash.get_all_trash().into();
  452. send_notification("trash", FolderNotification::DidUpdateTrash)
  453. .payload(repeated_trash)
  454. .send();
  455. }
  456. let parent_view_ids = unique_ids.into_iter().collect();
  457. notify_parent_view_did_change(folder.clone(), parent_view_ids);
  458. }
  459. });
  460. }
  461. fn get_workspace_view_pbs(workspace_id: &str, folder: &InnerFolder) -> Vec<ViewPB> {
  462. let trash_ids = folder
  463. .trash
  464. .get_all_trash()
  465. .into_iter()
  466. .map(|trash| trash.id)
  467. .collect::<Vec<String>>();
  468. let mut views = folder.get_workspace_views(workspace_id);
  469. views.retain(|view| !trash_ids.contains(&view.id));
  470. views
  471. .into_iter()
  472. .map(|view| {
  473. let mut parent_view: ViewPB = view.into();
  474. // Get child views
  475. parent_view.belongings = folder
  476. .views
  477. .get_views_belong_to(&parent_view.id)
  478. .into_iter()
  479. .map(|view| view.into())
  480. .collect();
  481. parent_view
  482. })
  483. .collect()
  484. }
  485. #[tracing::instrument(level = "debug", skip(folder, parent_view_ids))]
  486. fn notify_parent_view_did_change<T: AsRef<str>>(
  487. folder: Folder,
  488. parent_view_ids: Vec<T>,
  489. ) -> Option<()> {
  490. let folder = folder.lock();
  491. let folder = folder.as_ref()?;
  492. let workspace_id = folder.get_current_workspace_id()?;
  493. let trash_ids = folder
  494. .trash
  495. .get_all_trash()
  496. .into_iter()
  497. .map(|trash| trash.id)
  498. .collect::<Vec<String>>();
  499. for parent_view_id in parent_view_ids {
  500. let parent_view_id = parent_view_id.as_ref();
  501. // if the view's bid is equal to workspace id. Then it will fetch the current
  502. // workspace views. Because the the workspace is not a view stored in the views map.
  503. if parent_view_id == workspace_id {
  504. let repeated_view: RepeatedViewPB = get_workspace_view_pbs(&workspace_id, folder).into();
  505. send_notification(&workspace_id, FolderNotification::DidUpdateWorkspaceViews)
  506. .payload(repeated_view)
  507. .send();
  508. } else {
  509. // Parent view can contain a list of child views. Currently, only get the first level
  510. // child views.
  511. let parent_view = folder.views.get_view(parent_view_id)?;
  512. let mut child_views = folder.views.get_views_belong_to(parent_view_id);
  513. child_views.retain(|view| !trash_ids.contains(&view.id));
  514. event!(Level::DEBUG, child_views_count = child_views.len());
  515. // Post the notification
  516. let mut parent_view_pb: ViewPB = parent_view.into();
  517. parent_view_pb.belongings = child_views
  518. .into_iter()
  519. .map(|child_view| child_view.into())
  520. .collect::<Vec<ViewPB>>();
  521. send_notification(parent_view_id, FolderNotification::DidUpdateChildViews)
  522. .payload(parent_view_pb)
  523. .send();
  524. }
  525. }
  526. None
  527. }
  528. fn folder_not_init_error() -> FlowyError {
  529. FlowyError::internal().context("Folder not initialized")
  530. }
  /// Identifier of a user's folder, formatted as `"<uid>:folder"`.
  #[derive(Clone)]
  pub struct FolderId(String);

  impl FolderId {
    /// Builds the folder id for the user `uid`.
    pub fn new(uid: i64) -> Self {
      let mut id = uid.to_string();
      id.push_str(":folder");
      Self(id)
    }
  }

  impl AsRef<str> for FolderId {
    /// Borrows the id as a string slice.
    fn as_ref(&self) -> &str {
      self.0.as_str()
    }
  }
  543. #[derive(Clone, Default)]
  544. pub struct Folder(Arc<Mutex<Option<InnerFolder>>>);
  545. impl Deref for Folder {
  546. type Target = Arc<Mutex<Option<InnerFolder>>>;
  547. fn deref(&self) -> &Self::Target {
  548. &self.0
  549. }
  550. }
  551. unsafe impl Sync for Folder {}
  552. unsafe impl Send for Folder {}