lib.rs 21 KB


  1. use std::collections::HashMap;
  2. use std::convert::TryFrom;
  3. use std::env::temp_dir;
  4. use std::path::PathBuf;
  5. use std::sync::Arc;
  6. use bytes::Bytes;
  7. use nanoid::nanoid;
  8. use parking_lot::RwLock;
  9. use protobuf::ProtobufError;
  10. use tokio::sync::broadcast::{channel, Sender};
  11. use flowy_core::{AppFlowyCore, AppFlowyCoreConfig};
  12. use flowy_database2::entities::*;
  13. use flowy_database2::event_map::DatabaseEvent;
  14. use flowy_document2::entities::{DocumentDataPB, OpenDocumentPayloadPB};
  15. use flowy_document2::event_map::DocumentEvent;
  16. use flowy_folder2::entities::*;
  17. use flowy_folder2::event_map::FolderEvent;
  18. use flowy_notification::entities::SubscribeObject;
  19. use flowy_notification::{register_notification_sender, NotificationSender};
  20. use flowy_server::supabase::define::{USER_EMAIL, USER_UUID};
  21. use flowy_user::entities::{AuthTypePB, ThirdPartyAuthPB, UserProfilePB};
  22. use flowy_user::errors::{FlowyError, FlowyResult};
  23. use flowy_user::event_map::UserEvent::*;
  24. use crate::event_builder::EventBuilder;
  25. use crate::user_event::{async_sign_up, SignUpContext};
  26. pub mod document;
  27. pub mod event_builder;
  28. pub mod folder_event;
  29. pub mod user_event;
  30. #[derive(Clone)]
  31. pub struct FlowyCoreTest {
  32. auth_type: Arc<RwLock<AuthTypePB>>,
  33. inner: AppFlowyCore,
  34. cleaner: Arc<RwLock<Option<Cleaner>>>,
  35. pub notification_sender: TestNotificationSender,
  36. }
  37. impl Default for FlowyCoreTest {
  38. fn default() -> Self {
  39. let temp_dir = temp_dir();
  40. let config = AppFlowyCoreConfig::new(temp_dir.to_str().unwrap(), nanoid!(6)).log_filter(
  41. "info",
  42. vec!["flowy_test".to_string(), "lib_dispatch".to_string()],
  43. );
  44. let inner = std::thread::spawn(|| AppFlowyCore::new(config))
  45. .join()
  46. .unwrap();
  47. let auth_type = Arc::new(RwLock::new(AuthTypePB::Local));
  48. let notification_sender = TestNotificationSender::new();
  49. register_notification_sender(notification_sender.clone());
  50. std::mem::forget(inner.dispatcher());
  51. Self {
  52. inner,
  53. auth_type,
  54. notification_sender,
  55. cleaner: Arc::new(RwLock::new(None)),
  56. }
  57. }
  58. }
  59. impl FlowyCoreTest {
  60. pub fn new() -> Self {
  61. Self::default()
  62. }
  63. pub async fn new_with_guest_user() -> Self {
  64. let test = Self::default();
  65. test.sign_up_as_guest().await;
  66. test
  67. }
  68. pub async fn sign_up_as_guest(&self) -> SignUpContext {
  69. async_sign_up(self.inner.dispatcher(), AuthTypePB::Local).await
  70. }
  71. pub async fn supabase_party_sign_up(&self, uuid: &str) -> UserProfilePB {
  72. let mut map = HashMap::new();
  73. map.insert("uuid".to_string(), uuid.to_string());
  74. let payload = ThirdPartyAuthPB {
  75. map,
  76. auth_type: AuthTypePB::Supabase,
  77. };
  78. EventBuilder::new(self.clone())
  79. .event(ThirdPartyAuth)
  80. .payload(payload)
  81. .async_send()
  82. .await
  83. .parse::<UserProfilePB>()
  84. }
  85. pub async fn sign_out(&self) {
  86. EventBuilder::new(self.clone())
  87. .event(SignOut)
  88. .async_send()
  89. .await;
  90. }
  91. pub fn set_auth_type(&self, auth_type: AuthTypePB) {
  92. *self.auth_type.write() = auth_type;
  93. }
  94. pub async fn init_user(&self) -> UserProfilePB {
  95. self.sign_up_as_guest().await.user_profile
  96. }
  97. pub async fn third_party_sign_up_with_uuid(
  98. &self,
  99. uuid: &str,
  100. email: Option<String>,
  101. ) -> FlowyResult<UserProfilePB> {
  102. let mut map = HashMap::new();
  103. map.insert(USER_UUID.to_string(), uuid.to_string());
  104. map.insert(
  105. USER_EMAIL.to_string(),
  106. email.unwrap_or_else(|| format!("{}@appflowy.io", nanoid!(10))),
  107. );
  108. let payload = ThirdPartyAuthPB {
  109. map,
  110. auth_type: AuthTypePB::Supabase,
  111. };
  112. let user_profile = EventBuilder::new(self.clone())
  113. .event(ThirdPartyAuth)
  114. .payload(payload)
  115. .async_send()
  116. .await
  117. .try_parse::<UserProfilePB>()?;
  118. let user_path = PathBuf::from(&self.config.storage_path).join(user_profile.id.to_string());
  119. *self.cleaner.write() = Some(Cleaner::new(user_path));
  120. Ok(user_profile)
  121. }
  122. // Must sign up/ sign in first
  123. pub async fn get_current_workspace(&self) -> WorkspaceSettingPB {
  124. EventBuilder::new(self.clone())
  125. .event(FolderEvent::GetCurrentWorkspace)
  126. .async_send()
  127. .await
  128. .parse::<flowy_folder2::entities::WorkspaceSettingPB>()
  129. }
  130. pub async fn get_all_workspace_views(&self) -> Vec<ViewPB> {
  131. EventBuilder::new(self.clone())
  132. .event(FolderEvent::ReadWorkspaceViews)
  133. .async_send()
  134. .await
  135. .parse::<flowy_folder2::entities::RepeatedViewPB>()
  136. .items
  137. }
  138. pub async fn delete_view(&self, view_id: &str) {
  139. let payload = RepeatedViewIdPB {
  140. items: vec![view_id.to_string()],
  141. };
  142. // delete the view. the view will be moved to trash
  143. EventBuilder::new(self.clone())
  144. .event(FolderEvent::DeleteView)
  145. .payload(payload)
  146. .async_send()
  147. .await;
  148. }
  149. pub async fn update_view(&self, changeset: UpdateViewPayloadPB) -> Option<FlowyError> {
  150. // delete the view. the view will be moved to trash
  151. EventBuilder::new(self.clone())
  152. .event(FolderEvent::UpdateView)
  153. .payload(changeset)
  154. .async_send()
  155. .await
  156. .error()
  157. }
  158. pub async fn create_view(&self, parent_id: &str, name: String) -> ViewPB {
  159. let payload = CreateViewPayloadPB {
  160. parent_view_id: parent_id.to_string(),
  161. name,
  162. desc: "".to_string(),
  163. thumbnail: None,
  164. layout: Default::default(),
  165. initial_data: vec![],
  166. meta: Default::default(),
  167. set_as_current: false,
  168. index: None,
  169. };
  170. EventBuilder::new(self.clone())
  171. .event(FolderEvent::CreateView)
  172. .payload(payload)
  173. .async_send()
  174. .await
  175. .parse::<flowy_folder2::entities::ViewPB>()
  176. }
  177. pub async fn create_document(
  178. &self,
  179. parent_id: &str,
  180. name: &str,
  181. initial_data: Vec<u8>,
  182. ) -> ViewPB {
  183. let payload = CreateViewPayloadPB {
  184. parent_view_id: parent_id.to_string(),
  185. name: name.to_string(),
  186. desc: "".to_string(),
  187. thumbnail: None,
  188. layout: ViewLayoutPB::Document,
  189. initial_data,
  190. meta: Default::default(),
  191. set_as_current: true,
  192. index: None,
  193. };
  194. let view = EventBuilder::new(self.clone())
  195. .event(FolderEvent::CreateView)
  196. .payload(payload)
  197. .async_send()
  198. .await
  199. .parse::<ViewPB>();
  200. let payload = OpenDocumentPayloadPB {
  201. document_id: view.id.clone(),
  202. };
  203. let _ = EventBuilder::new(self.clone())
  204. .event(DocumentEvent::OpenDocument)
  205. .payload(payload)
  206. .async_send()
  207. .await
  208. .parse::<DocumentDataPB>();
  209. view
  210. }
  211. pub async fn create_grid(&self, parent_id: &str, name: String, initial_data: Vec<u8>) -> ViewPB {
  212. let payload = CreateViewPayloadPB {
  213. parent_view_id: parent_id.to_string(),
  214. name,
  215. desc: "".to_string(),
  216. thumbnail: None,
  217. layout: ViewLayoutPB::Grid,
  218. initial_data,
  219. meta: Default::default(),
  220. set_as_current: true,
  221. index: None,
  222. };
  223. EventBuilder::new(self.clone())
  224. .event(FolderEvent::CreateView)
  225. .payload(payload)
  226. .async_send()
  227. .await
  228. .parse::<flowy_folder2::entities::ViewPB>()
  229. }
  230. pub async fn open_database(&self, view_id: &str) {
  231. EventBuilder::new(self.clone())
  232. .event(DatabaseEvent::GetDatabase)
  233. .payload(DatabaseViewIdPB {
  234. value: view_id.to_string(),
  235. })
  236. .async_send()
  237. .await;
  238. }
  239. pub async fn create_board(&self, parent_id: &str, name: String, initial_data: Vec<u8>) -> ViewPB {
  240. let payload = CreateViewPayloadPB {
  241. parent_view_id: parent_id.to_string(),
  242. name,
  243. desc: "".to_string(),
  244. thumbnail: None,
  245. layout: ViewLayoutPB::Board,
  246. initial_data,
  247. meta: Default::default(),
  248. set_as_current: true,
  249. index: None,
  250. };
  251. EventBuilder::new(self.clone())
  252. .event(FolderEvent::CreateView)
  253. .payload(payload)
  254. .async_send()
  255. .await
  256. .parse::<flowy_folder2::entities::ViewPB>()
  257. }
  258. pub async fn create_calendar(
  259. &self,
  260. parent_id: &str,
  261. name: String,
  262. initial_data: Vec<u8>,
  263. ) -> ViewPB {
  264. let payload = CreateViewPayloadPB {
  265. parent_view_id: parent_id.to_string(),
  266. name,
  267. desc: "".to_string(),
  268. thumbnail: None,
  269. layout: ViewLayoutPB::Calendar,
  270. initial_data,
  271. meta: Default::default(),
  272. set_as_current: true,
  273. index: None,
  274. };
  275. EventBuilder::new(self.clone())
  276. .event(FolderEvent::CreateView)
  277. .payload(payload)
  278. .async_send()
  279. .await
  280. .parse::<flowy_folder2::entities::ViewPB>()
  281. }
  282. pub async fn get_database(&self, view_id: &str) -> DatabasePB {
  283. EventBuilder::new(self.clone())
  284. .event(DatabaseEvent::GetDatabase)
  285. .payload(DatabaseViewIdPB {
  286. value: view_id.to_string(),
  287. })
  288. .async_send()
  289. .await
  290. .parse::<flowy_database2::entities::DatabasePB>()
  291. }
  292. pub async fn get_all_database_fields(&self, view_id: &str) -> RepeatedFieldPB {
  293. EventBuilder::new(self.clone())
  294. .event(DatabaseEvent::GetFields)
  295. .payload(GetFieldPayloadPB {
  296. view_id: view_id.to_string(),
  297. field_ids: None,
  298. })
  299. .async_send()
  300. .await
  301. .parse::<RepeatedFieldPB>()
  302. }
  303. pub async fn create_field(&self, view_id: &str, field_type: FieldType) -> FieldPB {
  304. EventBuilder::new(self.clone())
  305. .event(DatabaseEvent::CreateTypeOption)
  306. .payload(CreateFieldPayloadPB {
  307. view_id: view_id.to_string(),
  308. field_type,
  309. type_option_data: None,
  310. })
  311. .async_send()
  312. .await
  313. .parse::<TypeOptionPB>()
  314. .field
  315. }
  316. pub async fn update_field(&self, changeset: FieldChangesetPB) {
  317. EventBuilder::new(self.clone())
  318. .event(DatabaseEvent::UpdateField)
  319. .payload(changeset)
  320. .async_send()
  321. .await;
  322. }
  323. pub async fn delete_field(&self, view_id: &str, field_id: &str) -> Option<FlowyError> {
  324. EventBuilder::new(self.clone())
  325. .event(DatabaseEvent::DeleteField)
  326. .payload(DeleteFieldPayloadPB {
  327. view_id: view_id.to_string(),
  328. field_id: field_id.to_string(),
  329. })
  330. .async_send()
  331. .await
  332. .error()
  333. }
  334. pub async fn update_field_type(
  335. &self,
  336. view_id: &str,
  337. field_id: &str,
  338. field_type: FieldType,
  339. ) -> Option<FlowyError> {
  340. EventBuilder::new(self.clone())
  341. .event(DatabaseEvent::UpdateFieldType)
  342. .payload(UpdateFieldTypePayloadPB {
  343. view_id: view_id.to_string(),
  344. field_id: field_id.to_string(),
  345. field_type,
  346. })
  347. .async_send()
  348. .await
  349. .error()
  350. }
  351. pub async fn duplicate_field(&self, view_id: &str, field_id: &str) -> Option<FlowyError> {
  352. EventBuilder::new(self.clone())
  353. .event(DatabaseEvent::DuplicateField)
  354. .payload(DuplicateFieldPayloadPB {
  355. view_id: view_id.to_string(),
  356. field_id: field_id.to_string(),
  357. })
  358. .async_send()
  359. .await
  360. .error()
  361. }
  362. pub async fn get_primary_field(&self, database_view_id: &str) -> FieldPB {
  363. EventBuilder::new(self.clone())
  364. .event(DatabaseEvent::GetPrimaryField)
  365. .payload(DatabaseViewIdPB {
  366. value: database_view_id.to_string(),
  367. })
  368. .async_send()
  369. .await
  370. .parse::<FieldPB>()
  371. }
  372. pub async fn create_row(
  373. &self,
  374. view_id: &str,
  375. start_row_id: Option<String>,
  376. data: Option<RowDataPB>,
  377. ) -> RowMetaPB {
  378. EventBuilder::new(self.clone())
  379. .event(DatabaseEvent::CreateRow)
  380. .payload(CreateRowPayloadPB {
  381. view_id: view_id.to_string(),
  382. start_row_id,
  383. group_id: None,
  384. data,
  385. })
  386. .async_send()
  387. .await
  388. .parse::<RowMetaPB>()
  389. }
  390. pub async fn delete_row(&self, view_id: &str, row_id: &str) -> Option<FlowyError> {
  391. EventBuilder::new(self.clone())
  392. .event(DatabaseEvent::DeleteRow)
  393. .payload(RowIdPB {
  394. view_id: view_id.to_string(),
  395. row_id: row_id.to_string(),
  396. group_id: None,
  397. })
  398. .async_send()
  399. .await
  400. .error()
  401. }
  402. pub async fn get_row(&self, view_id: &str, row_id: &str) -> OptionalRowPB {
  403. EventBuilder::new(self.clone())
  404. .event(DatabaseEvent::GetRow)
  405. .payload(RowIdPB {
  406. view_id: view_id.to_string(),
  407. row_id: row_id.to_string(),
  408. group_id: None,
  409. })
  410. .async_send()
  411. .await
  412. .parse::<OptionalRowPB>()
  413. }
  414. pub async fn get_row_meta(&self, view_id: &str, row_id: &str) -> RowMetaPB {
  415. EventBuilder::new(self.clone())
  416. .event(DatabaseEvent::GetRowMeta)
  417. .payload(RowIdPB {
  418. view_id: view_id.to_string(),
  419. row_id: row_id.to_string(),
  420. group_id: None,
  421. })
  422. .async_send()
  423. .await
  424. .parse::<RowMetaPB>()
  425. }
  426. pub async fn update_row_meta(&self, changeset: UpdateRowMetaChangesetPB) -> Option<FlowyError> {
  427. EventBuilder::new(self.clone())
  428. .event(DatabaseEvent::UpdateRowMeta)
  429. .payload(changeset)
  430. .async_send()
  431. .await
  432. .error()
  433. }
  434. pub async fn duplicate_row(&self, view_id: &str, row_id: &str) -> Option<FlowyError> {
  435. EventBuilder::new(self.clone())
  436. .event(DatabaseEvent::DuplicateRow)
  437. .payload(RowIdPB {
  438. view_id: view_id.to_string(),
  439. row_id: row_id.to_string(),
  440. group_id: None,
  441. })
  442. .async_send()
  443. .await
  444. .error()
  445. }
  446. pub async fn move_row(&self, view_id: &str, row_id: &str, to_row_id: &str) -> Option<FlowyError> {
  447. EventBuilder::new(self.clone())
  448. .event(DatabaseEvent::MoveRow)
  449. .payload(MoveRowPayloadPB {
  450. view_id: view_id.to_string(),
  451. from_row_id: row_id.to_string(),
  452. to_row_id: to_row_id.to_string(),
  453. })
  454. .async_send()
  455. .await
  456. .error()
  457. }
  458. pub async fn update_cell(&self, changeset: CellChangesetPB) -> Option<FlowyError> {
  459. EventBuilder::new(self.clone())
  460. .event(DatabaseEvent::UpdateCell)
  461. .payload(changeset)
  462. .async_send()
  463. .await
  464. .error()
  465. }
  466. pub async fn update_date_cell(&self, changeset: DateChangesetPB) -> Option<FlowyError> {
  467. EventBuilder::new(self.clone())
  468. .event(DatabaseEvent::UpdateDateCell)
  469. .payload(changeset)
  470. .async_send()
  471. .await
  472. .error()
  473. }
  474. pub async fn get_cell(&self, view_id: &str, row_id: &str, field_id: &str) -> CellPB {
  475. EventBuilder::new(self.clone())
  476. .event(DatabaseEvent::GetCell)
  477. .payload(CellIdPB {
  478. view_id: view_id.to_string(),
  479. row_id: row_id.to_string(),
  480. field_id: field_id.to_string(),
  481. })
  482. .async_send()
  483. .await
  484. .parse::<CellPB>()
  485. }
  486. pub async fn get_date_cell(&self, view_id: &str, row_id: &str, field_id: &str) -> DateCellDataPB {
  487. let cell = self.get_cell(view_id, row_id, field_id).await;
  488. DateCellDataPB::try_from(Bytes::from(cell.data)).unwrap()
  489. }
  490. pub async fn get_checklist_cell(
  491. &self,
  492. view_id: &str,
  493. field_id: &str,
  494. row_id: &str,
  495. ) -> ChecklistCellDataPB {
  496. EventBuilder::new(self.clone())
  497. .event(DatabaseEvent::GetChecklistCellData)
  498. .payload(CellIdPB {
  499. view_id: view_id.to_string(),
  500. row_id: row_id.to_string(),
  501. field_id: field_id.to_string(),
  502. })
  503. .async_send()
  504. .await
  505. .parse::<ChecklistCellDataPB>()
  506. }
  507. pub async fn update_checklist_cell(
  508. &self,
  509. changeset: ChecklistCellDataChangesetPB,
  510. ) -> Option<FlowyError> {
  511. EventBuilder::new(self.clone())
  512. .event(DatabaseEvent::UpdateChecklistCell)
  513. .payload(changeset)
  514. .async_send()
  515. .await
  516. .error()
  517. }
  518. pub async fn insert_option(
  519. &self,
  520. view_id: &str,
  521. field_id: &str,
  522. row_id: &str,
  523. name: &str,
  524. ) -> Option<FlowyError> {
  525. let option = EventBuilder::new(self.clone())
  526. .event(DatabaseEvent::CreateSelectOption)
  527. .payload(CreateSelectOptionPayloadPB {
  528. field_id: field_id.to_string(),
  529. view_id: view_id.to_string(),
  530. option_name: name.to_string(),
  531. })
  532. .async_send()
  533. .await
  534. .parse::<SelectOptionPB>();
  535. EventBuilder::new(self.clone())
  536. .event(DatabaseEvent::InsertOrUpdateSelectOption)
  537. .payload(RepeatedSelectOptionPayload {
  538. view_id: view_id.to_string(),
  539. field_id: field_id.to_string(),
  540. row_id: row_id.to_string(),
  541. items: vec![option],
  542. })
  543. .async_send()
  544. .await
  545. .error()
  546. }
  547. pub async fn get_groups(&self, view_id: &str) -> Vec<GroupPB> {
  548. EventBuilder::new(self.clone())
  549. .event(DatabaseEvent::GetGroups)
  550. .payload(DatabaseViewIdPB {
  551. value: view_id.to_string(),
  552. })
  553. .async_send()
  554. .await
  555. .parse::<RepeatedGroupPB>()
  556. .items
  557. }
  558. pub async fn move_group(&self, view_id: &str, from_id: &str, to_id: &str) -> Option<FlowyError> {
  559. EventBuilder::new(self.clone())
  560. .event(DatabaseEvent::MoveGroup)
  561. .payload(MoveGroupPayloadPB {
  562. view_id: view_id.to_string(),
  563. from_group_id: from_id.to_string(),
  564. to_group_id: to_id.to_string(),
  565. })
  566. .async_send()
  567. .await
  568. .error()
  569. }
  570. pub async fn set_group_by_field(&self, view_id: &str, field_id: &str) -> Option<FlowyError> {
  571. EventBuilder::new(self.clone())
  572. .event(DatabaseEvent::SetGroupByField)
  573. .payload(GroupByFieldPayloadPB {
  574. field_id: field_id.to_string(),
  575. view_id: view_id.to_string(),
  576. })
  577. .async_send()
  578. .await
  579. .error()
  580. }
  581. pub async fn update_group(
  582. &self,
  583. view_id: &str,
  584. group_id: &str,
  585. name: Option<String>,
  586. visible: Option<bool>,
  587. ) -> Option<FlowyError> {
  588. EventBuilder::new(self.clone())
  589. .event(DatabaseEvent::UpdateGroup)
  590. .payload(UpdateGroupPB {
  591. view_id: view_id.to_string(),
  592. group_id: group_id.to_string(),
  593. name,
  594. visible,
  595. })
  596. .async_send()
  597. .await
  598. .error()
  599. }
  600. pub async fn update_setting(&self, changeset: DatabaseSettingChangesetPB) -> Option<FlowyError> {
  601. EventBuilder::new(self.clone())
  602. .event(DatabaseEvent::UpdateDatabaseSetting)
  603. .payload(changeset)
  604. .async_send()
  605. .await
  606. .error()
  607. }
  608. pub async fn get_all_calendar_events(&self, view_id: &str) -> Vec<CalendarEventPB> {
  609. EventBuilder::new(self.clone())
  610. .event(DatabaseEvent::GetAllCalendarEvents)
  611. .payload(CalendarEventRequestPB {
  612. view_id: view_id.to_string(),
  613. })
  614. .async_send()
  615. .await
  616. .parse::<RepeatedCalendarEventPB>()
  617. .items
  618. }
  619. pub async fn get_view(&self, view_id: &str) -> ViewPB {
  620. EventBuilder::new(self.clone())
  621. .event(FolderEvent::ReadView)
  622. .payload(ViewIdPB {
  623. value: view_id.to_string(),
  624. })
  625. .async_send()
  626. .await
  627. .parse::<flowy_folder2::entities::ViewPB>()
  628. }
  629. }
  630. impl std::ops::Deref for FlowyCoreTest {
  631. type Target = AppFlowyCore;
  632. fn deref(&self) -> &Self::Target {
  633. &self.inner
  634. }
  635. }
  636. #[derive(Clone)]
  637. pub struct TestNotificationSender {
  638. sender: Arc<Sender<SubscribeObject>>,
  639. }
  640. impl Default for TestNotificationSender {
  641. fn default() -> Self {
  642. let (sender, _) = channel(1000);
  643. Self {
  644. sender: Arc::new(sender),
  645. }
  646. }
  647. }
  648. impl TestNotificationSender {
  649. pub fn new() -> Self {
  650. Self::default()
  651. }
  652. pub fn subscribe<T>(&self, id: &str, ty: impl Into<i32> + Send) -> tokio::sync::mpsc::Receiver<T>
  653. where
  654. T: TryFrom<Bytes, Error = ProtobufError> + Send + 'static,
  655. {
  656. let id = id.to_string();
  657. let (tx, rx) = tokio::sync::mpsc::channel::<T>(10);
  658. let mut receiver = self.sender.subscribe();
  659. let ty = ty.into();
  660. tokio::spawn(async move {
  661. // DatabaseNotification::DidUpdateDatabaseSnapshotState
  662. while let Ok(value) = receiver.recv().await {
  663. if value.id == id && value.ty == ty {
  664. if let Some(payload) = value.payload {
  665. if let Ok(object) = T::try_from(Bytes::from(payload)) {
  666. let _ = tx.send(object).await;
  667. }
  668. }
  669. }
  670. }
  671. });
  672. rx
  673. }
  674. pub fn subscribe_with_condition<T, F>(&self, id: &str, when: F) -> tokio::sync::mpsc::Receiver<T>
  675. where
  676. T: TryFrom<Bytes, Error = ProtobufError> + Send + 'static,
  677. F: Fn(&T) -> bool + Send + 'static,
  678. {
  679. let id = id.to_string();
  680. let (tx, rx) = tokio::sync::mpsc::channel::<T>(10);
  681. let mut receiver = self.sender.subscribe();
  682. tokio::spawn(async move {
  683. while let Ok(value) = receiver.recv().await {
  684. if value.id == id {
  685. if let Some(payload) = value.payload {
  686. if let Ok(object) = T::try_from(Bytes::from(payload)) {
  687. if when(&object) {
  688. let _ = tx.send(object).await;
  689. }
  690. }
  691. }
  692. }
  693. }
  694. });
  695. rx
  696. }
  697. }
  698. impl NotificationSender for TestNotificationSender {
  699. fn send_subject(&self, subject: SubscribeObject) -> Result<(), String> {
  700. let _ = self.sender.send(subject);
  701. Ok(())
  702. }
  703. }
  /// Drop guard that removes a directory tree when it goes out of scope.
  struct Cleaner(PathBuf);

  impl Cleaner {
    /// Wraps `dir`; nothing is touched on disk until the guard is dropped.
    fn new(dir: PathBuf) -> Self {
      Cleaner(dir)
    }

    /// Recursively removes `dir` on a best-effort basis.
    ///
    /// Takes `&Path` rather than `&PathBuf` so any path-like borrow can be passed. Errors
    /// (for example, the directory not existing) are intentionally ignored.
    fn cleanup(dir: &std::path::Path) {
      let _ = std::fs::remove_dir_all(dir);
    }
  }

  impl Drop for Cleaner {
    /// Removes the wrapped directory.
    fn drop(&mut self) {
      Self::cleanup(&self.0)
    }
  }