|  | @@ -1,10 +1,8 @@
 | 
	
		
			
				|  |  |  use crate::{
 | 
	
		
			
				|  |  | -    core::{
 | 
	
		
			
				|  |  | -        document::Document,
 | 
	
		
			
				|  |  | -        sync::{RevisionSynchronizer, RevisionUser},
 | 
	
		
			
				|  |  | -    },
 | 
	
		
			
				|  |  | +    document::Document,
 | 
	
		
			
				|  |  |      entities::{doc::DocumentInfo, revision::Revision},
 | 
	
		
			
				|  |  |      errors::{internal_error, CollaborateError, CollaborateResult},
 | 
	
		
			
				|  |  | +    sync::{RevisionSynchronizer, RevisionUser},
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  use async_stream::stream;
 | 
	
		
			
				|  |  |  use dashmap::DashMap;
 | 
	
	
		
			
				|  | @@ -96,16 +94,22 @@ impl ServerDocumentManager {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  struct OpenDocHandle {
 | 
	
		
			
				|  |  | -    sender: mpsc::Sender<EditCommand>,
 | 
	
		
			
				|  |  | +    sender: mpsc::Sender<DocumentCommand>,
 | 
	
		
			
				|  |  |      persistence: Arc<dyn DocumentPersistence>,
 | 
	
		
			
				|  |  | +    users: DashMap<String, Arc<dyn RevisionUser>>,
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  impl OpenDocHandle {
 | 
	
		
			
				|  |  |      fn new(doc: DocumentInfo, persistence: Arc<dyn DocumentPersistence>) -> Result<Self, CollaborateError> {
 | 
	
		
			
				|  |  |          let (sender, receiver) = mpsc::channel(100);
 | 
	
		
			
				|  |  | -        let queue = EditCommandQueue::new(receiver, doc)?;
 | 
	
		
			
				|  |  | +        let users = DashMap::new();
 | 
	
		
			
				|  |  | +        let queue = DocumentCommandQueue::new(receiver, doc)?;
 | 
	
		
			
				|  |  |          tokio::task::spawn(queue.run());
 | 
	
		
			
				|  |  | -        Ok(Self { sender, persistence })
 | 
	
		
			
				|  |  | +        Ok(Self {
 | 
	
		
			
				|  |  | +            sender,
 | 
	
		
			
				|  |  | +            persistence,
 | 
	
		
			
				|  |  | +            users,
 | 
	
		
			
				|  |  | +        })
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      async fn apply_revisions(
 | 
	
	
		
			
				|  | @@ -115,7 +119,8 @@ impl OpenDocHandle {
 | 
	
		
			
				|  |  |      ) -> Result<(), CollaborateError> {
 | 
	
		
			
				|  |  |          let (ret, rx) = oneshot::channel();
 | 
	
		
			
				|  |  |          let persistence = self.persistence.clone();
 | 
	
		
			
				|  |  | -        let msg = EditCommand::ApplyRevisions {
 | 
	
		
			
				|  |  | +        self.users.insert(user.user_id(), user.clone());
 | 
	
		
			
				|  |  | +        let msg = DocumentCommand::ApplyRevisions {
 | 
	
		
			
				|  |  |              user,
 | 
	
		
			
				|  |  |              revisions,
 | 
	
		
			
				|  |  |              persistence,
 | 
	
	
		
			
				|  | @@ -127,11 +132,11 @@ impl OpenDocHandle {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      pub async fn document_json(&self) -> CollaborateResult<String> {
 | 
	
		
			
				|  |  |          let (ret, rx) = oneshot::channel();
 | 
	
		
			
				|  |  | -        let msg = EditCommand::GetDocumentJson { ret };
 | 
	
		
			
				|  |  | +        let msg = DocumentCommand::GetDocumentJson { ret };
 | 
	
		
			
				|  |  |          self.send(msg, rx).await?
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    async fn send<T>(&self, msg: EditCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
 | 
	
		
			
				|  |  | +    async fn send<T>(&self, msg: DocumentCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
 | 
	
		
			
				|  |  |          let _ = self.sender.send(msg).await.map_err(internal_error)?;
 | 
	
		
			
				|  |  |          let result = rx.await.map_err(internal_error)?;
 | 
	
		
			
				|  |  |          Ok(result)
 | 
	
	
		
			
				|  | @@ -139,7 +144,7 @@ impl OpenDocHandle {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #[derive(Debug)]
 | 
	
		
			
				|  |  | -enum EditCommand {
 | 
	
		
			
				|  |  | +enum DocumentCommand {
 | 
	
		
			
				|  |  |      ApplyRevisions {
 | 
	
		
			
				|  |  |          user: Arc<dyn RevisionUser>,
 | 
	
		
			
				|  |  |          revisions: Vec<Revision>,
 | 
	
	
		
			
				|  | @@ -151,17 +156,15 @@ enum EditCommand {
 | 
	
		
			
				|  |  |      },
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -struct EditCommandQueue {
 | 
	
		
			
				|  |  | +struct DocumentCommandQueue {
 | 
	
		
			
				|  |  |      pub doc_id: String,
 | 
	
		
			
				|  |  | -    receiver: Option<mpsc::Receiver<EditCommand>>,
 | 
	
		
			
				|  |  | +    receiver: Option<mpsc::Receiver<DocumentCommand>>,
 | 
	
		
			
				|  |  |      synchronizer: Arc<RevisionSynchronizer>,
 | 
	
		
			
				|  |  | -    users: DashMap<String, Arc<dyn RevisionUser>>,
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -impl EditCommandQueue {
 | 
	
		
			
				|  |  | -    fn new(receiver: mpsc::Receiver<EditCommand>, doc: DocumentInfo) -> Result<Self, CollaborateError> {
 | 
	
		
			
				|  |  | +impl DocumentCommandQueue {
 | 
	
		
			
				|  |  | +    fn new(receiver: mpsc::Receiver<DocumentCommand>, doc: DocumentInfo) -> Result<Self, CollaborateError> {
 | 
	
		
			
				|  |  |          let delta = RichTextDelta::from_bytes(&doc.text)?;
 | 
	
		
			
				|  |  | -        let users = DashMap::new();
 | 
	
		
			
				|  |  |          let synchronizer = Arc::new(RevisionSynchronizer::new(
 | 
	
		
			
				|  |  |              &doc.id,
 | 
	
		
			
				|  |  |              doc.rev_id,
 | 
	
	
		
			
				|  | @@ -172,7 +175,6 @@ impl EditCommandQueue {
 | 
	
		
			
				|  |  |              doc_id: doc.id,
 | 
	
		
			
				|  |  |              receiver: Some(receiver),
 | 
	
		
			
				|  |  |              synchronizer,
 | 
	
		
			
				|  |  | -            users,
 | 
	
		
			
				|  |  |          })
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -193,22 +195,21 @@ impl EditCommandQueue {
 | 
	
		
			
				|  |  |          stream.for_each(|msg| self.handle_message(msg)).await;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    async fn handle_message(&self, msg: EditCommand) {
 | 
	
		
			
				|  |  | +    async fn handle_message(&self, msg: DocumentCommand) {
 | 
	
		
			
				|  |  |          match msg {
 | 
	
		
			
				|  |  | -            EditCommand::ApplyRevisions {
 | 
	
		
			
				|  |  | +            DocumentCommand::ApplyRevisions {
 | 
	
		
			
				|  |  |                  user,
 | 
	
		
			
				|  |  |                  revisions,
 | 
	
		
			
				|  |  |                  persistence,
 | 
	
		
			
				|  |  |                  ret,
 | 
	
		
			
				|  |  |              } => {
 | 
	
		
			
				|  |  | -                self.users.insert(user.user_id(), user.clone());
 | 
	
		
			
				|  |  |                  self.synchronizer
 | 
	
		
			
				|  |  |                      .apply_revisions(user, revisions, persistence)
 | 
	
		
			
				|  |  |                      .await
 | 
	
		
			
				|  |  |                      .unwrap();
 | 
	
		
			
				|  |  |                  let _ = ret.send(Ok(()));
 | 
	
		
			
				|  |  |              },
 | 
	
		
			
				|  |  | -            EditCommand::GetDocumentJson { ret } => {
 | 
	
		
			
				|  |  | +            DocumentCommand::GetDocumentJson { ret } => {
 | 
	
		
			
				|  |  |                  let synchronizer = self.synchronizer.clone();
 | 
	
		
			
				|  |  |                  let json = spawn_blocking(move || synchronizer.doc_json())
 | 
	
		
			
				|  |  |                      .await
 |