queue.rs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. use crate::services::tasks::task::{PendingTask, Task, TaskContent, TaskType};
  2. use atomic_refcell::AtomicRefCell;
  3. use std::cmp::Ordering;
  4. use std::collections::hash_map::Entry;
  5. use std::collections::{BinaryHeap, HashMap};
  6. use std::ops::{Deref, DerefMut};
  7. use std::sync::Arc;
  8. #[derive(Default)]
  9. pub(crate) struct GridTaskQueue {
  10. // index_tasks for quick access
  11. index_tasks: HashMap<TaskHandlerId, Arc<AtomicRefCell<TaskList>>>,
  12. queue: BinaryHeap<Arc<AtomicRefCell<TaskList>>>,
  13. }
  14. impl GridTaskQueue {
  15. pub(crate) fn new() -> Self {
  16. Self::default()
  17. }
  18. pub(crate) fn push(&mut self, task: &Task) {
  19. let task_type = match task.content {
  20. TaskContent::Snapshot { .. } => TaskType::Snapshot,
  21. TaskContent::Filter => TaskType::Filter,
  22. };
  23. let pending_task = PendingTask {
  24. ty: task_type,
  25. id: task.id,
  26. };
  27. match self.index_tasks.entry("1".to_owned()) {
  28. Entry::Occupied(entry) => {
  29. let mut list = entry.get().borrow_mut();
  30. assert!(list.peek().map(|old_id| pending_task.id >= old_id.id).unwrap_or(true));
  31. list.push(pending_task);
  32. }
  33. Entry::Vacant(entry) => {
  34. let mut task_list = TaskList::new(entry.key());
  35. task_list.push(pending_task);
  36. let task_list = Arc::new(AtomicRefCell::new(task_list));
  37. entry.insert(task_list.clone());
  38. self.queue.push(task_list);
  39. }
  40. }
  41. }
  42. pub(crate) fn mut_head<T, F>(&mut self, mut f: F) -> Option<T>
  43. where
  44. F: FnMut(&mut TaskList) -> Option<T>,
  45. {
  46. let head = self.queue.pop()?;
  47. let result = {
  48. let mut ref_head = head.borrow_mut();
  49. f(&mut *ref_head)
  50. };
  51. if !head.borrow().tasks.is_empty() {
  52. self.queue.push(head);
  53. } else {
  54. self.index_tasks.remove(&head.borrow().id);
  55. }
  56. result
  57. }
  58. }
  59. pub type TaskHandlerId = String;
  60. #[derive(Debug)]
  61. pub(crate) struct TaskList {
  62. pub(crate) id: TaskHandlerId,
  63. tasks: BinaryHeap<PendingTask>,
  64. }
  65. impl Deref for TaskList {
  66. type Target = BinaryHeap<PendingTask>;
  67. fn deref(&self) -> &Self::Target {
  68. &self.tasks
  69. }
  70. }
  71. impl DerefMut for TaskList {
  72. fn deref_mut(&mut self) -> &mut Self::Target {
  73. &mut self.tasks
  74. }
  75. }
  76. impl TaskList {
  77. fn new(id: &str) -> Self {
  78. Self {
  79. id: id.to_owned(),
  80. tasks: BinaryHeap::new(),
  81. }
  82. }
  83. }
  84. impl PartialEq for TaskList {
  85. fn eq(&self, other: &Self) -> bool {
  86. self.id == other.id
  87. }
  88. }
  89. impl Eq for TaskList {}
  90. impl Ord for TaskList {
  91. fn cmp(&self, other: &Self) -> Ordering {
  92. match (self.peek(), other.peek()) {
  93. (None, None) => Ordering::Equal,
  94. (None, Some(_)) => Ordering::Less,
  95. (Some(_), None) => Ordering::Greater,
  96. (Some(lhs), Some(rhs)) => lhs.cmp(rhs),
  97. }
  98. }
  99. }
  100. impl PartialOrd for TaskList {
  101. fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
  102. Some(self.cmp(other))
  103. }
  104. }