script.rs 5.5 KB


  1. use anyhow::Error;
  2. use flowy_task::{
  3. Task, TaskContent, TaskDispatcher, TaskHandler, TaskId, TaskResult, TaskRunner, TaskState,
  4. };
  5. use futures::stream::FuturesUnordered;
  6. use futures::StreamExt;
  7. use lib_infra::async_trait::async_trait;
  8. use lib_infra::future::BoxResultFuture;
  9. use lib_infra::ref_map::RefCountValue;
  10. use rand::Rng;
  11. use std::sync::Arc;
  12. use std::time::Duration;
  13. use tokio::sync::oneshot::Receiver;
  14. use tokio::sync::RwLock;
  15. pub enum SearchScript {
  16. AddTask {
  17. task: Task,
  18. },
  19. AddTasks {
  20. tasks: Vec<Task>,
  21. },
  22. #[allow(dead_code)]
  23. Wait {
  24. millisecond: u64,
  25. },
  26. CancelTask {
  27. task_id: TaskId,
  28. },
  29. UnregisterHandler {
  30. handler_id: String,
  31. },
  32. AssertTaskStatus {
  33. task_id: TaskId,
  34. expected_status: TaskState,
  35. },
  36. AssertExecuteOrder {
  37. execute_order: Vec<u32>,
  38. rets: Vec<Receiver<TaskResult>>,
  39. },
  40. }
  41. pub struct SearchTest {
  42. scheduler: Arc<RwLock<TaskDispatcher>>,
  43. }
  44. impl SearchTest {
  45. pub async fn new() -> Self {
  46. let duration = Duration::from_millis(1000);
  47. let mut scheduler = TaskDispatcher::new(duration);
  48. scheduler.register_handler(Arc::new(MockTextTaskHandler()));
  49. scheduler.register_handler(Arc::new(MockBlobTaskHandler()));
  50. scheduler.register_handler(Arc::new(MockTimeoutTaskHandler()));
  51. let scheduler = Arc::new(RwLock::new(scheduler));
  52. tokio::spawn(TaskRunner::run(scheduler.clone()));
  53. Self { scheduler }
  54. }
  55. pub async fn next_task_id(&self) -> TaskId {
  56. self.scheduler.read().await.next_task_id()
  57. }
  58. pub async fn run_scripts(&self, scripts: Vec<SearchScript>) {
  59. for script in scripts {
  60. self.run_script(script).await;
  61. }
  62. }
  63. pub async fn run_script(&self, script: SearchScript) {
  64. match script {
  65. SearchScript::AddTask { task } => {
  66. self.scheduler.write().await.add_task(task);
  67. },
  68. SearchScript::CancelTask { task_id } => {
  69. self.scheduler.write().await.cancel_task(task_id);
  70. },
  71. SearchScript::AddTasks { tasks } => {
  72. let mut scheduler = self.scheduler.write().await;
  73. for task in tasks {
  74. scheduler.add_task(task);
  75. }
  76. },
  77. SearchScript::Wait { millisecond } => {
  78. tokio::time::sleep(Duration::from_millis(millisecond)).await;
  79. },
  80. SearchScript::UnregisterHandler { handler_id } => {
  81. self
  82. .scheduler
  83. .write()
  84. .await
  85. .unregister_handler(handler_id)
  86. .await;
  87. },
  88. SearchScript::AssertTaskStatus {
  89. task_id,
  90. expected_status,
  91. } => {
  92. let status = self
  93. .scheduler
  94. .read()
  95. .await
  96. .read_task(&task_id)
  97. .unwrap()
  98. .state()
  99. .clone();
  100. assert_eq!(status, expected_status);
  101. },
  102. SearchScript::AssertExecuteOrder {
  103. execute_order,
  104. rets,
  105. } => {
  106. let mut futures = FuturesUnordered::new();
  107. for ret in rets {
  108. futures.push(ret);
  109. }
  110. let mut orders = vec![];
  111. while let Some(Ok(result)) = futures.next().await {
  112. orders.push(result.id);
  113. assert!(result.state.is_done());
  114. }
  115. assert_eq!(execute_order, orders);
  116. },
  117. }
  118. }
  119. }
  120. pub struct MockTextTaskHandler();
  121. #[async_trait]
  122. impl RefCountValue for MockTextTaskHandler {
  123. async fn did_remove(&self) {}
  124. }
  125. impl TaskHandler for MockTextTaskHandler {
  126. fn handler_id(&self) -> &str {
  127. "1"
  128. }
  129. fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
  130. let mut rng = rand::thread_rng();
  131. let millisecond = rng.gen_range(1..50);
  132. Box::pin(async move {
  133. match content {
  134. TaskContent::Text(_s) => {
  135. tokio::time::sleep(Duration::from_millis(millisecond)).await;
  136. },
  137. TaskContent::Blob(_) => panic!("Only support text"),
  138. }
  139. Ok(())
  140. })
  141. }
  142. }
  143. pub fn make_text_background_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
  144. let mut task = Task::background("1", task_id, TaskContent::Text(s.to_owned()));
  145. let recv = task.recv.take().unwrap();
  146. (task, recv)
  147. }
  148. pub fn make_text_user_interactive_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
  149. let mut task = Task::user_interactive("1", task_id, TaskContent::Text(s.to_owned()));
  150. let recv = task.recv.take().unwrap();
  151. (task, recv)
  152. }
  153. pub struct MockBlobTaskHandler();
  154. #[async_trait]
  155. impl RefCountValue for MockBlobTaskHandler {
  156. async fn did_remove(&self) {}
  157. }
  158. impl TaskHandler for MockBlobTaskHandler {
  159. fn handler_id(&self) -> &str {
  160. "2"
  161. }
  162. fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
  163. Box::pin(async move {
  164. match content {
  165. TaskContent::Text(_) => panic!("Only support blob"),
  166. TaskContent::Blob(bytes) => {
  167. let _msg = String::from_utf8(bytes).unwrap();
  168. tokio::time::sleep(Duration::from_millis(20)).await;
  169. },
  170. }
  171. Ok(())
  172. })
  173. }
  174. }
  175. pub struct MockTimeoutTaskHandler();
  176. impl TaskHandler for MockTimeoutTaskHandler {
  177. fn handler_id(&self) -> &str {
  178. "3"
  179. }
  180. fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
  181. Box::pin(async move {
  182. match content {
  183. TaskContent::Text(_) => panic!("Only support blob"),
  184. TaskContent::Blob(_bytes) => {
  185. tokio::time::sleep(Duration::from_millis(2000)).await;
  186. },
  187. }
  188. Ok(())
  189. })
  190. }
  191. }
  192. pub fn make_timeout_task(task_id: TaskId) -> (Task, Receiver<TaskResult>) {
  193. let mut task = Task::background("3", task_id, TaskContent::Blob(vec![]));
  194. let recv = task.recv.take().unwrap();
  195. (task, recv)
  196. }