script.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. use anyhow::Error;
  2. use flowy_task::{Task, TaskContent, TaskDispatcher, TaskHandler, TaskId, TaskResult, TaskRunner, TaskState};
  3. use futures::stream::FuturesUnordered;
  4. use futures::StreamExt;
  5. use lib_infra::async_trait::async_trait;
  6. use lib_infra::future::BoxResultFuture;
  7. use lib_infra::ref_map::RefCountValue;
  8. use rand::Rng;
  9. use std::sync::Arc;
  10. use std::time::Duration;
  11. use tokio::sync::oneshot::Receiver;
  12. use tokio::sync::RwLock;
  13. pub enum SearchScript {
  14. AddTask {
  15. task: Task,
  16. },
  17. AddTasks {
  18. tasks: Vec<Task>,
  19. },
  20. #[allow(dead_code)]
  21. Wait {
  22. millisecond: u64,
  23. },
  24. CancelTask {
  25. task_id: TaskId,
  26. },
  27. UnregisterHandler {
  28. handler_id: String,
  29. },
  30. AssertTaskStatus {
  31. task_id: TaskId,
  32. expected_status: TaskState,
  33. },
  34. AssertExecuteOrder {
  35. execute_order: Vec<u32>,
  36. rets: Vec<Receiver<TaskResult>>,
  37. },
  38. }
  39. pub struct SearchTest {
  40. scheduler: Arc<RwLock<TaskDispatcher>>,
  41. }
  42. impl SearchTest {
  43. pub async fn new() -> Self {
  44. let duration = Duration::from_millis(1000);
  45. let mut scheduler = TaskDispatcher::new(duration);
  46. scheduler.register_handler(Arc::new(MockTextTaskHandler()));
  47. scheduler.register_handler(Arc::new(MockBlobTaskHandler()));
  48. scheduler.register_handler(Arc::new(MockTimeoutTaskHandler()));
  49. let scheduler = Arc::new(RwLock::new(scheduler));
  50. tokio::spawn(TaskRunner::run(scheduler.clone()));
  51. Self { scheduler }
  52. }
  53. pub async fn next_task_id(&self) -> TaskId {
  54. self.scheduler.read().await.next_task_id()
  55. }
  56. pub async fn run_scripts(&self, scripts: Vec<SearchScript>) {
  57. for script in scripts {
  58. self.run_script(script).await;
  59. }
  60. }
  61. pub async fn run_script(&self, script: SearchScript) {
  62. match script {
  63. SearchScript::AddTask { task } => {
  64. self.scheduler.write().await.add_task(task);
  65. }
  66. SearchScript::CancelTask { task_id } => {
  67. self.scheduler.write().await.cancel_task(task_id);
  68. }
  69. SearchScript::AddTasks { tasks } => {
  70. let mut scheduler = self.scheduler.write().await;
  71. for task in tasks {
  72. scheduler.add_task(task);
  73. }
  74. }
  75. SearchScript::Wait { millisecond } => {
  76. tokio::time::sleep(Duration::from_millis(millisecond)).await;
  77. }
  78. SearchScript::UnregisterHandler { handler_id } => {
  79. self.scheduler.write().await.unregister_handler(handler_id).await;
  80. }
  81. SearchScript::AssertTaskStatus {
  82. task_id,
  83. expected_status,
  84. } => {
  85. let status = self.scheduler.read().await.read_task(&task_id).unwrap().state().clone();
  86. assert_eq!(status, expected_status);
  87. }
  88. SearchScript::AssertExecuteOrder { execute_order, rets } => {
  89. let mut futures = FuturesUnordered::new();
  90. for ret in rets {
  91. futures.push(ret);
  92. }
  93. let mut orders = vec![];
  94. while let Some(Ok(result)) = futures.next().await {
  95. orders.push(result.id);
  96. assert!(result.state.is_done());
  97. }
  98. assert_eq!(execute_order, orders);
  99. }
  100. }
  101. }
  102. }
  103. pub struct MockTextTaskHandler();
  104. #[async_trait]
  105. impl RefCountValue for MockTextTaskHandler {
  106. async fn did_remove(&self) {}
  107. }
  108. impl TaskHandler for MockTextTaskHandler {
  109. fn handler_id(&self) -> &str {
  110. "1"
  111. }
  112. fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
  113. let mut rng = rand::thread_rng();
  114. let millisecond = rng.gen_range(1..50);
  115. Box::pin(async move {
  116. match content {
  117. TaskContent::Text(_s) => {
  118. tokio::time::sleep(Duration::from_millis(millisecond)).await;
  119. }
  120. TaskContent::Blob(_) => panic!("Only support text"),
  121. }
  122. Ok(())
  123. })
  124. }
  125. }
  126. pub fn make_text_background_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
  127. let mut task = Task::background("1", task_id, TaskContent::Text(s.to_owned()));
  128. let recv = task.recv.take().unwrap();
  129. (task, recv)
  130. }
  131. pub fn make_text_user_interactive_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
  132. let mut task = Task::user_interactive("1", task_id, TaskContent::Text(s.to_owned()));
  133. let recv = task.recv.take().unwrap();
  134. (task, recv)
  135. }
  136. pub struct MockBlobTaskHandler();
  137. #[async_trait]
  138. impl RefCountValue for MockBlobTaskHandler {
  139. async fn did_remove(&self) {}
  140. }
  141. impl TaskHandler for MockBlobTaskHandler {
  142. fn handler_id(&self) -> &str {
  143. "2"
  144. }
  145. fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
  146. Box::pin(async move {
  147. match content {
  148. TaskContent::Text(_) => panic!("Only support blob"),
  149. TaskContent::Blob(bytes) => {
  150. let _msg = String::from_utf8(bytes).unwrap();
  151. tokio::time::sleep(Duration::from_millis(20)).await;
  152. }
  153. }
  154. Ok(())
  155. })
  156. }
  157. }
  158. pub struct MockTimeoutTaskHandler();
  159. impl TaskHandler for MockTimeoutTaskHandler {
  160. fn handler_id(&self) -> &str {
  161. "3"
  162. }
  163. fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
  164. Box::pin(async move {
  165. match content {
  166. TaskContent::Text(_) => panic!("Only support blob"),
  167. TaskContent::Blob(_bytes) => {
  168. tokio::time::sleep(Duration::from_millis(2000)).await;
  169. }
  170. }
  171. Ok(())
  172. })
  173. }
  174. }
  175. pub fn make_timeout_task(task_id: TaskId) -> (Task, Receiver<TaskResult>) {
  176. let mut task = Task::background("3", task_id, TaskContent::Blob(vec![]));
  177. let recv = task.recv.take().unwrap();
  178. (task, recv)
  179. }