queue.rs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. use crate::{PendingTask, Task};
  2. use atomic_refcell::AtomicRefCell;
  3. use std::cmp::Ordering;
  4. use std::collections::hash_map::Entry;
  5. use std::collections::{BinaryHeap, HashMap};
  6. use std::ops::{Deref, DerefMut};
  7. use std::sync::Arc;
  /// Queue of pending tasks, grouped into one `TaskList` per handler id.
  ///
  /// Each task list is shared (via `Arc`) between a lookup map and a heap.
  #[derive(Default)]
  pub(crate) struct TaskQueue {
      // Handler id -> task list, for quick access to a handler's list when pushing.
      index_tasks: HashMap<TaskHandlerId, Arc<AtomicRefCell<TaskList>>>,
      // Heap of the task lists; its greatest element is the one handed out next.
      queue: BinaryHeap<Arc<AtomicRefCell<TaskList>>>,
  }
  14. impl TaskQueue {
  15. pub(crate) fn new() -> Self {
  16. Self::default()
  17. }
  18. pub(crate) fn push(&mut self, task: &Task) {
  19. if task.content.is_none() {
  20. tracing::warn!("The task:{} with empty content will be not executed", task.id);
  21. return;
  22. }
  23. let pending_task = PendingTask {
  24. qos: task.qos,
  25. id: task.id,
  26. };
  27. match self.index_tasks.entry(task.handler_id.clone()) {
  28. Entry::Occupied(entry) => {
  29. let mut list = entry.get().borrow_mut();
  30. assert!(list.peek().map(|old_id| pending_task.id >= old_id.id).unwrap_or(true));
  31. list.push(pending_task);
  32. }
  33. Entry::Vacant(entry) => {
  34. let mut task_list = TaskList::new(entry.key());
  35. task_list.push(pending_task);
  36. let task_list = Arc::new(AtomicRefCell::new(task_list));
  37. entry.insert(task_list.clone());
  38. self.queue.push(task_list);
  39. }
  40. }
  41. }
  42. #[allow(dead_code)]
  43. pub(crate) fn clear(&mut self) {
  44. self.queue.clear();
  45. }
  46. pub(crate) fn mut_head<T, F>(&mut self, mut f: F) -> Option<T>
  47. where
  48. F: FnMut(&mut TaskList) -> Option<T>,
  49. {
  50. let head = self.queue.pop()?;
  51. let result = {
  52. let mut ref_head = head.borrow_mut();
  53. f(&mut ref_head)
  54. };
  55. if !head.borrow().tasks.is_empty() {
  56. self.queue.push(head);
  57. } else {
  58. self.index_tasks.remove(&head.borrow().id);
  59. }
  60. result
  61. }
  62. }
  /// Identifier of a task handler; used as the key that groups pending tasks.
  pub type TaskHandlerId = String;
  /// Pending tasks of a single handler, kept in a binary heap.
  #[derive(Debug)]
  pub(crate) struct TaskList {
      /// Id of the handler this list was created for.
      pub(crate) id: TaskHandlerId,
      // Pending tasks; the heap's greatest element acts as the head of the list.
      tasks: BinaryHeap<PendingTask>,
  }
  69. impl Deref for TaskList {
  70. type Target = BinaryHeap<PendingTask>;
  71. fn deref(&self) -> &Self::Target {
  72. &self.tasks
  73. }
  74. }
  75. impl DerefMut for TaskList {
  76. fn deref_mut(&mut self) -> &mut Self::Target {
  77. &mut self.tasks
  78. }
  79. }
  80. impl TaskList {
  81. fn new(id: &str) -> Self {
  82. Self {
  83. id: id.to_owned(),
  84. tasks: BinaryHeap::new(),
  85. }
  86. }
  87. }
  88. impl PartialEq for TaskList {
  89. fn eq(&self, other: &Self) -> bool {
  90. self.id == other.id
  91. }
  92. }
  // Marker impl: `Eq` is a prerequisite of the `Ord` impl on `TaskList`.
  impl Eq for TaskList {}
  94. impl Ord for TaskList {
  95. fn cmp(&self, other: &Self) -> Ordering {
  96. match (self.peek(), other.peek()) {
  97. (None, None) => Ordering::Equal,
  98. (None, Some(_)) => Ordering::Less,
  99. (Some(_), None) => Ordering::Greater,
  100. (Some(lhs), Some(rhs)) => lhs.cmp(rhs),
  101. }
  102. }
  103. }
  104. impl PartialOrd for TaskList {
  105. fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
  106. Some(self.cmp(other))
  107. }
  108. }