12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839 |
- use std::collections::HashMap;
- use std::convert::TryFrom;
- use std::env::temp_dir;
- use std::path::PathBuf;
- use std::sync::Arc;
- use bytes::Bytes;
- use nanoid::nanoid;
- use parking_lot::RwLock;
- use protobuf::ProtobufError;
- use tokio::sync::broadcast::{channel, Sender};
- use flowy_core::{AppFlowyCore, AppFlowyCoreConfig};
- use flowy_database2::entities::*;
- use flowy_database2::event_map::DatabaseEvent;
- use flowy_document2::entities::{DocumentDataPB, OpenDocumentPayloadPB};
- use flowy_document2::event_map::DocumentEvent;
- use flowy_folder2::entities::icon::UpdateViewIconPayloadPB;
- use flowy_folder2::entities::*;
- use flowy_folder2::event_map::FolderEvent;
- use flowy_notification::entities::SubscribeObject;
- use flowy_notification::{register_notification_sender, NotificationSender};
- use flowy_server::supabase::define::{USER_EMAIL, USER_UUID};
- use flowy_user::entities::{AuthTypePB, ThirdPartyAuthPB, UserProfilePB};
- use flowy_user::errors::{FlowyError, FlowyResult};
- use flowy_user::event_map::UserEvent::*;
- use crate::document::document_event::OpenDocumentData;
- use crate::event_builder::EventBuilder;
- use crate::user_event::{async_sign_up, SignUpContext};
- pub mod document;
- pub mod event_builder;
- pub mod folder_event;
- pub mod user_event;
- #[derive(Clone)]
- pub struct FlowyCoreTest {
- auth_type: Arc<RwLock<AuthTypePB>>,
- inner: AppFlowyCore,
- #[allow(dead_code)]
- cleaner: Arc<Cleaner>,
- pub notification_sender: TestNotificationSender,
- }
- impl Default for FlowyCoreTest {
- fn default() -> Self {
- let temp_dir = PathBuf::from(temp_dir()).join(nanoid!(6));
- std::fs::create_dir_all(&temp_dir).unwrap();
- Self::new_with_user_data_path(temp_dir, nanoid!(6))
- }
- }
- impl FlowyCoreTest {
- pub fn new() -> Self {
- Self::default()
- }
- pub fn new_with_user_data_path(path: PathBuf, name: String) -> Self {
- let config = AppFlowyCoreConfig::new(path.clone().to_str().unwrap(), name).log_filter(
- "info",
- vec!["flowy_test".to_string(), "lib_dispatch".to_string()],
- );
- let inner = std::thread::spawn(|| AppFlowyCore::new(config))
- .join()
- .unwrap();
- let notification_sender = TestNotificationSender::new();
- let auth_type = Arc::new(RwLock::new(AuthTypePB::Local));
- register_notification_sender(notification_sender.clone());
- std::mem::forget(inner.dispatcher());
- Self {
- inner,
- auth_type,
- notification_sender,
- cleaner: Arc::new(Cleaner(path)),
- }
- }
- pub async fn new_with_guest_user() -> Self {
- let test = Self::default();
- test.sign_up_as_guest().await;
- test
- }
- pub async fn sign_up_as_guest(&self) -> SignUpContext {
- async_sign_up(self.inner.dispatcher(), AuthTypePB::Local).await
- }
- pub async fn supabase_party_sign_up(&self, uuid: &str) -> UserProfilePB {
- let mut map = HashMap::new();
- map.insert("uuid".to_string(), uuid.to_string());
- let payload = ThirdPartyAuthPB {
- map,
- auth_type: AuthTypePB::Supabase,
- };
- EventBuilder::new(self.clone())
- .event(ThirdPartyAuth)
- .payload(payload)
- .async_send()
- .await
- .parse::<UserProfilePB>()
- }
- pub async fn sign_out(&self) {
- EventBuilder::new(self.clone())
- .event(SignOut)
- .async_send()
- .await;
- }
- pub fn set_auth_type(&self, auth_type: AuthTypePB) {
- *self.auth_type.write() = auth_type;
- }
- pub async fn init_user(&self) -> UserProfilePB {
- self.sign_up_as_guest().await.user_profile
- }
- pub async fn third_party_sign_up_with_uuid(
- &self,
- uuid: &str,
- email: Option<String>,
- ) -> FlowyResult<UserProfilePB> {
- let mut map = HashMap::new();
- map.insert(USER_UUID.to_string(), uuid.to_string());
- map.insert(
- USER_EMAIL.to_string(),
- email.unwrap_or_else(|| format!("{}@appflowy.io", nanoid!(10))),
- );
- let payload = ThirdPartyAuthPB {
- map,
- auth_type: AuthTypePB::Supabase,
- };
- let user_profile = EventBuilder::new(self.clone())
- .event(ThirdPartyAuth)
- .payload(payload)
- .async_send()
- .await
- .try_parse::<UserProfilePB>()?;
- Ok(user_profile)
- }
- // Must sign up/ sign in first
- pub async fn get_current_workspace(&self) -> WorkspaceSettingPB {
- EventBuilder::new(self.clone())
- .event(FolderEvent::GetCurrentWorkspace)
- .async_send()
- .await
- .parse::<flowy_folder2::entities::WorkspaceSettingPB>()
- }
- pub async fn get_all_workspace_views(&self) -> Vec<ViewPB> {
- EventBuilder::new(self.clone())
- .event(FolderEvent::ReadWorkspaceViews)
- .async_send()
- .await
- .parse::<flowy_folder2::entities::RepeatedViewPB>()
- .items
- }
- pub async fn get_views(&self, parent_view_id: &str) -> ViewPB {
- EventBuilder::new(self.clone())
- .event(FolderEvent::ReadView)
- .payload(ViewIdPB {
- value: parent_view_id.to_string(),
- })
- .async_send()
- .await
- .parse::<flowy_folder2::entities::ViewPB>()
- }
- pub async fn delete_view(&self, view_id: &str) {
- let payload = RepeatedViewIdPB {
- items: vec![view_id.to_string()],
- };
- // delete the view. the view will be moved to trash
- EventBuilder::new(self.clone())
- .event(FolderEvent::DeleteView)
- .payload(payload)
- .async_send()
- .await;
- }
- pub async fn update_view(&self, changeset: UpdateViewPayloadPB) -> Option<FlowyError> {
- // delete the view. the view will be moved to trash
- EventBuilder::new(self.clone())
- .event(FolderEvent::UpdateView)
- .payload(changeset)
- .async_send()
- .await
- .error()
- }
- pub async fn update_view_icon(&self, payload: UpdateViewIconPayloadPB) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(FolderEvent::UpdateViewIcon)
- .payload(payload)
- .async_send()
- .await
- .error()
- }
- pub async fn create_view(&self, parent_id: &str, name: String) -> ViewPB {
- let payload = CreateViewPayloadPB {
- parent_view_id: parent_id.to_string(),
- name,
- desc: "".to_string(),
- thumbnail: None,
- layout: Default::default(),
- initial_data: vec![],
- meta: Default::default(),
- set_as_current: false,
- index: None,
- };
- EventBuilder::new(self.clone())
- .event(FolderEvent::CreateView)
- .payload(payload)
- .async_send()
- .await
- .parse::<flowy_folder2::entities::ViewPB>()
- }
- pub async fn create_document(
- &self,
- parent_id: &str,
- name: &str,
- initial_data: Vec<u8>,
- ) -> ViewPB {
- let payload = CreateViewPayloadPB {
- parent_view_id: parent_id.to_string(),
- name: name.to_string(),
- desc: "".to_string(),
- thumbnail: None,
- layout: ViewLayoutPB::Document,
- initial_data,
- meta: Default::default(),
- set_as_current: true,
- index: None,
- };
- let view = EventBuilder::new(self.clone())
- .event(FolderEvent::CreateView)
- .payload(payload)
- .async_send()
- .await
- .parse::<ViewPB>();
- let payload = OpenDocumentPayloadPB {
- document_id: view.id.clone(),
- };
- let _ = EventBuilder::new(self.clone())
- .event(DocumentEvent::OpenDocument)
- .payload(payload)
- .async_send()
- .await
- .parse::<DocumentDataPB>();
- view
- }
- pub async fn create_grid(&self, parent_id: &str, name: String, initial_data: Vec<u8>) -> ViewPB {
- let payload = CreateViewPayloadPB {
- parent_view_id: parent_id.to_string(),
- name,
- desc: "".to_string(),
- thumbnail: None,
- layout: ViewLayoutPB::Grid,
- initial_data,
- meta: Default::default(),
- set_as_current: true,
- index: None,
- };
- EventBuilder::new(self.clone())
- .event(FolderEvent::CreateView)
- .payload(payload)
- .async_send()
- .await
- .parse::<flowy_folder2::entities::ViewPB>()
- }
- pub async fn open_database(&self, view_id: &str) {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::GetDatabase)
- .payload(DatabaseViewIdPB {
- value: view_id.to_string(),
- })
- .async_send()
- .await;
- }
- pub async fn open_document(&self, doc_id: String) -> OpenDocumentData {
- let payload = OpenDocumentPayloadPB {
- document_id: doc_id.clone(),
- };
- let data = EventBuilder::new(self.clone())
- .event(DocumentEvent::OpenDocument)
- .payload(payload)
- .async_send()
- .await
- .parse::<DocumentDataPB>();
- OpenDocumentData { id: doc_id, data }
- }
- pub async fn create_board(&self, parent_id: &str, name: String, initial_data: Vec<u8>) -> ViewPB {
- let payload = CreateViewPayloadPB {
- parent_view_id: parent_id.to_string(),
- name,
- desc: "".to_string(),
- thumbnail: None,
- layout: ViewLayoutPB::Board,
- initial_data,
- meta: Default::default(),
- set_as_current: true,
- index: None,
- };
- EventBuilder::new(self.clone())
- .event(FolderEvent::CreateView)
- .payload(payload)
- .async_send()
- .await
- .parse::<flowy_folder2::entities::ViewPB>()
- }
- pub async fn create_calendar(
- &self,
- parent_id: &str,
- name: String,
- initial_data: Vec<u8>,
- ) -> ViewPB {
- let payload = CreateViewPayloadPB {
- parent_view_id: parent_id.to_string(),
- name,
- desc: "".to_string(),
- thumbnail: None,
- layout: ViewLayoutPB::Calendar,
- initial_data,
- meta: Default::default(),
- set_as_current: true,
- index: None,
- };
- EventBuilder::new(self.clone())
- .event(FolderEvent::CreateView)
- .payload(payload)
- .async_send()
- .await
- .parse::<flowy_folder2::entities::ViewPB>()
- }
- pub async fn get_database(&self, view_id: &str) -> DatabasePB {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::GetDatabase)
- .payload(DatabaseViewIdPB {
- value: view_id.to_string(),
- })
- .async_send()
- .await
- .parse::<flowy_database2::entities::DatabasePB>()
- }
- pub async fn get_all_database_fields(&self, view_id: &str) -> RepeatedFieldPB {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::GetFields)
- .payload(GetFieldPayloadPB {
- view_id: view_id.to_string(),
- field_ids: None,
- })
- .async_send()
- .await
- .parse::<RepeatedFieldPB>()
- }
- pub async fn create_field(&self, view_id: &str, field_type: FieldType) -> FieldPB {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::CreateTypeOption)
- .payload(CreateFieldPayloadPB {
- view_id: view_id.to_string(),
- field_type,
- type_option_data: None,
- })
- .async_send()
- .await
- .parse::<TypeOptionPB>()
- .field
- }
- pub async fn update_field(&self, changeset: FieldChangesetPB) {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::UpdateField)
- .payload(changeset)
- .async_send()
- .await;
- }
- pub async fn delete_field(&self, view_id: &str, field_id: &str) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::DeleteField)
- .payload(DeleteFieldPayloadPB {
- view_id: view_id.to_string(),
- field_id: field_id.to_string(),
- })
- .async_send()
- .await
- .error()
- }
- pub async fn update_field_type(
- &self,
- view_id: &str,
- field_id: &str,
- field_type: FieldType,
- ) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::UpdateFieldType)
- .payload(UpdateFieldTypePayloadPB {
- view_id: view_id.to_string(),
- field_id: field_id.to_string(),
- field_type,
- })
- .async_send()
- .await
- .error()
- }
- pub async fn duplicate_field(&self, view_id: &str, field_id: &str) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::DuplicateField)
- .payload(DuplicateFieldPayloadPB {
- view_id: view_id.to_string(),
- field_id: field_id.to_string(),
- })
- .async_send()
- .await
- .error()
- }
- pub async fn get_primary_field(&self, database_view_id: &str) -> FieldPB {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::GetPrimaryField)
- .payload(DatabaseViewIdPB {
- value: database_view_id.to_string(),
- })
- .async_send()
- .await
- .parse::<FieldPB>()
- }
- pub async fn create_row(
- &self,
- view_id: &str,
- start_row_id: Option<String>,
- data: Option<RowDataPB>,
- ) -> RowMetaPB {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::CreateRow)
- .payload(CreateRowPayloadPB {
- view_id: view_id.to_string(),
- start_row_id,
- group_id: None,
- data,
- })
- .async_send()
- .await
- .parse::<RowMetaPB>()
- }
- pub async fn delete_row(&self, view_id: &str, row_id: &str) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::DeleteRow)
- .payload(RowIdPB {
- view_id: view_id.to_string(),
- row_id: row_id.to_string(),
- group_id: None,
- })
- .async_send()
- .await
- .error()
- }
- pub async fn get_row(&self, view_id: &str, row_id: &str) -> OptionalRowPB {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::GetRow)
- .payload(RowIdPB {
- view_id: view_id.to_string(),
- row_id: row_id.to_string(),
- group_id: None,
- })
- .async_send()
- .await
- .parse::<OptionalRowPB>()
- }
- pub async fn get_row_meta(&self, view_id: &str, row_id: &str) -> RowMetaPB {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::GetRowMeta)
- .payload(RowIdPB {
- view_id: view_id.to_string(),
- row_id: row_id.to_string(),
- group_id: None,
- })
- .async_send()
- .await
- .parse::<RowMetaPB>()
- }
- pub async fn update_row_meta(&self, changeset: UpdateRowMetaChangesetPB) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::UpdateRowMeta)
- .payload(changeset)
- .async_send()
- .await
- .error()
- }
- pub async fn duplicate_row(&self, view_id: &str, row_id: &str) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::DuplicateRow)
- .payload(RowIdPB {
- view_id: view_id.to_string(),
- row_id: row_id.to_string(),
- group_id: None,
- })
- .async_send()
- .await
- .error()
- }
- pub async fn move_row(&self, view_id: &str, row_id: &str, to_row_id: &str) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::MoveRow)
- .payload(MoveRowPayloadPB {
- view_id: view_id.to_string(),
- from_row_id: row_id.to_string(),
- to_row_id: to_row_id.to_string(),
- })
- .async_send()
- .await
- .error()
- }
- pub async fn update_cell(&self, changeset: CellChangesetPB) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::UpdateCell)
- .payload(changeset)
- .async_send()
- .await
- .error()
- }
- pub async fn update_date_cell(&self, changeset: DateChangesetPB) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::UpdateDateCell)
- .payload(changeset)
- .async_send()
- .await
- .error()
- }
- pub async fn get_cell(&self, view_id: &str, row_id: &str, field_id: &str) -> CellPB {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::GetCell)
- .payload(CellIdPB {
- view_id: view_id.to_string(),
- row_id: row_id.to_string(),
- field_id: field_id.to_string(),
- })
- .async_send()
- .await
- .parse::<CellPB>()
- }
- pub async fn get_date_cell(&self, view_id: &str, row_id: &str, field_id: &str) -> DateCellDataPB {
- let cell = self.get_cell(view_id, row_id, field_id).await;
- DateCellDataPB::try_from(Bytes::from(cell.data)).unwrap()
- }
- pub async fn get_checklist_cell(
- &self,
- view_id: &str,
- field_id: &str,
- row_id: &str,
- ) -> ChecklistCellDataPB {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::GetChecklistCellData)
- .payload(CellIdPB {
- view_id: view_id.to_string(),
- row_id: row_id.to_string(),
- field_id: field_id.to_string(),
- })
- .async_send()
- .await
- .parse::<ChecklistCellDataPB>()
- }
- pub async fn update_checklist_cell(
- &self,
- changeset: ChecklistCellDataChangesetPB,
- ) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::UpdateChecklistCell)
- .payload(changeset)
- .async_send()
- .await
- .error()
- }
- pub async fn insert_option(
- &self,
- view_id: &str,
- field_id: &str,
- row_id: &str,
- name: &str,
- ) -> Option<FlowyError> {
- let option = EventBuilder::new(self.clone())
- .event(DatabaseEvent::CreateSelectOption)
- .payload(CreateSelectOptionPayloadPB {
- field_id: field_id.to_string(),
- view_id: view_id.to_string(),
- option_name: name.to_string(),
- })
- .async_send()
- .await
- .parse::<SelectOptionPB>();
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::InsertOrUpdateSelectOption)
- .payload(RepeatedSelectOptionPayload {
- view_id: view_id.to_string(),
- field_id: field_id.to_string(),
- row_id: row_id.to_string(),
- items: vec![option],
- })
- .async_send()
- .await
- .error()
- }
- pub async fn get_groups(&self, view_id: &str) -> Vec<GroupPB> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::GetGroups)
- .payload(DatabaseViewIdPB {
- value: view_id.to_string(),
- })
- .async_send()
- .await
- .parse::<RepeatedGroupPB>()
- .items
- }
- pub async fn move_group(&self, view_id: &str, from_id: &str, to_id: &str) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::MoveGroup)
- .payload(MoveGroupPayloadPB {
- view_id: view_id.to_string(),
- from_group_id: from_id.to_string(),
- to_group_id: to_id.to_string(),
- })
- .async_send()
- .await
- .error()
- }
- pub async fn set_group_by_field(&self, view_id: &str, field_id: &str) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::SetGroupByField)
- .payload(GroupByFieldPayloadPB {
- field_id: field_id.to_string(),
- view_id: view_id.to_string(),
- })
- .async_send()
- .await
- .error()
- }
- pub async fn update_group(
- &self,
- view_id: &str,
- group_id: &str,
- name: Option<String>,
- visible: Option<bool>,
- ) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::UpdateGroup)
- .payload(UpdateGroupPB {
- view_id: view_id.to_string(),
- group_id: group_id.to_string(),
- name,
- visible,
- })
- .async_send()
- .await
- .error()
- }
- pub async fn update_setting(&self, changeset: DatabaseSettingChangesetPB) -> Option<FlowyError> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::UpdateDatabaseSetting)
- .payload(changeset)
- .async_send()
- .await
- .error()
- }
- pub async fn get_all_calendar_events(&self, view_id: &str) -> Vec<CalendarEventPB> {
- EventBuilder::new(self.clone())
- .event(DatabaseEvent::GetAllCalendarEvents)
- .payload(CalendarEventRequestPB {
- view_id: view_id.to_string(),
- })
- .async_send()
- .await
- .parse::<RepeatedCalendarEventPB>()
- .items
- }
- pub async fn get_view(&self, view_id: &str) -> ViewPB {
- EventBuilder::new(self.clone())
- .event(FolderEvent::ReadView)
- .payload(ViewIdPB {
- value: view_id.to_string(),
- })
- .async_send()
- .await
- .parse::<flowy_folder2::entities::ViewPB>()
- }
- }
- impl std::ops::Deref for FlowyCoreTest {
- type Target = AppFlowyCore;
- fn deref(&self) -> &Self::Target {
- &self.inner
- }
- }
- #[derive(Clone)]
- pub struct TestNotificationSender {
- sender: Arc<Sender<SubscribeObject>>,
- }
- impl Default for TestNotificationSender {
- fn default() -> Self {
- let (sender, _) = channel(1000);
- Self {
- sender: Arc::new(sender),
- }
- }
- }
- impl TestNotificationSender {
- pub fn new() -> Self {
- Self::default()
- }
- pub fn subscribe<T>(&self, id: &str, ty: impl Into<i32> + Send) -> tokio::sync::mpsc::Receiver<T>
- where
- T: TryFrom<Bytes, Error = ProtobufError> + Send + 'static,
- {
- let id = id.to_string();
- let (tx, rx) = tokio::sync::mpsc::channel::<T>(10);
- let mut receiver = self.sender.subscribe();
- let ty = ty.into();
- tokio::spawn(async move {
- // DatabaseNotification::DidUpdateDatabaseSnapshotState
- while let Ok(value) = receiver.recv().await {
- if value.id == id && value.ty == ty {
- if let Some(payload) = value.payload {
- match T::try_from(Bytes::from(payload)) {
- Ok(object) => {
- let _ = tx.send(object).await;
- },
- Err(e) => {
- panic!(
- "Failed to parse notification payload to type: {:?} with error: {}",
- std::any::type_name::<T>(),
- e
- );
- },
- }
- }
- }
- }
- });
- rx
- }
- pub fn subscribe_with_condition<T, F>(&self, id: &str, when: F) -> tokio::sync::mpsc::Receiver<T>
- where
- T: TryFrom<Bytes, Error = ProtobufError> + Send + 'static,
- F: Fn(&T) -> bool + Send + 'static,
- {
- let id = id.to_string();
- let (tx, rx) = tokio::sync::mpsc::channel::<T>(10);
- let mut receiver = self.sender.subscribe();
- tokio::spawn(async move {
- while let Ok(value) = receiver.recv().await {
- if value.id == id {
- if let Some(payload) = value.payload {
- if let Ok(object) = T::try_from(Bytes::from(payload)) {
- if when(&object) {
- let _ = tx.send(object).await;
- }
- }
- }
- }
- }
- });
- rx
- }
- }
- impl NotificationSender for TestNotificationSender {
- fn send_subject(&self, subject: SubscribeObject) -> Result<(), String> {
- let _ = self.sender.send(subject);
- Ok(())
- }
- }
/// Guard that removes a directory tree when it is dropped.
pub struct Cleaner(PathBuf);

impl Cleaner {
  /// Creates a guard for `dir`; nothing is touched until the guard is dropped.
  pub fn new(dir: PathBuf) -> Self {
    Cleaner(dir)
  }

  /// Recursively removes `dir`.
  ///
  /// Best effort: failures (for example, the directory no longer existing)
  /// are ignored because this runs from `Drop`, where there is nobody to
  /// report them to.
  fn cleanup(dir: &std::path::Path) {
    let _ = std::fs::remove_dir_all(dir);
  }
}

impl Drop for Cleaner {
  /// Removes the guarded directory.
  fn drop(&mut self) {
    Self::cleanup(&self.0)
  }
}
|