| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 | use anyhow::Error;use flowy_task::{  Task, TaskContent, TaskDispatcher, TaskHandler, TaskId, TaskResult, TaskRunner, TaskState,};use futures::stream::FuturesUnordered;use futures::StreamExt;use lib_infra::async_trait::async_trait;use lib_infra::future::BoxResultFuture;use lib_infra::ref_map::RefCountValue;use rand::Rng;use std::sync::Arc;use std::time::Duration;use tokio::sync::oneshot::Receiver;use tokio::sync::RwLock;pub enum SearchScript {  AddTask {    task: Task,  },  AddTasks {    tasks: Vec<Task>,  },  #[allow(dead_code)]  Wait {    millisecond: u64,  },  CancelTask {    task_id: TaskId,  },  UnregisterHandler {    handler_id: String,  },  AssertTaskStatus {    task_id: TaskId,    expected_status: TaskState,  },  AssertExecuteOrder {    execute_order: Vec<u32>,    rets: Vec<Receiver<TaskResult>>,  },}pub struct SearchTest {  scheduler: Arc<RwLock<TaskDispatcher>>,}impl SearchTest {  pub async fn new() -> Self {    let duration = Duration::from_millis(1000);    let mut scheduler = TaskDispatcher::new(duration);    scheduler.register_handler(Arc::new(MockTextTaskHandler()));    scheduler.register_handler(Arc::new(MockBlobTaskHandler()));    scheduler.register_handler(Arc::new(MockTimeoutTaskHandler()));    let scheduler = Arc::new(RwLock::new(scheduler));    tokio::spawn(TaskRunner::run(scheduler.clone()));    Self { scheduler }  }  pub async fn next_task_id(&self) -> TaskId {    self.scheduler.read().await.next_task_id()  }  pub async fn run_scripts(&self, scripts: Vec<SearchScript>) {    for script in scripts {      self.run_script(script).await;    }  }  pub async fn run_script(&self, script: SearchScript) {    match script {      SearchScript::AddTask { task } => {        self.scheduler.write().await.add_task(task);      },      SearchScript::CancelTask { task_id } => {        self.scheduler.write().await.cancel_task(task_id);      },      SearchScript::AddTasks { tasks } => {        let mut scheduler = self.scheduler.write().await;        for task in tasks {          scheduler.add_task(task);        }      },      SearchScript::Wait { millisecond } => {        tokio::time::sleep(Duration::from_millis(millisecond)).await;      },      SearchScript::UnregisterHandler { handler_id } => {        self          .scheduler          .write()          .await          .unregister_handler(handler_id)          .await;      },      SearchScript::AssertTaskStatus {        task_id,        expected_status,      } => {        let status = self          .scheduler          .read()          .await          .read_task(&task_id)          .unwrap()          .state()          .clone();        assert_eq!(status, expected_status);      },      SearchScript::AssertExecuteOrder {        execute_order,        rets,      } => {        let mut futures = FuturesUnordered::new();        for ret in rets {          futures.push(ret);        }        let mut orders = vec![];        while let Some(Ok(result)) = futures.next().await {          orders.push(result.id);          assert!(result.state.is_done());        }        assert_eq!(execute_order, orders);      },    }  }}pub struct MockTextTaskHandler();#[async_trait]impl RefCountValue for MockTextTaskHandler {  async fn did_remove(&self) {}}impl TaskHandler for MockTextTaskHandler {  fn handler_id(&self) -> &str {    "1"  }  fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {    let mut rng = rand::thread_rng();    let millisecond = rng.gen_range(1..50);    Box::pin(async move {      match content {        TaskContent::Text(_s) => {          tokio::time::sleep(Duration::from_millis(millisecond)).await;        },        TaskContent::Blob(_) => panic!("Only support text"),      }      Ok(())    })  }}pub fn make_text_background_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {  let mut task = Task::background("1", task_id, TaskContent::Text(s.to_owned()));  let recv = task.recv.take().unwrap();  (task, recv)}pub fn make_text_user_interactive_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {  let mut task = Task::user_interactive("1", task_id, TaskContent::Text(s.to_owned()));  let recv = task.recv.take().unwrap();  (task, recv)}pub struct MockBlobTaskHandler();#[async_trait]impl RefCountValue for MockBlobTaskHandler {  async fn did_remove(&self) {}}impl TaskHandler for MockBlobTaskHandler {  fn handler_id(&self) -> &str {    "2"  }  fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {    Box::pin(async move {      match content {        TaskContent::Text(_) => panic!("Only support blob"),        TaskContent::Blob(bytes) => {          let _msg = String::from_utf8(bytes).unwrap();          tokio::time::sleep(Duration::from_millis(20)).await;        },      }      Ok(())    })  }}pub struct MockTimeoutTaskHandler();impl TaskHandler for MockTimeoutTaskHandler {  fn handler_id(&self) -> &str {    "3"  }  fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {    Box::pin(async move {      match content {        TaskContent::Text(_) => panic!("Only support blob"),        TaskContent::Blob(_bytes) => {          tokio::time::sleep(Duration::from_millis(2000)).await;        },      }      Ok(())    })  }}pub fn make_timeout_task(task_id: TaskId) -> (Task, Receiver<TaskResult>) {  let mut task = Task::background("3", task_id, TaskContent::Blob(vec![]));  let recv = task.recv.take().unwrap();  (task, recv)}
 |