queue.rs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. use crate::services::tasks::task::{PendingTask, Task, TaskContent, TaskType};
  2. use atomic_refcell::AtomicRefCell;
  3. use std::cmp::Ordering;
  4. use std::collections::hash_map::Entry;
  5. use std::collections::{BinaryHeap, HashMap};
  6. use std::ops::{Deref, DerefMut};
  7. use std::sync::Arc;
  8. #[derive(Default)]
  9. pub(crate) struct GridTaskQueue {
  10. // index_tasks for quick access
  11. index_tasks: HashMap<TaskHandlerId, Arc<AtomicRefCell<TaskList>>>,
  12. queue: BinaryHeap<Arc<AtomicRefCell<TaskList>>>,
  13. }
  14. impl GridTaskQueue {
  15. pub(crate) fn new() -> Self {
  16. Self::default()
  17. }
  18. pub(crate) fn push(&mut self, task: &Task) {
  19. if task.content.is_none() {
  20. tracing::warn!("Ignore task: {} with empty content", task.id);
  21. return;
  22. }
  23. let task_type = match task.content.as_ref().unwrap() {
  24. TaskContent::Snapshot => TaskType::Snapshot,
  25. TaskContent::Filter { .. } => TaskType::Filter,
  26. };
  27. let pending_task = PendingTask {
  28. ty: task_type,
  29. id: task.id,
  30. };
  31. match self.index_tasks.entry(task.handler_id.clone()) {
  32. Entry::Occupied(entry) => {
  33. let mut list = entry.get().borrow_mut();
  34. assert!(list.peek().map(|old_id| pending_task.id >= old_id.id).unwrap_or(true));
  35. list.push(pending_task);
  36. }
  37. Entry::Vacant(entry) => {
  38. let mut task_list = TaskList::new(entry.key());
  39. task_list.push(pending_task);
  40. let task_list = Arc::new(AtomicRefCell::new(task_list));
  41. entry.insert(task_list.clone());
  42. self.queue.push(task_list);
  43. }
  44. }
  45. }
  46. #[allow(dead_code)]
  47. pub(crate) fn clear(&mut self) {
  48. self.queue.clear();
  49. }
  50. pub(crate) fn mut_head<T, F>(&mut self, mut f: F) -> Option<T>
  51. where
  52. F: FnMut(&mut TaskList) -> Option<T>,
  53. {
  54. let head = self.queue.pop()?;
  55. let result = {
  56. let mut ref_head = head.borrow_mut();
  57. f(&mut *ref_head)
  58. };
  59. if !head.borrow().tasks.is_empty() {
  60. self.queue.push(head);
  61. } else {
  62. self.index_tasks.remove(&head.borrow().id);
  63. }
  64. result
  65. }
  66. }
  /// Identifier of a task handler; pending tasks are grouped under this key.
  pub type TaskHandlerId = String;
  68. #[derive(Debug)]
  69. pub(crate) struct TaskList {
  70. pub(crate) id: TaskHandlerId,
  71. tasks: BinaryHeap<PendingTask>,
  72. }
  73. impl Deref for TaskList {
  74. type Target = BinaryHeap<PendingTask>;
  75. fn deref(&self) -> &Self::Target {
  76. &self.tasks
  77. }
  78. }
  79. impl DerefMut for TaskList {
  80. fn deref_mut(&mut self) -> &mut Self::Target {
  81. &mut self.tasks
  82. }
  83. }
  84. impl TaskList {
  85. fn new(id: &str) -> Self {
  86. Self {
  87. id: id.to_owned(),
  88. tasks: BinaryHeap::new(),
  89. }
  90. }
  91. }
  92. impl PartialEq for TaskList {
  93. fn eq(&self, other: &Self) -> bool {
  94. self.id == other.id
  95. }
  96. }
  97. impl Eq for TaskList {}
  98. impl Ord for TaskList {
  99. fn cmp(&self, other: &Self) -> Ordering {
  100. match (self.peek(), other.peek()) {
  101. (None, None) => Ordering::Equal,
  102. (None, Some(_)) => Ordering::Less,
  103. (Some(_), None) => Ordering::Greater,
  104. (Some(lhs), Some(rhs)) => lhs.cmp(rhs),
  105. }
  106. }
  107. }
  108. impl PartialOrd for TaskList {
  109. fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
  110. Some(self.cmp(other))
  111. }
  112. }