123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- use crate::queue::BlockRevisionCompact;
- use crate::web_socket::{make_block_ws_manager, EditorCommandSender};
- use crate::{
- errors::FlowyError,
- queue::{EditBlockQueue, EditorCommand},
- BlockUser,
- };
- use bytes::Bytes;
- use flowy_collaboration::entities::ws_data::ServerRevisionWSData;
- use flowy_collaboration::{
- entities::{document_info::BlockInfo, revision::Revision},
- errors::CollaborateResult,
- util::make_delta_from_revisions,
- };
- use flowy_error::{internal_error, FlowyResult};
- use flowy_sync::{
- RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket, RevisionWebSocketManager,
- };
- use lib_ot::{
- core::{Interval, Operation},
- rich_text::{RichTextAttribute, RichTextDelta},
- };
- use lib_ws::WSConnectState;
- use std::sync::Arc;
- use tokio::sync::{mpsc, oneshot};
- pub struct ClientBlockEditor {
- pub doc_id: String,
- #[allow(dead_code)]
- rev_manager: Arc<RevisionManager>,
- ws_manager: Arc<RevisionWebSocketManager>,
- edit_cmd_tx: EditorCommandSender,
- }
- impl ClientBlockEditor {
- pub(crate) async fn new(
- doc_id: &str,
- user: Arc<dyn BlockUser>,
- mut rev_manager: RevisionManager,
- rev_web_socket: Arc<dyn RevisionWebSocket>,
- cloud_service: Arc<dyn RevisionCloudService>,
- ) -> FlowyResult<Arc<Self>> {
- let document_info = rev_manager
- .load::<BlockInfoBuilder, BlockRevisionCompact>(cloud_service)
- .await?;
- let delta = document_info.delta()?;
- let rev_manager = Arc::new(rev_manager);
- let doc_id = doc_id.to_string();
- let user_id = user.user_id()?;
- let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta);
- let ws_manager = make_block_ws_manager(
- doc_id.clone(),
- user_id.clone(),
- edit_cmd_tx.clone(),
- rev_manager.clone(),
- rev_web_socket,
- )
- .await;
- let editor = Arc::new(Self {
- doc_id,
- rev_manager,
- ws_manager,
- edit_cmd_tx,
- });
- Ok(editor)
- }
- pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
- let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
- let msg = EditorCommand::Insert {
- index,
- data: data.to_string(),
- ret,
- };
- let _ = self.edit_cmd_tx.send(msg).await;
- let _ = rx.await.map_err(internal_error)??;
- Ok(())
- }
- pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
- let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
- let msg = EditorCommand::Delete { interval, ret };
- let _ = self.edit_cmd_tx.send(msg).await;
- let _ = rx.await.map_err(internal_error)??;
- Ok(())
- }
- pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> {
- let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
- let msg = EditorCommand::Format {
- interval,
- attribute,
- ret,
- };
- let _ = self.edit_cmd_tx.send(msg).await;
- let _ = rx.await.map_err(internal_error)??;
- Ok(())
- }
- pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
- let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
- let msg = EditorCommand::Replace {
- interval,
- data: data.to_string(),
- ret,
- };
- let _ = self.edit_cmd_tx.send(msg).await;
- let _ = rx.await.map_err(internal_error)??;
- Ok(())
- }
- pub async fn can_undo(&self) -> bool {
- let (ret, rx) = oneshot::channel::<bool>();
- let msg = EditorCommand::CanUndo { ret };
- let _ = self.edit_cmd_tx.send(msg).await;
- rx.await.unwrap_or(false)
- }
- pub async fn can_redo(&self) -> bool {
- let (ret, rx) = oneshot::channel::<bool>();
- let msg = EditorCommand::CanRedo { ret };
- let _ = self.edit_cmd_tx.send(msg).await;
- rx.await.unwrap_or(false)
- }
- pub async fn undo(&self) -> Result<(), FlowyError> {
- let (ret, rx) = oneshot::channel();
- let msg = EditorCommand::Undo { ret };
- let _ = self.edit_cmd_tx.send(msg).await;
- let _ = rx.await.map_err(internal_error)??;
- Ok(())
- }
- pub async fn redo(&self) -> Result<(), FlowyError> {
- let (ret, rx) = oneshot::channel();
- let msg = EditorCommand::Redo { ret };
- let _ = self.edit_cmd_tx.send(msg).await;
- let _ = rx.await.map_err(internal_error)??;
- Ok(())
- }
- pub async fn block_json(&self) -> FlowyResult<String> {
- let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
- let msg = EditorCommand::ReadBlockJson { ret };
- let _ = self.edit_cmd_tx.send(msg).await;
- let json = rx.await.map_err(internal_error)??;
- Ok(json)
- }
- #[tracing::instrument(level = "trace", skip(self, data), err)]
- pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), FlowyError> {
- let delta = RichTextDelta::from_bytes(&data)?;
- let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
- let msg = EditorCommand::ComposeLocalDelta {
- delta: delta.clone(),
- ret,
- };
- let _ = self.edit_cmd_tx.send(msg).await;
- let _ = rx.await.map_err(internal_error)??;
- Ok(())
- }
- pub fn stop(&self) {
- self.ws_manager.stop();
- }
- pub(crate) async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError> {
- self.ws_manager.receive_ws_data(data).await
- }
- pub(crate) fn receive_ws_state(&self, state: &WSConnectState) {
- self.ws_manager.connect_state_changed(state.clone());
- }
- }
- impl std::ops::Drop for ClientBlockEditor {
- fn drop(&mut self) {
- tracing::trace!("{} ClientBlockEditor was dropped", self.doc_id)
- }
- }
- // The edit queue will exit after the EditorCommandSender was dropped.
- fn spawn_edit_queue(
- user: Arc<dyn BlockUser>,
- rev_manager: Arc<RevisionManager>,
- delta: RichTextDelta,
- ) -> EditorCommandSender {
- let (sender, receiver) = mpsc::channel(1000);
- let edit_queue = EditBlockQueue::new(user, rev_manager, delta, receiver);
- tokio::spawn(edit_queue.run());
- sender
- }
/// Accessors compiled only with the `flowy_unit_test` feature.
#[cfg(feature = "flowy_unit_test")]
impl ClientBlockEditor {
    /// Requests the block's JSON from the edit queue via
    /// `EditorCommand::ReadBlockJson`.
    pub async fn doc_json(&self) -> FlowyResult<String> {
        let (reply_tx, reply_rx) = oneshot::channel::<CollaborateResult<String>>();
        let _ = self
            .edit_cmd_tx
            .send(EditorCommand::ReadBlockJson { ret: reply_tx })
            .await;
        Ok(reply_rx.await.map_err(internal_error)??)
    }

    /// Requests the block's delta from the edit queue via
    /// `EditorCommand::ReadBlockDelta`.
    pub async fn doc_delta(&self) -> FlowyResult<RichTextDelta> {
        let (reply_tx, reply_rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
        let _ = self
            .edit_cmd_tx
            .send(EditorCommand::ReadBlockDelta { ret: reply_tx })
            .await;
        Ok(reply_rx.await.map_err(internal_error)??)
    }

    /// Returns another handle to the editor's revision manager.
    pub fn rev_manager(&self) -> Arc<RevisionManager> {
        Arc::clone(&self.rev_manager)
    }
}
- struct BlockInfoBuilder();
- impl RevisionObjectBuilder for BlockInfoBuilder {
- type Output = BlockInfo;
- fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
- let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
- let mut delta = make_delta_from_revisions(revisions)?;
- correct_delta(&mut delta);
- Result::<BlockInfo, FlowyError>::Ok(BlockInfo {
- block_id: object_id.to_owned(),
- text: delta.to_delta_json(),
- rev_id,
- base_rev_id,
- })
- }
- }
- // quill-editor requires the delta should end with '\n' and only contains the
- // insert operation. The function, correct_delta maybe be removed in the future.
- fn correct_delta(delta: &mut RichTextDelta) {
- if let Some(op) = delta.ops.last() {
- let op_data = op.get_data();
- if !op_data.ends_with('\n') {
- tracing::warn!("The document must end with newline. Correcting it by inserting newline op");
- delta.ops.push(Operation::Insert("\n".into()));
- }
- }
- if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
- tracing::warn!("The document can only contains insert operations, but found {:?}", op);
- delta.ops.retain(|op| op.is_insert());
- }
- }
|