Browse Source

chore: config task runner

appflowy 2 years ago
parent
commit
1b38ff8559
24 changed files with 431 additions and 113 deletions
  1. 7 0
      frontend/rust-lib/Cargo.lock
  2. 1 0
      frontend/rust-lib/flowy-grid/Cargo.toml
  3. 26 14
      frontend/rust-lib/flowy-grid/src/manager.rs
  4. 6 6
      frontend/rust-lib/flowy-grid/src/services/block_manager.rs
  5. 0 1
      frontend/rust-lib/flowy-grid/src/services/filter/filter_runner.rs
  6. 6 0
      frontend/rust-lib/flowy-grid/src/services/filter/filter_service.rs
  7. 2 2
      frontend/rust-lib/flowy-grid/src/services/filter/mod.rs
  8. 19 8
      frontend/rust-lib/flowy-grid/src/services/grid_editor.rs
  9. 21 0
      frontend/rust-lib/flowy-grid/src/services/grid_editor_task.rs
  10. 3 1
      frontend/rust-lib/flowy-grid/src/services/mod.rs
  11. 1 1
      frontend/rust-lib/flowy-grid/src/services/row/cell_data_operation.rs
  12. 3 0
      frontend/rust-lib/flowy-grid/src/services/snapshot/mod.rs
  13. 7 0
      frontend/rust-lib/flowy-grid/src/services/snapshot/snapshot_service.rs
  14. 0 0
      frontend/rust-lib/flowy-grid/src/services/tasks/filter/mod.rs
  15. 6 1
      frontend/rust-lib/flowy-grid/src/services/tasks/mod.rs
  16. 118 0
      frontend/rust-lib/flowy-grid/src/services/tasks/queue.rs
  17. 45 0
      frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs
  18. 53 66
      frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs
  19. 11 0
      frontend/rust-lib/flowy-grid/src/services/tasks/store.rs
  20. 67 0
      frontend/rust-lib/flowy-grid/src/services/tasks/task.rs
  21. 1 1
      frontend/rust-lib/flowy-grid/tests/grid/cell_test.rs
  22. 11 11
      frontend/rust-lib/flowy-grid/tests/grid/script.rs
  23. 9 0
      frontend/rust-lib/flowy-text-block/src/editor.rs
  24. 8 1
      shared-lib/flowy-sync/src/client_grid/grid_block_revsion_pad.rs

+ 7 - 0
frontend/rust-lib/Cargo.lock

@@ -93,6 +93,12 @@ dependencies = [
  "autocfg",
 ]
 
+[[package]]
+name = "atomic_refcell"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73b5e5f48b927f04e952dedc932f31995a65a0bf65ec971c74436e51bf6e970d"
+
 [[package]]
 name = "atty"
 version = "0.2.14"
@@ -925,6 +931,7 @@ dependencies = [
 name = "flowy-grid"
 version = "0.1.0"
 dependencies = [
+ "atomic_refcell",
  "bytes",
  "chrono",
  "dart-notify",

+ 1 - 0
frontend/rust-lib/flowy-grid/Cargo.toml

@@ -39,6 +39,7 @@ fancy-regex = "0.10.0"
 regex = "1.5.6"
 url = { version = "2"}
 futures = "0.3.15"
+atomic_refcell = "0.1.8"
 
 [dev-dependencies]
 flowy-test = { path = "../flowy-test" }

+ 26 - 14
frontend/rust-lib/flowy-grid/src/manager.rs

@@ -2,16 +2,18 @@ use crate::services::grid_editor::GridRevisionEditor;
 use crate::services::persistence::block_index::BlockIndexCache;
 use crate::services::persistence::kv::GridKVPersistence;
 use crate::services::persistence::GridDatabase;
+use crate::services::tasks::GridTaskScheduler;
 use bytes::Bytes;
 use dashmap::DashMap;
 use flowy_database::ConnectionPool;
 use flowy_error::{FlowyError, FlowyResult};
-use flowy_grid_data_model::revision::{BuildGridContext, GridRevision, GridSettingRevision};
+use flowy_grid_data_model::revision::{BuildGridContext, GridRevision};
 use flowy_revision::disk::{SQLiteGridBlockMetaRevisionPersistence, SQLiteGridRevisionPersistence};
 use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket};
 use flowy_sync::client_grid::{make_block_meta_delta, make_grid_delta};
 use flowy_sync::entities::revision::{RepeatedRevision, Revision};
 use std::sync::Arc;
+use tokio::sync::RwLock;
 
 pub trait GridUser: Send + Sync {
     fn user_id(&self) -> Result<String, FlowyError>;
@@ -20,11 +22,12 @@ pub trait GridUser: Send + Sync {
 }
 
 pub struct GridManager {
-    editor_map: Arc<DashMap<String, Arc<GridRevisionEditor>>>,
+    grid_editors: Arc<DashMap<String, Arc<GridRevisionEditor>>>,
     grid_user: Arc<dyn GridUser>,
     block_index_cache: Arc<BlockIndexCache>,
     #[allow(dead_code)]
     kv_persistence: Arc<GridKVPersistence>,
+    task_scheduler: Arc<RwLock<GridTaskScheduler>>,
 }
 
 impl GridManager {
@@ -35,12 +38,14 @@ impl GridManager {
     ) -> Self {
         let grid_editors = Arc::new(DashMap::new());
         let kv_persistence = Arc::new(GridKVPersistence::new(database.clone()));
-        let block_index_persistence = Arc::new(BlockIndexCache::new(database));
+        let block_index_cache = Arc::new(BlockIndexCache::new(database));
+        let task_scheduler = GridTaskScheduler::new();
         Self {
-            editor_map: grid_editors,
+            grid_editors,
             grid_user,
             kv_persistence,
-            block_index_cache: block_index_persistence,
+            block_index_cache,
+            task_scheduler,
         }
     }
 
@@ -77,7 +82,7 @@ impl GridManager {
     pub fn close_grid<T: AsRef<str>>(&self, grid_id: T) -> FlowyResult<()> {
         let grid_id = grid_id.as_ref();
         tracing::Span::current().record("grid_id", &grid_id);
-        self.editor_map.remove(grid_id);
+        self.grid_editors.remove(grid_id);
         Ok(())
     }
 
@@ -85,7 +90,7 @@ impl GridManager {
     pub fn delete_grid<T: AsRef<str>>(&self, grid_id: T) -> FlowyResult<()> {
         let grid_id = grid_id.as_ref();
         tracing::Span::current().record("grid_id", &grid_id);
-        self.editor_map.remove(grid_id);
+        self.grid_editors.remove(grid_id);
         Ok(())
     }
 
@@ -93,23 +98,23 @@ impl GridManager {
 
     // #[tracing::instrument(level = "debug", skip(self), err)]
     pub fn get_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<GridRevisionEditor>> {
-        match self.editor_map.get(grid_id) {
+        match self.grid_editors.get(grid_id) {
             None => Err(FlowyError::internal().context("Should call open_grid function first")),
             Some(editor) => Ok(editor.clone()),
         }
     }
 
     async fn get_or_create_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<GridRevisionEditor>> {
-        match self.editor_map.get(grid_id) {
+        match self.grid_editors.get(grid_id) {
             None => {
                 tracing::trace!("Create grid editor with id: {}", grid_id);
                 let db_pool = self.grid_user.db_pool()?;
                 let editor = self.make_grid_editor(grid_id, db_pool).await?;
 
-                if self.editor_map.contains_key(grid_id) {
+                if self.grid_editors.contains_key(grid_id) {
                     tracing::warn!("Grid:{} already exists in cache", grid_id);
                 }
-                self.editor_map.insert(grid_id.to_string(), editor.clone());
+                self.grid_editors.insert(grid_id.to_string(), editor.clone());
                 Ok(editor)
             }
             Some(editor) => Ok(editor.clone()),
@@ -124,7 +129,14 @@ impl GridManager {
     ) -> Result<Arc<GridRevisionEditor>, FlowyError> {
         let user = self.grid_user.clone();
         let rev_manager = self.make_grid_rev_manager(grid_id, pool.clone())?;
-        let grid_editor = GridRevisionEditor::new(grid_id, user, rev_manager, self.block_index_cache.clone()).await?;
+        let grid_editor = GridRevisionEditor::new(
+            grid_id,
+            user,
+            rev_manager,
+            self.block_index_cache.clone(),
+            self.task_scheduler.clone(),
+        )
+        .await?;
         Ok(grid_editor)
     }
 
@@ -164,10 +176,10 @@ pub async fn make_grid_view_data(
         });
 
         // Create grid's block
-        let grid_block_meta_delta = make_block_meta_delta(&block_meta_data);
+        let grid_block_meta_delta = make_block_meta_delta(block_meta_data);
         let block_meta_delta_data = grid_block_meta_delta.to_delta_bytes();
         let repeated_revision: RepeatedRevision =
-            Revision::initial_revision(user_id, &block_id, block_meta_delta_data).into();
+            Revision::initial_revision(user_id, block_id, block_meta_delta_data).into();
         let _ = grid_manager
             .create_grid_block_meta(&block_id, repeated_revision)
             .await?;

+ 6 - 6
frontend/rust-lib/flowy-grid/src/services/block_manager.rs

@@ -30,10 +30,10 @@ impl GridBlockManager {
     pub(crate) async fn new(
         grid_id: &str,
         user: &Arc<dyn GridUser>,
-        block_revs: Vec<Arc<GridBlockMetaRevision>>,
+        block_meta_revs: Vec<Arc<GridBlockMetaRevision>>,
         persistence: Arc<BlockIndexCache>,
     ) -> FlowyResult<Self> {
-        let editor_map = make_block_meta_editor_map(user, block_revs).await?;
+        let editor_map = make_block_meta_editor_map(user, block_meta_revs).await?;
         let user = user.clone();
         let grid_id = grid_id.to_owned();
         let manager = Self {
@@ -265,12 +265,12 @@ impl GridBlockManager {
 
 async fn make_block_meta_editor_map(
     user: &Arc<dyn GridUser>,
-    block_revs: Vec<Arc<GridBlockMetaRevision>>,
+    block_meta_revs: Vec<Arc<GridBlockMetaRevision>>,
 ) -> FlowyResult<DashMap<String, Arc<GridBlockRevisionEditor>>> {
     let editor_map = DashMap::new();
-    for block_rev in block_revs {
-        let editor = make_block_meta_editor(user, &block_rev.block_id).await?;
-        editor_map.insert(block_rev.block_id.clone(), Arc::new(editor));
+    for block_meta_rev in block_meta_revs {
+        let editor = make_block_meta_editor(user, &block_meta_rev.block_id).await?;
+        editor_map.insert(block_meta_rev.block_id.clone(), Arc::new(editor));
     }
 
     Ok(editor_map)

+ 0 - 1
frontend/rust-lib/flowy-grid/src/services/filter/filter_runner.rs

@@ -1 +0,0 @@
-pub(crate) struct FilterRunner {}

+ 6 - 0
frontend/rust-lib/flowy-grid/src/services/filter/filter_service.rs

@@ -0,0 +1,6 @@
+pub struct GridFilterService {}
+impl GridFilterService {
+    pub fn new() -> Self {
+        Self {}
+    }
+}

+ 2 - 2
frontend/rust-lib/flowy-grid/src/services/filter/mod.rs

@@ -1,3 +1,3 @@
-mod filter_runner;
+mod filter_service;
 
-pub use filter_runner::*;
+pub use filter_service::*;

+ 19 - 8
frontend/rust-lib/flowy-grid/src/services/grid_editor.rs

@@ -3,8 +3,10 @@ use crate::entities::CellIdentifier;
 use crate::manager::GridUser;
 use crate::services::block_manager::GridBlockManager;
 use crate::services::field::{default_type_option_builder_from_type, type_option_builder_from_bytes, FieldBuilder};
+use crate::services::filter::GridFilterService;
 use crate::services::persistence::block_index::BlockIndexCache;
 use crate::services::row::*;
+use crate::services::tasks::GridTaskScheduler;
 use bytes::Bytes;
 use flowy_error::{ErrorCode, FlowyError, FlowyResult};
 use flowy_grid_data_model::entities::*;
@@ -21,16 +23,18 @@ use std::sync::Arc;
 use tokio::sync::RwLock;
 
 pub struct GridRevisionEditor {
-    grid_id: String,
+    pub(crate) grid_id: String,
     user: Arc<dyn GridUser>,
     grid_pad: Arc<RwLock<GridRevisionPad>>,
     rev_manager: Arc<RevisionManager>,
     block_manager: Arc<GridBlockManager>,
+    task_scheduler: Arc<RwLock<GridTaskScheduler>>,
+    pub(crate) filter_service: Arc<GridFilterService>,
 }
 
 impl Drop for GridRevisionEditor {
     fn drop(&mut self) {
-        tracing::trace!("Drop GridMetaEditor");
+        tracing::trace!("Drop GridRevisionEditor");
     }
 }
 
@@ -40,22 +44,29 @@ impl GridRevisionEditor {
         user: Arc<dyn GridUser>,
         mut rev_manager: RevisionManager,
         persistence: Arc<BlockIndexCache>,
+        task_scheduler: Arc<RwLock<GridTaskScheduler>>,
     ) -> FlowyResult<Arc<Self>> {
         let token = user.token()?;
         let cloud = Arc::new(GridRevisionCloudService { token });
         let grid_pad = rev_manager.load::<GridPadBuilder>(Some(cloud)).await?;
         let rev_manager = Arc::new(rev_manager);
         let grid_pad = Arc::new(RwLock::new(grid_pad));
-        let block_revs = grid_pad.read().await.get_block_meta_revs();
-
-        let block_meta_manager = Arc::new(GridBlockManager::new(grid_id, &user, block_revs, persistence).await?);
-        Ok(Arc::new(Self {
+        let block_meta_revs = grid_pad.read().await.get_block_meta_revs();
+        let block_manager = Arc::new(GridBlockManager::new(grid_id, &user, block_meta_revs, persistence).await?);
+        let filter_service = Arc::new(GridFilterService::new());
+        let editor = Arc::new(Self {
             grid_id: grid_id.to_owned(),
             user,
             grid_pad,
             rev_manager,
-            block_manager: block_meta_manager,
-        }))
+            block_manager,
+            filter_service,
+            task_scheduler: task_scheduler.clone(),
+        });
+
+        task_scheduler.write().await.register_handler(editor.clone());
+
+        Ok(editor)
     }
 
     pub async fn insert_field(&self, params: InsertFieldParams) -> FlowyResult<()> {

+ 21 - 0
frontend/rust-lib/flowy-grid/src/services/grid_editor_task.rs

@@ -0,0 +1,21 @@
+use crate::services::grid_editor::GridRevisionEditor;
+use crate::services::tasks::{GridTaskHandler, Task, TaskContent};
+use flowy_error::FlowyError;
+use lib_infra::future::BoxResultFuture;
+use std::sync::Arc;
+
+impl GridTaskHandler for Arc<GridRevisionEditor> {
+    fn handler_id(&self) -> &str {
+        &self.grid_id
+    }
+
+    fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError> {
+        Box::pin(async move {
+            match task.content {
+                TaskContent::Snapshot { .. } => {}
+                TaskContent::Filter => {}
+            }
+            Ok(())
+        })
+    }
+}

+ 3 - 1
frontend/rust-lib/flowy-grid/src/services/mod.rs

@@ -5,7 +5,9 @@ pub mod block_revision_editor;
 pub mod field;
 mod filter;
 pub mod grid_editor;
+mod grid_editor_task;
 pub mod persistence;
 pub mod row;
 pub mod setting;
-// mod tasks;
+mod snapshot;
+pub mod tasks;

+ 1 - 1
frontend/rust-lib/flowy-grid/src/services/row/cell_data_operation.rs

@@ -137,7 +137,7 @@ pub fn decode_cell_data<T: TryInto<TypeOptionCellData>>(data: T, field_rev: &Fie
     if let Ok(type_option_cell_data) = data.try_into() {
         let TypeOptionCellData { data, field_type } = type_option_cell_data;
         let to_field_type = &field_rev.field_type;
-        match try_decode_cell_data(data, &field_rev, &field_type, to_field_type) {
+        match try_decode_cell_data(data, field_rev, &field_type, to_field_type) {
             Ok(cell_data) => cell_data,
             Err(e) => {
                 tracing::error!("Decode cell data failed, {:?}", e);

+ 3 - 0
frontend/rust-lib/flowy-grid/src/services/snapshot/mod.rs

@@ -0,0 +1,3 @@
+mod snapshot_service;
+
+pub use snapshot_service::*;

+ 7 - 0
frontend/rust-lib/flowy-grid/src/services/snapshot/snapshot_service.rs

@@ -0,0 +1,7 @@
+pub struct GridSnapshotService {}
+
+impl GridSnapshotService {
+    pub fn new() -> Self {
+        Self {}
+    }
+}

+ 0 - 0
frontend/rust-lib/flowy-grid/src/services/tasks/filter/mod.rs


+ 6 - 1
frontend/rust-lib/flowy-grid/src/services/tasks/mod.rs

@@ -1,3 +1,8 @@
-mod filter;
+mod queue;
 mod runner;
 mod scheduler;
+mod store;
+mod task;
+
+pub use scheduler::*;
+pub use task::*;

+ 118 - 0
frontend/rust-lib/flowy-grid/src/services/tasks/queue.rs

@@ -0,0 +1,118 @@
+use crate::services::tasks::task::{PendingTask, Task, TaskContent, TaskType};
+use atomic_refcell::AtomicRefCell;
+
+use std::cmp::Ordering;
+use std::collections::hash_map::Entry;
+use std::collections::{BinaryHeap, HashMap};
+use std::ops::{Deref, DerefMut};
+use std::sync::Arc;
+
+#[derive(Default)]
+pub(crate) struct GridTaskQueue {
+    // index_tasks for quick access
+    index_tasks: HashMap<String, Arc<AtomicRefCell<TaskList>>>,
+    queue: BinaryHeap<Arc<AtomicRefCell<TaskList>>>,
+}
+
+impl GridTaskQueue {
+    pub(crate) fn new() -> Self {
+        Self::default()
+    }
+
+    pub(crate) fn push(&mut self, task: &Task) {
+        let task_type = match task.content {
+            TaskContent::Snapshot { .. } => TaskType::Snapshot,
+            TaskContent::Filter => TaskType::Filter,
+        };
+        let pending_task = PendingTask {
+            ty: task_type,
+            id: task.id.clone(),
+        };
+        match self.index_tasks.entry("1".to_owned()) {
+            Entry::Occupied(entry) => {
+                let mut list = entry.get().borrow_mut();
+                assert!(list.peek().map(|old_id| pending_task.id >= old_id.id).unwrap_or(true));
+                list.push(pending_task);
+            }
+            Entry::Vacant(entry) => {
+                let mut task_list = TaskList::new(entry.key());
+                task_list.push(pending_task);
+                let task_list = Arc::new(AtomicRefCell::new(task_list));
+                entry.insert(task_list.clone());
+                self.queue.push(task_list);
+            }
+        }
+    }
+
+    pub(crate) fn mut_head<T, F>(&mut self, mut f: F) -> Option<T>
+    where
+        F: FnMut(&mut TaskList) -> T,
+    {
+        let head = self.queue.pop()?;
+        let result = {
+            let mut ref_head = head.borrow_mut();
+            f(&mut *ref_head)
+        };
+        if !head.borrow().tasks.is_empty() {
+            self.queue.push(head);
+        } else {
+            self.index_tasks.remove(&head.borrow().id);
+        }
+
+        Some(result)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct TaskList {
+    id: String,
+    tasks: BinaryHeap<PendingTask>,
+}
+
+impl Deref for TaskList {
+    type Target = BinaryHeap<PendingTask>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.tasks
+    }
+}
+
+impl DerefMut for TaskList {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.tasks
+    }
+}
+
+impl TaskList {
+    fn new(id: &str) -> Self {
+        Self {
+            id: id.to_owned(),
+            tasks: BinaryHeap::new(),
+        }
+    }
+}
+
+impl PartialEq for TaskList {
+    fn eq(&self, other: &Self) -> bool {
+        self.id == other.id
+    }
+}
+
+impl Eq for TaskList {}
+
+impl Ord for TaskList {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self.peek(), other.peek()) {
+            (None, None) => Ordering::Equal,
+            (None, Some(_)) => Ordering::Less,
+            (Some(_), None) => Ordering::Greater,
+            (Some(lhs), Some(rhs)) => lhs.cmp(rhs),
+        }
+    }
+}
+
+impl PartialOrd for TaskList {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}

+ 45 - 0
frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs

@@ -0,0 +1,45 @@
+use crate::services::tasks::scheduler::GridTaskScheduler;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::{watch, RwLock};
+use tokio::time::interval;
+
+pub struct GridTaskRunner {
+    scheduler: Arc<RwLock<GridTaskScheduler>>,
+    debounce_duration: Duration,
+    notifier: Option<watch::Receiver<()>>,
+}
+
+impl GridTaskRunner {
+    pub fn new(
+        scheduler: Arc<RwLock<GridTaskScheduler>>,
+        notifier: watch::Receiver<()>,
+        debounce_duration: Duration,
+    ) -> Self {
+        Self {
+            scheduler,
+            debounce_duration,
+            notifier: Some(notifier),
+        }
+    }
+
+    pub async fn run(mut self) {
+        let mut notifier = self
+            .notifier
+            .take()
+            .expect("The GridTaskRunner's notifier should only take once");
+
+        loop {
+            if notifier.changed().await.is_err() {
+                // The runner will be stopped if the corresponding Sender drop.
+                break;
+            }
+            let mut interval = interval(self.debounce_duration);
+            interval.tick().await;
+
+            if let Err(e) = self.scheduler.write().await.process_next_task() {
+                tracing::error!("{:?}", e);
+            }
+        }
+    }
+}

+ 53 - 66
frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs

@@ -1,76 +1,63 @@
-use std::cmp::Ordering;
-use std::collections::BinaryHeap;
-use std::ops::{Deref, DerefMut};
-
-enum TaskType {
-    /// Remove the row if it doesn't satisfy the filter.
-    Filter,
-    /// Generate snapshot for grid, unused by now.
-    Snapshot,
-}
-
-/// Two tasks are equal if they have the same type.
-impl PartialEq for TaskType {
-    fn eq(&self, other: &Self) -> bool {
-        matches!((self, other),)
+use crate::services::tasks::queue::GridTaskQueue;
+use crate::services::tasks::runner::GridTaskRunner;
+use crate::services::tasks::store::GridTaskStore;
+use crate::services::tasks::task::Task;
+use flowy_error::{FlowyError, FlowyResult};
+use lib_infra::future::BoxResultFuture;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::{watch, RwLock};
+
+pub trait GridTaskHandler: Send + Sync + 'static {
+    fn handler_id(&self) -> &str;
+
+    fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError>;
+}
+
+pub struct GridTaskScheduler {
+    queue: GridTaskQueue,
+    store: GridTaskStore,
+    notifier: watch::Sender<()>,
+    handlers: Vec<Arc<dyn GridTaskHandler>>,
+}
+
+impl GridTaskScheduler {
+    pub fn new() -> Arc<RwLock<Self>> {
+        let (notifier, rx) = watch::channel(());
+
+        let scheduler = Self {
+            queue: GridTaskQueue::new(),
+            store: GridTaskStore::new(),
+            notifier,
+            handlers: vec![],
+        };
+        // The runner will receive the newest value after start running.
+        scheduler.notify();
+
+        let scheduler = Arc::new(RwLock::new(scheduler));
+        let debounce_duration = Duration::from_millis(300);
+        let runner = GridTaskRunner::new(scheduler.clone(), rx, debounce_duration);
+        tokio::spawn(runner.run());
+
+        scheduler
     }
-}
-
-pub type TaskId = u32;
 
-#[derive(Eq, Debug, Clone, Copy)]
-struct PendingTask {
-    kind: TaskType,
-    id: TaskId,
-}
-
-impl PartialEq for PendingTask {
-    fn eq(&self, other: &Self) -> bool {
-        self.id.eq(&other.id)
+    pub fn register_handler<T>(&mut self, handler: T)
+    where
+        T: GridTaskHandler,
+    {
+        // todo!()
     }
-}
 
-impl PartialOrd for PendingTask {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.cmp(other))
+    pub fn process_next_task(&mut self) -> FlowyResult<()> {
+        Ok(())
     }
-}
 
-impl Ord for PendingTask {
-    fn cmp(&self, other: &Self) -> Ordering {
-        self.id.cmp(&other.id).reverse()
+    pub fn register_task(&self, task: Task) {
+        assert!(!task.is_finished());
     }
-}
-
-#[derive(PartialEq, Eq, Hash, Debug, Clone)]
-enum TaskListIdentifier {
-    Filter(String),
-    Snapshot(String),
-}
-
-#[derive(Debug)]
-struct TaskList {
-    tasks: BinaryHeap<PendingTask>,
-}
-
-impl Deref for TaskList {
-    type Target = BinaryHeap<PendingTask>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.tasks
-    }
-}
-
-impl DerefMut for TaskList {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.tasks
-    }
-}
 
-impl TaskList {
-    fn new() -> Self {
-        Self {
-            tasks: Default::default(),
-        }
+    pub fn notify(&self) {
+        let _ = self.notifier.send(());
     }
 }

+ 11 - 0
frontend/rust-lib/flowy-grid/src/services/tasks/store.rs

@@ -0,0 +1,11 @@
+use crate::services::tasks::task::Task;
+
+pub struct GridTaskStore {
+    tasks: Vec<Task>,
+}
+
+impl GridTaskStore {
+    pub fn new() -> Self {
+        Self { tasks: vec![] }
+    }
+}

+ 67 - 0
frontend/rust-lib/flowy-grid/src/services/tasks/task.rs

@@ -0,0 +1,67 @@
+use std::cmp::Ordering;
+
+#[derive(Eq, Debug, Clone, Copy)]
+pub enum TaskType {
+    /// Remove the row if it doesn't satisfy the filter.
+    Filter,
+    /// Generate snapshot for grid, unused by now.
+    Snapshot,
+}
+
+impl PartialEq for TaskType {
+    fn eq(&self, other: &Self) -> bool {
+        matches!(
+            (self, other),
+            (Self::Filter, Self::Filter) | (Self::Snapshot, Self::Snapshot)
+        )
+    }
+}
+
+pub type TaskId = u32;
+
+#[derive(Eq, Debug, Clone, Copy)]
+pub struct PendingTask {
+    pub ty: TaskType,
+    pub id: TaskId,
+}
+
+impl PartialEq for PendingTask {
+    fn eq(&self, other: &Self) -> bool {
+        self.id.eq(&other.id)
+    }
+}
+
+impl PartialOrd for PendingTask {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for PendingTask {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self.ty, other.ty) {
+            (TaskType::Snapshot, TaskType::Snapshot) => Ordering::Equal,
+            (TaskType::Snapshot, _) => Ordering::Greater,
+            (_, TaskType::Snapshot) => Ordering::Less,
+            (TaskType::Filter, TaskType::Filter) => self.id.cmp(&other.id),
+        }
+    }
+}
+
+pub type ContentId = String;
+
+pub enum TaskContent {
+    Snapshot { content_id: ContentId },
+    Filter,
+}
+
+pub struct Task {
+    pub id: TaskId,
+    pub content: TaskContent,
+}
+
+impl Task {
+    pub fn is_finished(&self) -> bool {
+        todo!()
+    }
+}

+ 1 - 1
frontend/rust-lib/flowy-grid/tests/grid/cell_test.rs

@@ -9,7 +9,7 @@ async fn grid_cell_update() {
     let mut test = GridEditorTest::new().await;
     let field_revs = &test.field_revs;
     let row_revs = &test.row_revs;
-    let grid_blocks = &test.grid_block_revs;
+    let grid_blocks = &test.block_meta_revs;
 
     // For the moment, We only have one block to store rows
     let block_id = &grid_blocks.first().unwrap().block_id;

+ 11 - 11
frontend/rust-lib/flowy-grid/tests/grid/script.rs

@@ -87,7 +87,7 @@ pub struct GridEditorTest {
     pub grid_id: String,
     pub editor: Arc<GridRevisionEditor>,
     pub field_revs: Vec<FieldRevision>,
-    pub grid_block_revs: Vec<GridBlockMetaRevision>,
+    pub block_meta_revs: Vec<Arc<GridBlockMetaRevision>>,
     pub row_revs: Vec<Arc<RowRevision>>,
     pub field_count: usize,
 
@@ -103,10 +103,10 @@ impl GridEditorTest {
         let test = ViewTest::new_grid_view(&sdk, view_data.to_vec()).await;
         let editor = sdk.grid_manager.open_grid(&test.view.id).await.unwrap();
         let field_revs = editor.get_field_revs::<FieldOrder>(None).await.unwrap();
-        let grid_blocks = editor.get_block_meta_revs().await.unwrap();
+        let block_meta_revs = editor.get_block_meta_revs().await.unwrap();
         let row_revs = editor.grid_block_snapshots(None).await.unwrap().pop().unwrap().row_revs;
         assert_eq!(row_revs.len(), 3);
-        assert_eq!(grid_blocks.len(), 1);
+        assert_eq!(block_meta_revs.len(), 1);
 
         // It seems like you should add the field in the make_test_grid() function.
         // Because we assert the initialize count of the fields is equal to FieldType::COUNT.
@@ -118,7 +118,7 @@ impl GridEditorTest {
             grid_id,
             editor,
             field_revs,
-            grid_block_revs: grid_blocks,
+            block_meta_revs,
             row_revs,
             field_count: FieldType::COUNT,
             row_order_by_row_id: HashMap::default(),
@@ -172,7 +172,7 @@ impl GridEditorTest {
             }
             EditorScript::CreateBlock { block } => {
                 self.editor.create_block(block).await.unwrap();
-                self.grid_block_revs = self.editor.get_block_meta_revs().await.unwrap();
+                self.block_meta_revs = self.editor.get_block_meta_revs().await.unwrap();
             }
             EditorScript::UpdateBlock { changeset: change } => {
                 self.editor.update_block(change).await.unwrap();
@@ -185,19 +185,19 @@ impl GridEditorTest {
                 row_count,
                 start_row_index,
             } => {
-                assert_eq!(self.grid_block_revs[block_index].row_count, row_count);
-                assert_eq!(self.grid_block_revs[block_index].start_row_index, start_row_index);
+                assert_eq!(self.block_meta_revs[block_index].row_count, row_count);
+                assert_eq!(self.block_meta_revs[block_index].start_row_index, start_row_index);
             }
             EditorScript::AssertBlockEqual { block_index, block } => {
                 let blocks = self.editor.get_block_meta_revs().await.unwrap();
                 let compared_block = blocks[block_index].clone();
-                assert_eq!(compared_block, block);
+                assert_eq!(compared_block, Arc::new(block));
             }
             EditorScript::CreateEmptyRow => {
                 let row_order = self.editor.create_row(None).await.unwrap();
                 self.row_order_by_row_id.insert(row_order.row_id.clone(), row_order);
                 self.row_revs = self.get_row_revs().await;
-                self.grid_block_revs = self.editor.get_block_meta_revs().await.unwrap();
+                self.block_meta_revs = self.editor.get_block_meta_revs().await.unwrap();
             }
             EditorScript::CreateRow { payload: context } => {
                 let row_orders = self.editor.insert_rows(vec![context]).await.unwrap();
@@ -205,7 +205,7 @@ impl GridEditorTest {
                     self.row_order_by_row_id.insert(row_order.row_id.clone(), row_order);
                 }
                 self.row_revs = self.get_row_revs().await;
-                self.grid_block_revs = self.editor.get_block_meta_revs().await.unwrap();
+                self.block_meta_revs = self.editor.get_block_meta_revs().await.unwrap();
             }
             EditorScript::UpdateRow { changeset: change } => self.editor.update_row(change).await.unwrap(),
             EditorScript::DeleteRows { row_ids } => {
@@ -216,7 +216,7 @@ impl GridEditorTest {
 
                 self.editor.delete_rows(row_orders).await.unwrap();
                 self.row_revs = self.get_row_revs().await;
-                self.grid_block_revs = self.editor.get_block_meta_revs().await.unwrap();
+                self.block_meta_revs = self.editor.get_block_meta_revs().await.unwrap();
             }
             EditorScript::AssertRow { expected_row } => {
                 let row = &*self

+ 9 - 0
frontend/rust-lib/flowy-text-block/src/editor.rs

@@ -199,6 +199,15 @@ fn spawn_edit_queue(
 ) -> EditorCommandSender {
     let (sender, receiver) = mpsc::channel(1000);
     let edit_queue = EditBlockQueue::new(user, rev_manager, delta, receiver);
+    // We can use tokio::task::spawn_local here by using tokio::spawn_blocking.
+    // https://github.com/tokio-rs/tokio/issues/2095
+    // tokio::task::spawn_blocking(move || {
+    //     let rt = tokio::runtime::Handle::current();
+    //     rt.block_on(async {
+    //         let local = tokio::task::LocalSet::new();
+    //         local.run_until(edit_queue.run()).await;
+    //     });
+    // });
     tokio::spawn(edit_queue.run());
     sender
 }

+ 8 - 1
shared-lib/flowy-sync/src/client_grid/grid_block_revsion_pad.rs

@@ -5,7 +5,6 @@ use flowy_grid_data_model::revision::{
     gen_block_id, gen_row_id, CellRevision, GridBlockRevision, RowMetaChangeset, RowRevision,
 };
 use lib_ot::core::{OperationTransformable, PlainTextAttributes, PlainTextDelta, PlainTextDeltaBuilder};
-use serde::{Deserialize, Serialize};
 use std::borrow::Cow;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -19,6 +18,14 @@ pub struct GridBlockRevisionPad {
     pub(crate) delta: GridBlockRevisionDelta,
 }
 
+impl std::ops::Deref for GridBlockRevisionPad {
+    type Target = GridBlockRevision;
+
+    fn deref(&self) -> &Self::Target {
+        &self.block_revision
+    }
+}
+
 impl GridBlockRevisionPad {
     pub async fn duplicate_data(&self, duplicated_block_id: &str) -> GridBlockRevision {
         let duplicated_rows = self