Browse Source

Merge pull request #647 from AppFlowy-IO/feat/task_scheduler_test

Add grid task scheduler tests & fix some bugs
Nathan.fooo 2 years ago
parent
commit
66e87a8a94

+ 1 - 5
frontend/rust-lib/flowy-grid/src/services/filter/filter_service.rs

@@ -130,11 +130,7 @@ impl GridFilterService {
         let handler_id = self.grid_pad.read().await.grid_id();
 
         let context = FilterTaskContext { blocks };
-        Task {
-            handler_id,
-            id: task_id,
-            content: TaskContent::Filter(context),
-        }
+        Task::new(&handler_id, task_id, TaskContent::Filter(context))
     }
 
     async fn notify(&self, changesets: Vec<GridBlockChangesetPB>) {

+ 7 - 7
frontend/rust-lib/flowy-grid/src/services/grid_editor_task.rs

@@ -1,23 +1,23 @@
 use crate::manager::GridTaskSchedulerRwLock;
 use crate::services::grid_editor::GridRevisionEditor;
-use crate::services::tasks::{GridTaskHandler, Task, TaskContent, TaskHandlerId, TaskId};
+use crate::services::tasks::{GridTaskHandler, Task, TaskContent, TaskId};
 use flowy_error::FlowyError;
 use futures::future::BoxFuture;
 use lib_infra::future::BoxResultFuture;
 
 pub(crate) trait GridServiceTaskScheduler: Send + Sync + 'static {
     fn gen_task_id(&self) -> BoxFuture<TaskId>;
-    fn register_task(&self, task: Task) -> BoxFuture<()>;
+    fn add_task(&self, task: Task) -> BoxFuture<()>;
 }
 
 impl GridTaskHandler for GridRevisionEditor {
-    fn handler_id(&self) -> &TaskHandlerId {
+    fn handler_id(&self) -> &str {
         &self.grid_id
     }
 
-    fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError> {
+    fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError> {
         Box::pin(async move {
-            match task.content {
+            match content {
                 TaskContent::Snapshot => {}
                 TaskContent::Filter(context) => self.filter_service.process(context).await?,
             }
@@ -32,10 +32,10 @@ impl GridServiceTaskScheduler for GridTaskSchedulerRwLock {
         Box::pin(async move { this.read().await.next_task_id() })
     }
 
-    fn register_task(&self, task: Task) -> BoxFuture<()> {
+    fn add_task(&self, task: Task) -> BoxFuture<()> {
         let this = self.clone();
         Box::pin(async move {
-            this.write().await.register_task(task);
+            this.write().await.add_task(task);
         })
     }
 }

+ 12 - 2
frontend/rust-lib/flowy-grid/src/services/tasks/queue.rs

@@ -20,7 +20,12 @@ impl GridTaskQueue {
     }
 
     pub(crate) fn push(&mut self, task: &Task) {
-        let task_type = match task.content {
+        if task.content.is_none() {
+            tracing::warn!("Ignore task: {} with empty content", task.id);
+            return;
+        }
+
+        let task_type = match task.content.as_ref().unwrap() {
             TaskContent::Snapshot => TaskType::Snapshot,
             TaskContent::Filter { .. } => TaskType::Filter,
         };
@@ -28,7 +33,7 @@ impl GridTaskQueue {
             ty: task_type,
             id: task.id,
         };
-        match self.index_tasks.entry("1".to_owned()) {
+        match self.index_tasks.entry(task.handler_id.clone()) {
             Entry::Occupied(entry) => {
                 let mut list = entry.get().borrow_mut();
                 assert!(list.peek().map(|old_id| pending_task.id >= old_id.id).unwrap_or(true));
@@ -44,6 +49,11 @@ impl GridTaskQueue {
         }
     }
 
+    #[allow(dead_code)]
+    pub(crate) fn clear(&mut self) {
+        self.queue.clear();
+    }
+
     pub(crate) fn mut_head<T, F>(&mut self, mut f: F) -> Option<T>
     where
         F: FnMut(&mut TaskList) -> Option<T>,

+ 8 - 6
frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs

@@ -1,4 +1,5 @@
 use crate::services::tasks::scheduler::GridTaskScheduler;
+
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{watch, RwLock};
@@ -7,13 +8,13 @@ use tokio::time::interval;
 pub struct GridTaskRunner {
     scheduler: Arc<RwLock<GridTaskScheduler>>,
     debounce_duration: Duration,
-    notifier: Option<watch::Receiver<()>>,
+    notifier: Option<watch::Receiver<bool>>,
 }
 
 impl GridTaskRunner {
     pub fn new(
         scheduler: Arc<RwLock<GridTaskScheduler>>,
-        notifier: watch::Receiver<()>,
+        notifier: watch::Receiver<bool>,
         debounce_duration: Duration,
     ) -> Self {
         Self {
@@ -34,12 +35,13 @@ impl GridTaskRunner {
                 // The runner will be stopped if the corresponding Sender drop.
                 break;
             }
-            let mut interval = interval(self.debounce_duration);
-            interval.tick().await;
 
-            if let Err(e) = self.scheduler.write().await.process_next_task().await {
-                tracing::error!("{:?}", e);
+            if *notifier.borrow() {
+                break;
             }
+            let mut interval = interval(self.debounce_duration);
+            interval.tick().await;
+            let _ = self.scheduler.write().await.process_next_task().await;
         }
     }
 }

+ 116 - 22
frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs

@@ -3,8 +3,8 @@ use crate::services::tasks::runner::GridTaskRunner;
 use crate::services::tasks::store::GridTaskStore;
 use crate::services::tasks::task::Task;
 
-use crate::services::tasks::TaskId;
-use flowy_error::{FlowyError, FlowyResult};
+use crate::services::tasks::{TaskContent, TaskId, TaskStatus};
+use flowy_error::FlowyError;
 use lib_infra::future::BoxResultFuture;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -12,21 +12,21 @@ use std::time::Duration;
 use tokio::sync::{watch, RwLock};
 
 pub(crate) trait GridTaskHandler: Send + Sync + 'static {
-    fn handler_id(&self) -> &TaskHandlerId;
+    fn handler_id(&self) -> &str;
 
-    fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError>;
+    fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError>;
 }
 
 pub struct GridTaskScheduler {
     queue: GridTaskQueue,
     store: GridTaskStore,
-    notifier: watch::Sender<()>,
+    notifier: watch::Sender<bool>,
     handlers: HashMap<TaskHandlerId, Arc<dyn GridTaskHandler>>,
 }
 
 impl GridTaskScheduler {
     pub(crate) fn new() -> Arc<RwLock<Self>> {
-        let (notifier, rx) = watch::channel(());
+        let (notifier, rx) = watch::channel(false);
 
         let scheduler = Self {
             queue: GridTaskQueue::new(),
@@ -57,25 +57,38 @@ impl GridTaskScheduler {
         let _ = self.handlers.remove(handler_id.as_ref());
     }
 
-    pub(crate) async fn process_next_task(&mut self) -> FlowyResult<()> {
-        let mut get_next_task = || {
-            let pending_task = self.queue.mut_head(|list| list.pop())?;
-            let task = self.store.remove_task(&pending_task.id)?;
-            Some(task)
-        };
+    #[allow(dead_code)]
+    pub(crate) fn stop(&mut self) {
+        let _ = self.notifier.send(true);
+        self.queue.clear();
+        self.store.clear();
+    }
+
+    pub(crate) async fn process_next_task(&mut self) -> Option<()> {
+        let pending_task = self.queue.mut_head(|list| list.pop())?;
+        let mut task = self.store.remove_task(&pending_task.id)?;
+        let handler = self.handlers.get(&task.handler_id)?;
+
+        let ret = task.ret.take()?;
+        let content = task.content.take()?;
 
-        if let Some(task) = get_next_task() {
-            match self.handlers.get(&task.handler_id) {
-                None => {}
-                Some(handler) => {
-                    let _ = handler.process_task(task).await;
-                }
+        task.set_status(TaskStatus::Processing);
+        let _ = match handler.process_content(content).await {
+            Ok(_) => {
+                task.set_status(TaskStatus::Done);
+                let _ = ret.send(task.into());
             }
-        }
-        Ok(())
+            Err(e) => {
+                tracing::error!("Process task failed: {:?}", e);
+                task.set_status(TaskStatus::Failure);
+                let _ = ret.send(task.into());
+            }
+        };
+        self.notify();
+        None
     }
 
-    pub(crate) fn register_task(&mut self, task: Task) {
+    pub(crate) fn add_task(&mut self, task: Task) {
         assert!(!task.is_finished());
         self.queue.push(&task);
         self.store.insert_task(task);
@@ -87,6 +100,87 @@ impl GridTaskScheduler {
     }
 
     pub(crate) fn notify(&self) {
-        let _ = self.notifier.send(());
+        let _ = self.notifier.send(false);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::services::grid_editor_task::GridServiceTaskScheduler;
+    use crate::services::tasks::{GridTaskHandler, GridTaskScheduler, Task, TaskContent, TaskStatus};
+    use flowy_error::FlowyError;
+    use lib_infra::future::BoxResultFuture;
+    use std::sync::Arc;
+    use std::time::Duration;
+    use tokio::time::interval;
+
+    #[tokio::test]
+    async fn task_scheduler_snapshot_task_test() {
+        let scheduler = GridTaskScheduler::new();
+        scheduler
+            .write()
+            .await
+            .register_handler(Arc::new(MockGridTaskHandler()));
+
+        let task_id = scheduler.gen_task_id().await;
+        let mut task = Task::new("1", task_id, TaskContent::Snapshot);
+        let rx = task.rx.take().unwrap();
+        scheduler.write().await.add_task(task);
+        assert_eq!(rx.await.unwrap().status, TaskStatus::Done);
+    }
+
+    #[tokio::test]
+    async fn task_scheduler_snapshot_task_cancel_test() {
+        let scheduler = GridTaskScheduler::new();
+        scheduler
+            .write()
+            .await
+            .register_handler(Arc::new(MockGridTaskHandler()));
+
+        let task_id = scheduler.gen_task_id().await;
+        let mut task = Task::new("1", task_id, TaskContent::Snapshot);
+        let rx = task.rx.take().unwrap();
+        scheduler.write().await.add_task(task);
+        scheduler.write().await.stop();
+
+        assert_eq!(rx.await.unwrap().status, TaskStatus::Cancel);
+    }
+
+    #[tokio::test]
+    async fn task_scheduler_multi_task_test() {
+        let scheduler = GridTaskScheduler::new();
+        scheduler
+            .write()
+            .await
+            .register_handler(Arc::new(MockGridTaskHandler()));
+
+        let task_id = scheduler.gen_task_id().await;
+        let mut task_1 = Task::new("1", task_id, TaskContent::Snapshot);
+        let rx_1 = task_1.rx.take().unwrap();
+
+        let task_id = scheduler.gen_task_id().await;
+        let mut task_2 = Task::new("1", task_id, TaskContent::Snapshot);
+        let rx_2 = task_2.rx.take().unwrap();
+
+        scheduler.write().await.add_task(task_1);
+        scheduler.write().await.add_task(task_2);
+
+        assert_eq!(rx_1.await.unwrap().status, TaskStatus::Done);
+        assert_eq!(rx_2.await.unwrap().status, TaskStatus::Done);
+    }
+    struct MockGridTaskHandler();
+    impl GridTaskHandler for MockGridTaskHandler {
+        fn handler_id(&self) -> &str {
+            "1"
+        }
+
+        fn process_content(&self, _content: TaskContent) -> BoxResultFuture<(), FlowyError> {
+            Box::pin(async move {
+                let mut interval = interval(Duration::from_secs(1));
+                interval.tick().await;
+                interval.tick().await;
+                Ok(())
+            })
+        }
     }
 }

+ 14 - 1
frontend/rust-lib/flowy-grid/src/services/tasks/store.rs

@@ -1,6 +1,7 @@
 use crate::services::tasks::task::Task;
-use crate::services::tasks::TaskId;
+use crate::services::tasks::{TaskId, TaskStatus};
 use std::collections::HashMap;
+use std::mem;
 use std::sync::atomic::AtomicU32;
 use std::sync::atomic::Ordering::SeqCst;
 
@@ -25,6 +26,18 @@ impl GridTaskStore {
         self.tasks.remove(task_id)
     }
 
+    #[allow(dead_code)]
+    pub(crate) fn clear(&mut self) {
+        let tasks = mem::take(&mut self.tasks);
+        tasks.into_values().for_each(|mut task| {
+            if task.ret.is_some() {
+                let ret = task.ret.take().unwrap();
+                task.set_status(TaskStatus::Cancel);
+                let _ = ret.send(task.into());
+            }
+        });
+    }
+
     pub(crate) fn next_task_id(&self) -> TaskId {
         let _ = self.task_id_counter.fetch_add(1, SeqCst);
         self.task_id_counter.load(SeqCst)

+ 49 - 2
frontend/rust-lib/flowy-grid/src/services/tasks/task.rs

@@ -1,3 +1,5 @@
+#![allow(clippy::all)]
+#![allow(dead_code)]
 use crate::services::row::GridBlockSnapshot;
 use crate::services::tasks::queue::TaskHandlerId;
 use std::cmp::Ordering;
@@ -60,14 +62,59 @@ pub(crate) enum TaskContent {
     Filter(FilterTaskContext),
 }
 
+#[derive(Debug, Eq, PartialEq)]
+pub(crate) enum TaskStatus {
+    Pending,
+    Processing,
+    Done,
+    Failure,
+    Cancel,
+}
+
 pub(crate) struct Task {
+    pub id: TaskId,
     pub handler_id: TaskHandlerId,
+    pub content: Option<TaskContent>,
+    status: TaskStatus,
+    pub ret: Option<tokio::sync::oneshot::Sender<TaskResult>>,
+    pub rx: Option<tokio::sync::oneshot::Receiver<TaskResult>>,
+}
+
+pub(crate) struct TaskResult {
     pub id: TaskId,
-    pub content: TaskContent,
+    pub(crate) status: TaskStatus,
+}
+
+impl std::convert::From<Task> for TaskResult {
+    fn from(task: Task) -> Self {
+        TaskResult {
+            id: task.id,
+            status: task.status,
+        }
+    }
 }
 
 impl Task {
+    pub fn new(handler_id: &str, id: TaskId, content: TaskContent) -> Self {
+        let (ret, rx) = tokio::sync::oneshot::channel();
+        Self {
+            handler_id: handler_id.to_owned(),
+            id,
+            content: Some(content),
+            ret: Some(ret),
+            rx: Some(rx),
+            status: TaskStatus::Pending,
+        }
+    }
+
+    pub fn set_status(&mut self, status: TaskStatus) {
+        self.status = status;
+    }
+
     pub fn is_finished(&self) -> bool {
-        todo!()
+        match self.status {
+            TaskStatus::Done => true,
+            _ => false,
+        }
     }
 }