use bytes::Bytes; use flowy_collaboration::entities::{ document_info::{BlockDelta, BlockId}, revision::{RepeatedRevision, Revision}, }; use flowy_collaboration::client_document::default::initial_quill_delta_string; use futures::{FutureExt, StreamExt}; use std::collections::HashMap; use std::{collections::HashSet, sync::Arc}; use crate::manager::DataProcessorMap; use crate::{ dart_notification::{send_dart_notification, FolderNotification}, entities::{ trash::{RepeatedTrashId, TrashType}, view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId}, }, errors::{FlowyError, FlowyResult}, event_map::{FolderCouldServiceV1, WorkspaceUser}, services::{ persistence::{FolderPersistence, FolderPersistenceTransaction, ViewChangeset}, TrashController, TrashEvent, }, }; use flowy_block::BlockManager; use flowy_database::kv::KV; use flowy_folder_data_model::entities::view::ViewDataType; use lib_infra::uuid; const LATEST_VIEW_ID: &str = "latest_view_id"; pub(crate) struct ViewController { user: Arc, cloud_service: Arc, persistence: Arc, trash_controller: Arc, data_processors: DataProcessorMap, block_manager: Arc, } impl ViewController { pub(crate) fn new( user: Arc, persistence: Arc, cloud_service: Arc, trash_controller: Arc, data_processors: DataProcessorMap, block_manager: Arc, ) -> Self { Self { user, cloud_service, persistence, trash_controller, data_processors, block_manager, } } pub(crate) fn initialize(&self) -> Result<(), FlowyError> { let _ = self.block_manager.init()?; self.listen_trash_can_event(); Ok(()) } #[tracing::instrument(level = "trace", skip(self, params), fields(name = %params.name), err)] pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result { let view_data = if params.data.is_empty() { initial_quill_delta_string() } else { params.data.clone() }; let _ = self.create_view(¶ms.view_id, Bytes::from(view_data)).await?; let view = self.create_view_on_server(params).await?; let _ = self.create_view_on_local(view.clone()).await?; Ok(view) } #[tracing::instrument(level = "debug", skip(self, view_id, delta_data), err)] pub(crate) async fn create_view(&self, view_id: &str, delta_data: Bytes) -> Result<(), FlowyError> { if delta_data.is_empty() { return Err(FlowyError::internal().context("The content of the view should not be empty")); } let user_id = self.user.user_id()?; let repeated_revision: RepeatedRevision = Revision::initial_revision(&user_id, view_id, delta_data).into(); let _ = self.block_manager.create_block(view_id, repeated_revision).await?; Ok(()) } pub(crate) async fn create_view_on_local(&self, view: View) -> Result<(), FlowyError> { let trash_controller = self.trash_controller.clone(); self.persistence .begin_transaction(|transaction| { let belong_to_id = view.belong_to_id.clone(); let _ = transaction.create_view(view)?; let _ = notify_views_changed(&belong_to_id, trash_controller, &transaction)?; Ok(()) }) .await } #[tracing::instrument(skip(self, view_id), fields(view_id = %view_id.value), err)] pub(crate) async fn read_view(&self, view_id: ViewId) -> Result { let view = self .persistence .begin_transaction(|transaction| { let view = transaction.read_view(&view_id.value)?; let trash_ids = self.trash_controller.read_trash_ids(&transaction)?; if trash_ids.contains(&view.id) { return Err(FlowyError::record_not_found()); } Ok(view) }) .await?; let _ = self.read_view_on_server(view_id); Ok(view) } pub(crate) async fn read_local_views(&self, ids: Vec) -> Result, FlowyError> { self.persistence .begin_transaction(|transaction| { let mut views = vec![]; for view_id in ids { views.push(transaction.read_view(&view_id)?); } Ok(views) }) .await } #[tracing::instrument(level = "debug", skip(self), err)] pub(crate) async fn open_view(&self, view_id: &str) -> Result { let editor = self.block_manager.open_block(view_id).await?; let delta_str = editor.delta_str().await?; KV::set_str(LATEST_VIEW_ID, view_id.to_owned()); Ok(BlockDelta { block_id: view_id.to_string(), delta_str, }) } #[tracing::instrument(level = "debug", skip(self), err)] pub(crate) async fn close_view(&self, doc_id: &str) -> Result<(), FlowyError> { let _ = self.block_manager.close_block(doc_id)?; Ok(()) } #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.value), err)] pub(crate) async fn delete_view(&self, params: BlockId) -> Result<(), FlowyError> { if let Some(view_id) = KV::get_str(LATEST_VIEW_ID) { if view_id == params.value { let _ = KV::remove(LATEST_VIEW_ID); } } let _ = self.block_manager.close_block(¶ms.value)?; Ok(()) } #[tracing::instrument(level = "debug", skip(self), err)] pub(crate) async fn duplicate_view(&self, view_id: &str) -> Result<(), FlowyError> { let view = self .persistence .begin_transaction(|transaction| transaction.read_view(view_id)) .await?; let editor = self.block_manager.open_block(view_id).await?; let delta_str = editor.delta_str().await?; let duplicate_params = CreateViewParams { belong_to_id: view.belong_to_id.clone(), name: format!("{} (copy)", &view.name), desc: view.desc, thumbnail: view.thumbnail, data_type: view.data_type, data: delta_str, view_id: uuid(), ext_data: view.ext_data, plugin_type: view.plugin_type, }; let _ = self.create_view_from_params(duplicate_params).await?; Ok(()) } // belong_to_id will be the app_id or view_id. #[tracing::instrument(level = "debug", skip(self), err)] pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result { self.persistence .begin_transaction(|transaction| { read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction) }) .await } #[tracing::instrument(level = "debug", skip(self, params), err)] pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result { let changeset = ViewChangeset::new(params.clone()); let view_id = changeset.id.clone(); let view = self .persistence .begin_transaction(|transaction| { let _ = transaction.update_view(changeset)?; let view = transaction.read_view(&view_id)?; send_dart_notification(&view_id, FolderNotification::ViewUpdated) .payload(view.clone()) .send(); let _ = notify_views_changed(&view.belong_to_id, self.trash_controller.clone(), &transaction)?; Ok(view) }) .await?; let _ = self.update_view_on_server(params); Ok(view) } pub(crate) async fn latest_visit_view(&self) -> FlowyResult> { match KV::get_str(LATEST_VIEW_ID) { None => Ok(None), Some(view_id) => { let view = self .persistence .begin_transaction(|transaction| transaction.read_view(&view_id)) .await?; Ok(Some(view)) } } } pub(crate) fn set_latest_view(&self, view: &View) { KV::set_str(LATEST_VIEW_ID, view.id.clone()); } } impl ViewController { #[tracing::instrument(skip(self), err)] async fn create_view_on_server(&self, params: CreateViewParams) -> Result { let token = self.user.token()?; let view = self.cloud_service.create_view(&token, params).await?; Ok(view) } #[tracing::instrument(skip(self), err)] fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> { let token = self.user.token()?; let server = self.cloud_service.clone(); tokio::spawn(async move { match server.update_view(&token, params).await { Ok(_) => {} Err(e) => { // TODO: retry? log::error!("Update view failed: {:?}", e); } } }); Ok(()) } #[tracing::instrument(skip(self), err)] fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> { let token = self.user.token()?; let server = self.cloud_service.clone(); let persistence = self.persistence.clone(); // TODO: Retry with RetryAction? tokio::spawn(async move { match server.read_view(&token, params).await { Ok(Some(view)) => { match persistence .begin_transaction(|transaction| transaction.create_view(view.clone())) .await { Ok(_) => { send_dart_notification(&view.id, FolderNotification::ViewUpdated) .payload(view.clone()) .send(); } Err(e) => log::error!("Save view failed: {:?}", e), } } Ok(None) => {} Err(e) => log::error!("Read view failed: {:?}", e), } }); Ok(()) } fn listen_trash_can_event(&self) { let mut rx = self.trash_controller.subscribe(); let persistence = self.persistence.clone(); let block_manager = self.block_manager.clone(); let trash_controller = self.trash_controller.clone(); let _ = tokio::spawn(async move { loop { let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move { match result { Ok(event) => event.select(TrashType::TrashView), Err(_e) => None, } })); if let Some(event) = stream.next().await { handle_trash_event( persistence.clone(), block_manager.clone(), trash_controller.clone(), event, ) .await } } }); } } #[tracing::instrument(level = "trace", skip(persistence, block_manager, trash_can))] async fn handle_trash_event( persistence: Arc, block_manager: Arc, trash_can: Arc, event: TrashEvent, ) { match event { TrashEvent::NewTrash(identifiers, ret) => { let result = persistence .begin_transaction(|transaction| { let views = read_local_views_with_transaction(identifiers, &transaction)?; for view in views { let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?; notify_dart(view, FolderNotification::ViewDeleted); } Ok(()) }) .await; let _ = ret.send(result).await; } TrashEvent::Putback(identifiers, ret) => { let result = persistence .begin_transaction(|transaction| { let views = read_local_views_with_transaction(identifiers, &transaction)?; for view in views { let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?; notify_dart(view, FolderNotification::ViewRestored); } Ok(()) }) .await; let _ = ret.send(result).await; } TrashEvent::Delete(identifiers, ret) => { let result = persistence .begin_transaction(|transaction| { let mut notify_ids = HashSet::new(); for identifier in identifiers.items { let view = transaction.read_view(&identifier.id)?; let _ = transaction.delete_view(&identifier.id)?; let _ = block_manager.delete_block(&identifier.id)?; notify_ids.insert(view.belong_to_id); } for notify_id in notify_ids { let _ = notify_views_changed(¬ify_id, trash_can.clone(), &transaction)?; } Ok(()) }) .await; let _ = ret.send(result).await; } } } fn read_local_views_with_transaction<'a>( identifiers: RepeatedTrashId, transaction: &'a (dyn FolderPersistenceTransaction + 'a), ) -> Result, FlowyError> { let mut views = vec![]; for identifier in identifiers.items { let view = transaction.read_view(&identifier.id)?; views.push(view); } Ok(views) } fn notify_dart(view: View, notification: FolderNotification) { send_dart_notification(&view.id, notification).payload(view).send(); } #[tracing::instrument(skip(belong_to_id, trash_controller, transaction), fields(view_count), err)] fn notify_views_changed<'a>( belong_to_id: &str, trash_controller: Arc, transaction: &'a (dyn FolderPersistenceTransaction + 'a), ) -> FlowyResult<()> { let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), transaction)?; tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str()); send_dart_notification(belong_to_id, FolderNotification::AppViewsChanged) .payload(repeated_view) .send(); Ok(()) } fn read_belonging_views_on_local<'a>( belong_to_id: &str, trash_controller: Arc, transaction: &'a (dyn FolderPersistenceTransaction + 'a), ) -> FlowyResult { let mut views = transaction.read_views(belong_to_id)?; let trash_ids = trash_controller.read_trash_ids(transaction)?; views.retain(|view_table| !trash_ids.contains(&view_table.id)); Ok(RepeatedView { items: views }) }