| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 | 
							- use anyhow::Error;
 
- use flowy_task::{
 
-   Task, TaskContent, TaskDispatcher, TaskHandler, TaskId, TaskResult, TaskRunner, TaskState,
 
- };
 
- use futures::stream::FuturesUnordered;
 
- use futures::StreamExt;
 
- use lib_infra::async_trait::async_trait;
 
- use lib_infra::future::BoxResultFuture;
 
- use lib_infra::ref_map::RefCountValue;
 
- use rand::Rng;
 
- use std::sync::Arc;
 
- use std::time::Duration;
 
- use tokio::sync::oneshot::Receiver;
 
- use tokio::sync::RwLock;
 
- pub enum SearchScript {
 
-   AddTask {
 
-     task: Task,
 
-   },
 
-   AddTasks {
 
-     tasks: Vec<Task>,
 
-   },
 
-   #[allow(dead_code)]
 
-   Wait {
 
-     millisecond: u64,
 
-   },
 
-   CancelTask {
 
-     task_id: TaskId,
 
-   },
 
-   UnregisterHandler {
 
-     handler_id: String,
 
-   },
 
-   AssertTaskStatus {
 
-     task_id: TaskId,
 
-     expected_status: TaskState,
 
-   },
 
-   AssertExecuteOrder {
 
-     execute_order: Vec<u32>,
 
-     rets: Vec<Receiver<TaskResult>>,
 
-   },
 
- }
 
- pub struct SearchTest {
 
-   scheduler: Arc<RwLock<TaskDispatcher>>,
 
- }
 
- impl SearchTest {
 
-   pub async fn new() -> Self {
 
-     let duration = Duration::from_millis(1000);
 
-     let mut scheduler = TaskDispatcher::new(duration);
 
-     scheduler.register_handler(Arc::new(MockTextTaskHandler()));
 
-     scheduler.register_handler(Arc::new(MockBlobTaskHandler()));
 
-     scheduler.register_handler(Arc::new(MockTimeoutTaskHandler()));
 
-     let scheduler = Arc::new(RwLock::new(scheduler));
 
-     tokio::spawn(TaskRunner::run(scheduler.clone()));
 
-     Self { scheduler }
 
-   }
 
-   pub async fn next_task_id(&self) -> TaskId {
 
-     self.scheduler.read().await.next_task_id()
 
-   }
 
-   pub async fn run_scripts(&self, scripts: Vec<SearchScript>) {
 
-     for script in scripts {
 
-       self.run_script(script).await;
 
-     }
 
-   }
 
-   pub async fn run_script(&self, script: SearchScript) {
 
-     match script {
 
-       SearchScript::AddTask { task } => {
 
-         self.scheduler.write().await.add_task(task);
 
-       },
 
-       SearchScript::CancelTask { task_id } => {
 
-         self.scheduler.write().await.cancel_task(task_id);
 
-       },
 
-       SearchScript::AddTasks { tasks } => {
 
-         let mut scheduler = self.scheduler.write().await;
 
-         for task in tasks {
 
-           scheduler.add_task(task);
 
-         }
 
-       },
 
-       SearchScript::Wait { millisecond } => {
 
-         tokio::time::sleep(Duration::from_millis(millisecond)).await;
 
-       },
 
-       SearchScript::UnregisterHandler { handler_id } => {
 
-         self
 
-           .scheduler
 
-           .write()
 
-           .await
 
-           .unregister_handler(handler_id)
 
-           .await;
 
-       },
 
-       SearchScript::AssertTaskStatus {
 
-         task_id,
 
-         expected_status,
 
-       } => {
 
-         let status = self
 
-           .scheduler
 
-           .read()
 
-           .await
 
-           .read_task(&task_id)
 
-           .unwrap()
 
-           .state()
 
-           .clone();
 
-         assert_eq!(status, expected_status);
 
-       },
 
-       SearchScript::AssertExecuteOrder {
 
-         execute_order,
 
-         rets,
 
-       } => {
 
-         let mut futures = FuturesUnordered::new();
 
-         for ret in rets {
 
-           futures.push(ret);
 
-         }
 
-         let mut orders = vec![];
 
-         while let Some(Ok(result)) = futures.next().await {
 
-           orders.push(result.id);
 
-           assert!(result.state.is_done());
 
-         }
 
-         assert_eq!(execute_order, orders);
 
-       },
 
-     }
 
-   }
 
- }
 
- pub struct MockTextTaskHandler();
 
- #[async_trait]
 
- impl RefCountValue for MockTextTaskHandler {
 
-   async fn did_remove(&self) {}
 
- }
 
- impl TaskHandler for MockTextTaskHandler {
 
-   fn handler_id(&self) -> &str {
 
-     "1"
 
-   }
 
-   fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
 
-     let mut rng = rand::thread_rng();
 
-     let millisecond = rng.gen_range(1..50);
 
-     Box::pin(async move {
 
-       match content {
 
-         TaskContent::Text(_s) => {
 
-           tokio::time::sleep(Duration::from_millis(millisecond)).await;
 
-         },
 
-         TaskContent::Blob(_) => panic!("Only support text"),
 
-       }
 
-       Ok(())
 
-     })
 
-   }
 
- }
 
- pub fn make_text_background_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
 
-   let mut task = Task::background("1", task_id, TaskContent::Text(s.to_owned()));
 
-   let recv = task.recv.take().unwrap();
 
-   (task, recv)
 
- }
 
- pub fn make_text_user_interactive_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
 
-   let mut task = Task::user_interactive("1", task_id, TaskContent::Text(s.to_owned()));
 
-   let recv = task.recv.take().unwrap();
 
-   (task, recv)
 
- }
 
- pub struct MockBlobTaskHandler();
 
- #[async_trait]
 
- impl RefCountValue for MockBlobTaskHandler {
 
-   async fn did_remove(&self) {}
 
- }
 
- impl TaskHandler for MockBlobTaskHandler {
 
-   fn handler_id(&self) -> &str {
 
-     "2"
 
-   }
 
-   fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
 
-     Box::pin(async move {
 
-       match content {
 
-         TaskContent::Text(_) => panic!("Only support blob"),
 
-         TaskContent::Blob(bytes) => {
 
-           let _msg = String::from_utf8(bytes).unwrap();
 
-           tokio::time::sleep(Duration::from_millis(20)).await;
 
-         },
 
-       }
 
-       Ok(())
 
-     })
 
-   }
 
- }
 
- pub struct MockTimeoutTaskHandler();
 
- impl TaskHandler for MockTimeoutTaskHandler {
 
-   fn handler_id(&self) -> &str {
 
-     "3"
 
-   }
 
-   fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
 
-     Box::pin(async move {
 
-       match content {
 
-         TaskContent::Text(_) => panic!("Only support blob"),
 
-         TaskContent::Blob(_bytes) => {
 
-           tokio::time::sleep(Duration::from_millis(2000)).await;
 
-         },
 
-       }
 
-       Ok(())
 
-     })
 
-   }
 
- }
 
- pub fn make_timeout_task(task_id: TaskId) -> (Task, Receiver<TaskResult>) {
 
-   let mut task = Task::background("3", task_id, TaskContent::Blob(vec![]));
 
-   let recv = task.recv.take().unwrap();
 
-   (task, recv)
 
- }
 
 
  |