scheduler.rs 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. use crate::services::tasks::queue::{GridTaskQueue, TaskHandlerId};
  2. use crate::services::tasks::runner::GridTaskRunner;
  3. use crate::services::tasks::store::GridTaskStore;
  4. use crate::services::tasks::task::Task;
  5. use flowy_error::{FlowyError, FlowyResult};
  6. use lib_infra::future::BoxResultFuture;
  7. use std::collections::HashMap;
  8. use std::sync::Arc;
  9. use std::time::Duration;
  10. use tokio::sync::{watch, RwLock};
  11. pub trait GridTaskHandler: Send + Sync + 'static {
  12. fn handler_id(&self) -> &TaskHandlerId;
  13. fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError>;
  14. }
  15. pub struct GridTaskScheduler {
  16. queue: GridTaskQueue,
  17. store: GridTaskStore,
  18. notifier: watch::Sender<()>,
  19. handlers: HashMap<TaskHandlerId, Arc<dyn GridTaskHandler>>,
  20. }
  21. impl GridTaskScheduler {
  22. pub fn new() -> Arc<RwLock<Self>> {
  23. let (notifier, rx) = watch::channel(());
  24. let scheduler = Self {
  25. queue: GridTaskQueue::new(),
  26. store: GridTaskStore::new(),
  27. notifier,
  28. handlers: HashMap::new(),
  29. };
  30. // The runner will receive the newest value after start running.
  31. scheduler.notify();
  32. let scheduler = Arc::new(RwLock::new(scheduler));
  33. let debounce_duration = Duration::from_millis(300);
  34. let runner = GridTaskRunner::new(scheduler.clone(), rx, debounce_duration);
  35. tokio::spawn(runner.run());
  36. scheduler
  37. }
  38. pub fn register_handler<T>(&mut self, handler: Arc<T>)
  39. where
  40. T: GridTaskHandler,
  41. {
  42. let handler_id = handler.handler_id().to_owned();
  43. self.handlers.insert(handler_id, handler);
  44. }
  45. pub fn unregister_handler<T: AsRef<str>>(&mut self, handler_id: T) {
  46. let _ = self.handlers.remove(handler_id.as_ref());
  47. }
  48. pub async fn process_next_task(&mut self) -> FlowyResult<()> {
  49. let mut get_next_task = || {
  50. let pending_task = self.queue.mut_head(|list| list.pop())?;
  51. let task = self.store.remove_task(&pending_task.id)?;
  52. Some(task)
  53. };
  54. if let Some(task) = get_next_task() {
  55. match self.handlers.get(&task.hid) {
  56. None => {}
  57. Some(handler) => {
  58. let _ = handler.process_task(task).await;
  59. }
  60. }
  61. }
  62. Ok(())
  63. }
  64. pub fn register_task(&mut self, task: Task) {
  65. assert!(!task.is_finished());
  66. self.queue.push(&task);
  67. self.store.insert_task(task);
  68. self.notify();
  69. }
  70. pub fn notify(&self) {
  71. let _ = self.notifier.send(());
  72. }
  73. }