| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 | use anyhow::Error;use flowy_task::{Task, TaskContent, TaskDispatcher, TaskHandler, TaskId, TaskResult, TaskRunner, TaskState};use futures::stream::FuturesUnordered;use futures::StreamExt;use lib_infra::async_trait::async_trait;use lib_infra::future::BoxResultFuture;use lib_infra::ref_map::RefCountValue;use rand::Rng;use std::sync::Arc;use std::time::Duration;use tokio::sync::oneshot::Receiver;use tokio::sync::RwLock;pub enum SearchScript {    AddTask {        task: Task,    },    AddTasks {        tasks: Vec<Task>,    },    #[allow(dead_code)]    Wait {        millisecond: u64,    },    CancelTask {        task_id: TaskId,    },    UnregisterHandler {        handler_id: String,    },    AssertTaskStatus {        task_id: TaskId,        expected_status: TaskState,    },    AssertExecuteOrder {        execute_order: Vec<u32>,        rets: Vec<Receiver<TaskResult>>,    },}pub struct SearchTest {    scheduler: Arc<RwLock<TaskDispatcher>>,}impl SearchTest {    pub async fn new() -> Self {        let duration = Duration::from_millis(1000);        let mut scheduler = TaskDispatcher::new(duration);        scheduler.register_handler(Arc::new(MockTextTaskHandler()));        scheduler.register_handler(Arc::new(MockBlobTaskHandler()));        scheduler.register_handler(Arc::new(MockTimeoutTaskHandler()));        let scheduler = Arc::new(RwLock::new(scheduler));        tokio::spawn(TaskRunner::run(scheduler.clone()));        Self { scheduler }    }    pub async fn next_task_id(&self) -> TaskId {        self.scheduler.read().await.next_task_id()    }    pub async fn run_scripts(&self, scripts: Vec<SearchScript>) {        for script in scripts {            self.run_script(script).await;        }    }    pub async fn run_script(&self, script: SearchScript) {        match script {            SearchScript::AddTask { task } => {                self.scheduler.write().await.add_task(task);            }            SearchScript::CancelTask { task_id } => {                self.scheduler.write().await.cancel_task(task_id);            }            SearchScript::AddTasks { tasks } => {                let mut scheduler = self.scheduler.write().await;                for task in tasks {                    scheduler.add_task(task);                }            }            SearchScript::Wait { millisecond } => {                tokio::time::sleep(Duration::from_millis(millisecond)).await;            }            SearchScript::UnregisterHandler { handler_id } => {                self.scheduler.write().await.unregister_handler(handler_id).await;            }            SearchScript::AssertTaskStatus {                task_id,                expected_status,            } => {                let status = self.scheduler.read().await.read_task(&task_id).unwrap().state().clone();                assert_eq!(status, expected_status);            }            SearchScript::AssertExecuteOrder { execute_order, rets } => {                let mut futures = FuturesUnordered::new();                for ret in rets {                    futures.push(ret);                }                let mut orders = vec![];                while let Some(Ok(result)) = futures.next().await {                    orders.push(result.id);                    assert!(result.state.is_done());                }                assert_eq!(execute_order, orders);            }        }    }}pub struct MockTextTaskHandler();#[async_trait]impl RefCountValue for MockTextTaskHandler {    async fn did_remove(&self) {}}impl TaskHandler for MockTextTaskHandler {    fn handler_id(&self) -> &str {        "1"    }    fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {        let mut rng = rand::thread_rng();        let millisecond = rng.gen_range(1..50);        Box::pin(async move {            match content {                TaskContent::Text(_s) => {                    tokio::time::sleep(Duration::from_millis(millisecond)).await;                }                TaskContent::Blob(_) => panic!("Only support text"),            }            Ok(())        })    }}pub fn make_text_background_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {    let mut task = Task::background("1", task_id, TaskContent::Text(s.to_owned()));    let recv = task.recv.take().unwrap();    (task, recv)}pub fn make_text_user_interactive_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {    let mut task = Task::user_interactive("1", task_id, TaskContent::Text(s.to_owned()));    let recv = task.recv.take().unwrap();    (task, recv)}pub struct MockBlobTaskHandler();#[async_trait]impl RefCountValue for MockBlobTaskHandler {    async fn did_remove(&self) {}}impl TaskHandler for MockBlobTaskHandler {    fn handler_id(&self) -> &str {        "2"    }    fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {        Box::pin(async move {            match content {                TaskContent::Text(_) => panic!("Only support blob"),                TaskContent::Blob(bytes) => {                    let _msg = String::from_utf8(bytes).unwrap();                    tokio::time::sleep(Duration::from_millis(20)).await;                }            }            Ok(())        })    }}pub struct MockTimeoutTaskHandler();impl TaskHandler for MockTimeoutTaskHandler {    fn handler_id(&self) -> &str {        "3"    }    fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {        Box::pin(async move {            match content {                TaskContent::Text(_) => panic!("Only support blob"),                TaskContent::Blob(_bytes) => {                    tokio::time::sleep(Duration::from_millis(2000)).await;                }            }            Ok(())        })    }}pub fn make_timeout_task(task_id: TaskId) -> (Task, Receiver<TaskResult>) {    let mut task = Task::background("3", task_id, TaskContent::Blob(vec![]));    let recv = task.recv.take().unwrap();    (task, recv)}
 |