queue.rs 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. use crate::{PendingTask, Task};
  2. use atomic_refcell::AtomicRefCell;
  3. use std::cmp::Ordering;
  4. use std::collections::hash_map::Entry;
  5. use std::collections::{BinaryHeap, HashMap};
  6. use std::ops::{Deref, DerefMut};
  7. use std::sync::Arc;
  8. #[derive(Default)]
  9. pub(crate) struct TaskQueue {
  10. // index_tasks for quick access
  11. index_tasks: HashMap<TaskHandlerId, Arc<AtomicRefCell<TaskList>>>,
  12. queue: BinaryHeap<Arc<AtomicRefCell<TaskList>>>,
  13. }
  14. impl TaskQueue {
  15. pub(crate) fn new() -> Self {
  16. Self::default()
  17. }
  18. pub(crate) fn push(&mut self, task: &Task) {
  19. if task.content.is_none() {
  20. tracing::warn!(
  21. "The task:{} with empty content will be not executed",
  22. task.id
  23. );
  24. return;
  25. }
  26. let pending_task = PendingTask {
  27. qos: task.qos,
  28. id: task.id,
  29. };
  30. match self.index_tasks.entry(task.handler_id.clone()) {
  31. Entry::Occupied(entry) => {
  32. let mut list = entry.get().borrow_mut();
  33. assert!(list
  34. .peek()
  35. .map(|old_id| pending_task.id >= old_id.id)
  36. .unwrap_or(true));
  37. list.push(pending_task);
  38. },
  39. Entry::Vacant(entry) => {
  40. let mut task_list = TaskList::new(entry.key());
  41. task_list.push(pending_task);
  42. let task_list = Arc::new(AtomicRefCell::new(task_list));
  43. entry.insert(task_list.clone());
  44. self.queue.push(task_list);
  45. },
  46. }
  47. }
  48. #[allow(dead_code)]
  49. pub(crate) fn clear(&mut self) {
  50. self.queue.clear();
  51. }
  52. pub(crate) fn mut_head<T, F>(&mut self, mut f: F) -> Option<T>
  53. where
  54. F: FnMut(&mut TaskList) -> Option<T>,
  55. {
  56. let head = self.queue.pop()?;
  57. let result = {
  58. let mut ref_head = head.borrow_mut();
  59. f(&mut ref_head)
  60. };
  61. if !head.borrow().tasks.is_empty() {
  62. self.queue.push(head);
  63. } else {
  64. self.index_tasks.remove(&head.borrow().id);
  65. }
  66. result
  67. }
  68. }
  /// Identifier of a task handler; used as the key of the per-handler index.
  pub type TaskHandlerId = String;
  70. #[derive(Debug)]
  71. pub(crate) struct TaskList {
  72. pub(crate) id: TaskHandlerId,
  73. tasks: BinaryHeap<PendingTask>,
  74. }
  75. impl Deref for TaskList {
  76. type Target = BinaryHeap<PendingTask>;
  77. fn deref(&self) -> &Self::Target {
  78. &self.tasks
  79. }
  80. }
  81. impl DerefMut for TaskList {
  82. fn deref_mut(&mut self) -> &mut Self::Target {
  83. &mut self.tasks
  84. }
  85. }
  86. impl TaskList {
  87. fn new(id: &str) -> Self {
  88. Self {
  89. id: id.to_owned(),
  90. tasks: BinaryHeap::new(),
  91. }
  92. }
  93. }
  94. impl PartialEq for TaskList {
  95. fn eq(&self, other: &Self) -> bool {
  96. self.id == other.id
  97. }
  98. }
  99. impl Eq for TaskList {}
  100. impl Ord for TaskList {
  101. fn cmp(&self, other: &Self) -> Ordering {
  102. match (self.peek(), other.peek()) {
  103. (None, None) => Ordering::Equal,
  104. (None, Some(_)) => Ordering::Less,
  105. (Some(_), None) => Ordering::Greater,
  106. (Some(lhs), Some(rhs)) => lhs.cmp(rhs),
  107. }
  108. }
  109. }
  110. impl PartialOrd for TaskList {
  111. fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
  112. Some(self.cmp(other))
  113. }
  114. }