script.rs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. use anyhow::Error;
  2. use flowy_task::{Task, TaskContent, TaskDispatcher, TaskHandler, TaskId, TaskResult, TaskRunner, TaskState};
  3. use futures::stream::FuturesUnordered;
  4. use futures::StreamExt;
  5. use lib_infra::future::BoxResultFuture;
  6. use lib_infra::ref_map::RefCountValue;
  7. use rand::Rng;
  8. use std::sync::Arc;
  9. use std::time::Duration;
  10. use tokio::sync::oneshot::Receiver;
  11. use tokio::sync::RwLock;
  12. pub enum SearchScript {
  13. AddTask {
  14. task: Task,
  15. },
  16. AddTasks {
  17. tasks: Vec<Task>,
  18. },
  19. #[allow(dead_code)]
  20. Wait {
  21. millisecond: u64,
  22. },
  23. CancelTask {
  24. task_id: TaskId,
  25. },
  26. UnregisterHandler {
  27. handler_id: String,
  28. },
  29. AssertTaskStatus {
  30. task_id: TaskId,
  31. expected_status: TaskState,
  32. },
  33. AssertExecuteOrder {
  34. execute_order: Vec<u32>,
  35. rets: Vec<Receiver<TaskResult>>,
  36. },
  37. }
  38. pub struct SearchTest {
  39. scheduler: Arc<RwLock<TaskDispatcher>>,
  40. }
  41. impl SearchTest {
  42. pub async fn new() -> Self {
  43. let duration = Duration::from_millis(1000);
  44. let mut scheduler = TaskDispatcher::new(duration);
  45. scheduler.register_handler(Arc::new(MockTextTaskHandler()));
  46. scheduler.register_handler(Arc::new(MockBlobTaskHandler()));
  47. scheduler.register_handler(Arc::new(MockTimeoutTaskHandler()));
  48. let scheduler = Arc::new(RwLock::new(scheduler));
  49. tokio::spawn(TaskRunner::run(scheduler.clone()));
  50. Self { scheduler }
  51. }
  52. pub async fn next_task_id(&self) -> TaskId {
  53. self.scheduler.read().await.next_task_id()
  54. }
  55. pub async fn run_scripts(&self, scripts: Vec<SearchScript>) {
  56. for script in scripts {
  57. self.run_script(script).await;
  58. }
  59. }
  60. pub async fn run_script(&self, script: SearchScript) {
  61. match script {
  62. SearchScript::AddTask { task } => {
  63. self.scheduler.write().await.add_task(task);
  64. }
  65. SearchScript::CancelTask { task_id } => {
  66. self.scheduler.write().await.cancel_task(task_id);
  67. }
  68. SearchScript::AddTasks { tasks } => {
  69. let mut scheduler = self.scheduler.write().await;
  70. for task in tasks {
  71. scheduler.add_task(task);
  72. }
  73. }
  74. SearchScript::Wait { millisecond } => {
  75. tokio::time::sleep(Duration::from_millis(millisecond)).await;
  76. }
  77. SearchScript::UnregisterHandler { handler_id } => {
  78. self.scheduler.write().await.unregister_handler(handler_id);
  79. }
  80. SearchScript::AssertTaskStatus {
  81. task_id,
  82. expected_status,
  83. } => {
  84. let status = self.scheduler.read().await.read_task(&task_id).unwrap().state().clone();
  85. assert_eq!(status, expected_status);
  86. }
  87. SearchScript::AssertExecuteOrder { execute_order, rets } => {
  88. let mut futures = FuturesUnordered::new();
  89. for ret in rets {
  90. futures.push(ret);
  91. }
  92. let mut orders = vec![];
  93. while let Some(Ok(result)) = futures.next().await {
  94. orders.push(result.id);
  95. assert!(result.state.is_done());
  96. }
  97. assert_eq!(execute_order, orders);
  98. }
  99. }
  100. }
  101. }
  102. pub struct MockTextTaskHandler();
  103. impl RefCountValue for MockTextTaskHandler {
  104. fn did_remove(&self) {}
  105. }
  106. impl TaskHandler for MockTextTaskHandler {
  107. fn handler_id(&self) -> &str {
  108. "1"
  109. }
  110. fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
  111. let mut rng = rand::thread_rng();
  112. let millisecond = rng.gen_range(1..50);
  113. Box::pin(async move {
  114. match content {
  115. TaskContent::Text(_s) => {
  116. tokio::time::sleep(Duration::from_millis(millisecond)).await;
  117. }
  118. TaskContent::Blob(_) => panic!("Only support text"),
  119. }
  120. Ok(())
  121. })
  122. }
  123. }
  124. pub fn make_text_background_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
  125. let mut task = Task::background("1", task_id, TaskContent::Text(s.to_owned()));
  126. let recv = task.recv.take().unwrap();
  127. (task, recv)
  128. }
  129. pub fn make_text_user_interactive_task(task_id: TaskId, s: &str) -> (Task, Receiver<TaskResult>) {
  130. let mut task = Task::user_interactive("1", task_id, TaskContent::Text(s.to_owned()));
  131. let recv = task.recv.take().unwrap();
  132. (task, recv)
  133. }
  134. pub struct MockBlobTaskHandler();
  135. impl RefCountValue for MockBlobTaskHandler {
  136. fn did_remove(&self) {}
  137. }
  138. impl TaskHandler for MockBlobTaskHandler {
  139. fn handler_id(&self) -> &str {
  140. "2"
  141. }
  142. fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
  143. Box::pin(async move {
  144. match content {
  145. TaskContent::Text(_) => panic!("Only support blob"),
  146. TaskContent::Blob(bytes) => {
  147. let _msg = String::from_utf8(bytes).unwrap();
  148. tokio::time::sleep(Duration::from_millis(20)).await;
  149. }
  150. }
  151. Ok(())
  152. })
  153. }
  154. }
  /// Mock handler registered under id `"3"`; it accepts blob content only and
  /// sleeps for 2000 ms before finishing.
  pub struct MockTimeoutTaskHandler();
  156. impl TaskHandler for MockTimeoutTaskHandler {
  157. fn handler_id(&self) -> &str {
  158. "3"
  159. }
  160. fn run(&self, content: TaskContent) -> BoxResultFuture<(), Error> {
  161. Box::pin(async move {
  162. match content {
  163. TaskContent::Text(_) => panic!("Only support blob"),
  164. TaskContent::Blob(_bytes) => {
  165. tokio::time::sleep(Duration::from_millis(2000)).await;
  166. }
  167. }
  168. Ok(())
  169. })
  170. }
  171. }
  172. pub fn make_timeout_task(task_id: TaskId) -> (Task, Receiver<TaskResult>) {
  173. let mut task = Task::background("3", task_id, TaskContent::Blob(vec![]));
  174. let recv = task.recv.take().unwrap();
  175. (task, recv)
  176. }