123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- use anyhow::Error;
- use flowy_task::{Task, TaskContent, TaskDispatcher, TaskHandler, TaskId, TaskResult, TaskRunner, TaskState};
- use futures::stream::FuturesUnordered;
- use futures::StreamExt;
- use lib_infra::future::BoxResultFuture;
- use lib_infra::ref_map::RefCountValue;
- use rand::Rng;
- use std::sync::Arc;
- use std::time::Duration;
- use tokio::sync::oneshot::Receiver;
- use tokio::sync::RwLock;
- pub enum SearchScript {
- AddTask {
- task: Task,
- },
- AddTasks {
- tasks: Vec<Task>,
- },
- #[allow(dead_code)]
- Wait {
- millisecond: u64,
- },
- CancelTask {
- task_id: TaskId,
- },
- UnregisterHandler {
- handler_id: String,
- },
- AssertTaskStatus {
- task_id: TaskId,
- expected_status: TaskState,
- },
- AssertExecuteOrder {
- execute_order: Vec<u32>,
- rets: Vec<Receiver<TaskResult>>,
- },
- }
- pub struct SearchTest {
- scheduler: Arc<RwLock<TaskDispatcher>>,
- }
- impl SearchTest {
- pub async fn new() -> Self {
- let duration = Duration::from_millis(1000);
- let mut scheduler = TaskDispatcher::new(duration);
- scheduler.register_handler(Arc::new(MockTextTaskHandler()));
- scheduler.register_handler(Arc::new(MockBlobTaskHandler()));
- scheduler.register_handler(Arc::new(MockTimeoutTaskHandler()));
- let scheduler = Arc::new(RwLock::new(scheduler));
- tokio::spawn(TaskRunner::run(scheduler.clone()));
- Self { scheduler }
- }
- pub async fn next_task_id(&self) -> TaskId {
- self.scheduler.read().await.next_task_id()
- }
- pub async fn run_scripts(&self, scripts: Vec<SearchScript>) {
- for script in scripts {
- self.run_script(script).await;
- }
- }
- pub async fn run_script(&self, script: SearchScript) {
- match script {
- SearchScript::AddTask { task } => {
- self.scheduler.write().await.add_task(task);
- }
- SearchScript::CancelTask { task_id } => {
- self.scheduler.write().await.cancel_task(task_id);
- }
- SearchScript::AddTasks { tasks } => {
- let mut scheduler = self.scheduler.write().await;
- for task in tasks {
- scheduler.add_task(task);
- }
- }
- SearchScript::Wait { millisecond } => {
- tokio::time::sleep(Duration::from_millis(millisecond)).await;
- }
- SearchScript::UnregisterHandler { handler_id } => {
- self.scheduler.write().await.unregister_handler(handler_id);
- }
- SearchScript::AssertTaskStatus {
- task_id,
- expected_status,
- } => {
- let status = self.scheduler.read().await.read_task(&task_id).unwrap().state().clone();
- assert_eq!(status, expected_status);
- }
- SearchScript::AssertExecuteOrder { execute_order, rets } => {
- let mut futures = FuturesUnordered::new();
- for ret in rets {
- futures.push(ret);
- }
- let mut orders = vec![];
- while let Some(Ok(result)) = futures.next().await {
- orders.push(result.id);
- assert!(result.state.is_done());
- }
- assert_eq!(execute_order, orders);
- }
- }
- }
- }
- pub struct MockTextTaskHandler();
- impl RefCountValue for MockTextTaskHandler {
- fn did_remove(&self) {}
- }
- impl TaskHandler for MockTextTaskHandler {
- fn handler_id(&self) -> &str {
- "1"
- }
- fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
- let mut rng = rand::thread_rng();
- let millisecond = rng.gen_range(1..50);
- Box::pin(async move {
- match content {
- TaskContent::Text(_s) => {
- tokio::time::sleep(Duration::from_millis(millisecond)).await;
- }
- TaskContent::Blob(_) => panic!("Only support text"),
- }
- Ok(())
- })
- }
- }
- pub fn make_text_background_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
- let mut task = Task::background("1", task_id, TaskContent::Text(s.to_owned()));
- let recv = task.recv.take().unwrap();
- (task, recv)
- }
- pub fn make_text_user_interactive_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
- let mut task = Task::user_interactive("1", task_id, TaskContent::Text(s.to_owned()));
- let recv = task.recv.take().unwrap();
- (task, recv)
- }
- pub struct MockBlobTaskHandler();
- impl RefCountValue for MockBlobTaskHandler {
- fn did_remove(&self) {}
- }
- impl TaskHandler for MockBlobTaskHandler {
- fn handler_id(&self) -> &str {
- "2"
- }
- fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
- Box::pin(async move {
- match content {
- TaskContent::Text(_) => panic!("Only support blob"),
- TaskContent::Blob(bytes) => {
- let _msg = String::from_utf8(bytes).unwrap();
- tokio::time::sleep(Duration::from_millis(20)).await;
- }
- }
- Ok(())
- })
- }
- }
- pub struct MockTimeoutTaskHandler();
- impl TaskHandler for MockTimeoutTaskHandler {
- fn handler_id(&self) -> &str {
- "3"
- }
- fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
- Box::pin(async move {
- match content {
- TaskContent::Text(_) => panic!("Only support blob"),
- TaskContent::Blob(_bytes) => {
- tokio::time::sleep(Duration::from_millis(2000)).await;
- }
- }
- Ok(())
- })
- }
- }
- pub fn make_timeout_task(task_id: TaskId) -> (Task, Receiver<TaskResult>) {
- let mut task = Task::background("3", task_id, TaskContent::Blob(vec![]));
- let recv = task.recv.take().unwrap();
- (task, recv)
- }
|