123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- use crate::entities::revision::{RepeatedRevision, Revision};
- use crate::{
- entities::{
- folder::{FolderDelta, FolderInfo},
- ws_data::ServerRevisionWSDataBuilder,
- },
- errors::{internal_error, CollaborateError, CollaborateResult},
- protobuf::ClientRevisionWSData,
- server_folder::folder_pad::ServerFolder,
- synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser},
- util::rev_id_from_str,
- };
- use async_stream::stream;
- use futures::stream::StreamExt;
- use lib_infra::future::BoxResultFuture;
- use lib_ot::core::PhantomAttributes;
- use std::{collections::HashMap, fmt::Debug, sync::Arc};
- use tokio::{
- sync::{mpsc, oneshot, RwLock},
- task::spawn_blocking,
- };
- pub trait FolderCloudPersistence: Send + Sync + Debug {
- fn read_folder(&self, user_id: &str, folder_id: &str) -> BoxResultFuture<FolderInfo, CollaborateError>;
- fn create_folder(
- &self,
- user_id: &str,
- folder_id: &str,
- repeated_revision: RepeatedRevision,
- ) -> BoxResultFuture<Option<FolderInfo>, CollaborateError>;
- fn save_folder_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
- fn read_folder_revisions(
- &self,
- folder_id: &str,
- rev_ids: Option<Vec<i64>>,
- ) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
- fn reset_folder(
- &self,
- folder_id: &str,
- repeated_revision: RepeatedRevision,
- ) -> BoxResultFuture<(), CollaborateError>;
- }
- impl RevisionSyncPersistence for Arc<dyn FolderCloudPersistence> {
- fn read_revisions(
- &self,
- object_id: &str,
- rev_ids: Option<Vec<i64>>,
- ) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
- (**self).read_folder_revisions(object_id, rev_ids)
- }
- fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
- (**self).save_folder_revisions(repeated_revision)
- }
- fn reset_object(
- &self,
- object_id: &str,
- repeated_revision: RepeatedRevision,
- ) -> BoxResultFuture<(), CollaborateError> {
- (**self).reset_folder(object_id, repeated_revision)
- }
- }
- pub struct ServerFolderManager {
- folder_handlers: Arc<RwLock<HashMap<String, Arc<OpenFolderHandler>>>>,
- persistence: Arc<dyn FolderCloudPersistence>,
- }
- impl ServerFolderManager {
- pub fn new(persistence: Arc<dyn FolderCloudPersistence>) -> Self {
- Self {
- folder_handlers: Arc::new(RwLock::new(HashMap::new())),
- persistence,
- }
- }
- pub async fn handle_client_revisions(
- &self,
- user: Arc<dyn RevisionUser>,
- mut client_data: ClientRevisionWSData,
- ) -> Result<(), CollaborateError> {
- let repeated_revision: RepeatedRevision = client_data.take_revisions().into();
- let cloned_user = user.clone();
- let ack_id = rev_id_from_str(&client_data.data_id)?;
- let folder_id = client_data.object_id;
- let user_id = user.user_id();
- let result = match self.get_folder_handler(&user_id, &folder_id).await {
- None => {
- let _ = self
- .create_folder(&user_id, &folder_id, repeated_revision)
- .await
- .map_err(|e| CollaborateError::internal().context(format!("Server create folder failed: {}", e)))?;
- Ok(())
- }
- Some(handler) => {
- let _ = handler.apply_revisions(user, repeated_revision).await?;
- Ok(())
- }
- };
- if result.is_ok() {
- cloned_user.receive(RevisionSyncResponse::Ack(
- ServerRevisionWSDataBuilder::build_ack_message(&folder_id, ack_id),
- ));
- }
- result
- }
- pub async fn handle_client_ping(
- &self,
- user: Arc<dyn RevisionUser>,
- client_data: ClientRevisionWSData,
- ) -> Result<(), CollaborateError> {
- let user_id = user.user_id();
- let rev_id = rev_id_from_str(&client_data.data_id)?;
- let folder_id = client_data.object_id.clone();
- match self.get_folder_handler(&user_id, &folder_id).await {
- None => {
- tracing::trace!("Folder:{} doesn't exist, ignore client ping", folder_id);
- Ok(())
- }
- Some(handler) => {
- let _ = handler.apply_ping(rev_id, user).await?;
- Ok(())
- }
- }
- }
- async fn get_folder_handler(&self, user_id: &str, folder_id: &str) -> Option<Arc<OpenFolderHandler>> {
- let folder_id = folder_id.to_owned();
- if let Some(handler) = self.folder_handlers.read().await.get(&folder_id).cloned() {
- return Some(handler);
- }
- let mut write_guard = self.folder_handlers.write().await;
- match self.persistence.read_folder(user_id, &folder_id).await {
- Ok(folder_info) => {
- let handler = self
- .create_folder_handler(folder_info)
- .await
- .map_err(internal_error)
- .unwrap();
- write_guard.insert(folder_id, handler.clone());
- drop(write_guard);
- Some(handler)
- }
- Err(_) => None,
- }
- }
- async fn create_folder_handler(&self, folder_info: FolderInfo) -> Result<Arc<OpenFolderHandler>, CollaborateError> {
- let persistence = self.persistence.clone();
- let handle = spawn_blocking(|| OpenFolderHandler::new(folder_info, persistence))
- .await
- .map_err(|e| CollaborateError::internal().context(format!("Create folder handler failed: {}", e)))?;
- Ok(Arc::new(handle?))
- }
- #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
- async fn create_folder(
- &self,
- user_id: &str,
- folder_id: &str,
- repeated_revision: RepeatedRevision,
- ) -> Result<Arc<OpenFolderHandler>, CollaborateError> {
- match self
- .persistence
- .create_folder(user_id, folder_id, repeated_revision)
- .await?
- {
- Some(folder_info) => {
- let handler = self.create_folder_handler(folder_info).await?;
- self.folder_handlers
- .write()
- .await
- .insert(folder_id.to_owned(), handler.clone());
- Ok(handler)
- }
- None => Err(CollaborateError::internal().context(String::new())),
- }
- }
- }
- type FolderRevisionSynchronizer = RevisionSynchronizer<PhantomAttributes>;
- struct OpenFolderHandler {
- folder_id: String,
- sender: mpsc::Sender<FolderCommand>,
- }
- impl OpenFolderHandler {
- fn new(folder_info: FolderInfo, persistence: Arc<dyn FolderCloudPersistence>) -> CollaborateResult<Self> {
- let (sender, receiver) = mpsc::channel(1000);
- let folder_id = folder_info.folder_id.clone();
- let delta = FolderDelta::from_bytes(&folder_info.text)?;
- let sync_object = ServerFolder::from_delta(&folder_id, delta);
- let synchronizer = Arc::new(FolderRevisionSynchronizer::new(
- folder_info.rev_id,
- sync_object,
- persistence,
- ));
- let queue = FolderCommandRunner::new(&folder_id, receiver, synchronizer);
- tokio::task::spawn(queue.run());
- Ok(Self { folder_id, sender })
- }
- #[tracing::instrument(
- name = "server_folder_apply_revision",
- level = "trace",
- skip(self, user, repeated_revision),
- err
- )]
- async fn apply_revisions(
- &self,
- user: Arc<dyn RevisionUser>,
- repeated_revision: RepeatedRevision,
- ) -> CollaborateResult<()> {
- let (ret, rx) = oneshot::channel();
- let msg = FolderCommand::ApplyRevisions {
- user,
- repeated_revision,
- ret,
- };
- self.send(msg, rx).await?
- }
- async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), CollaborateError> {
- let (ret, rx) = oneshot::channel();
- let msg = FolderCommand::Ping { user, rev_id, ret };
- self.send(msg, rx).await?
- }
- async fn send<T>(&self, msg: FolderCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
- let _ = self
- .sender
- .send(msg)
- .await
- .map_err(|e| CollaborateError::internal().context(format!("Send folder command failed: {}", e)))?;
- Ok(rx.await.map_err(internal_error)?)
- }
- }
- impl std::ops::Drop for OpenFolderHandler {
- fn drop(&mut self) {
- tracing::trace!("{} OpenFolderHandler was dropped", self.folder_id);
- }
- }
- enum FolderCommand {
- ApplyRevisions {
- user: Arc<dyn RevisionUser>,
- repeated_revision: RepeatedRevision,
- ret: oneshot::Sender<CollaborateResult<()>>,
- },
- Ping {
- user: Arc<dyn RevisionUser>,
- rev_id: i64,
- ret: oneshot::Sender<CollaborateResult<()>>,
- },
- }
- struct FolderCommandRunner {
- folder_id: String,
- receiver: Option<mpsc::Receiver<FolderCommand>>,
- synchronizer: Arc<FolderRevisionSynchronizer>,
- }
- impl FolderCommandRunner {
- fn new(
- folder_id: &str,
- receiver: mpsc::Receiver<FolderCommand>,
- synchronizer: Arc<FolderRevisionSynchronizer>,
- ) -> Self {
- Self {
- folder_id: folder_id.to_owned(),
- receiver: Some(receiver),
- synchronizer,
- }
- }
- async fn run(mut self) {
- let mut receiver = self
- .receiver
- .take()
- .expect("FolderCommandRunner's receiver should only take one time");
- let stream = stream! {
- loop {
- match receiver.recv().await {
- Some(msg) => yield msg,
- None => break,
- }
- }
- };
- stream.for_each(|msg| self.handle_message(msg)).await;
- }
- async fn handle_message(&self, msg: FolderCommand) {
- match msg {
- FolderCommand::ApplyRevisions {
- user,
- repeated_revision,
- ret,
- } => {
- let result = self
- .synchronizer
- .sync_revisions(user, repeated_revision)
- .await
- .map_err(internal_error);
- let _ = ret.send(result);
- }
- FolderCommand::Ping { user, rev_id, ret } => {
- let result = self.synchronizer.pong(user, rev_id).await.map_err(internal_error);
- let _ = ret.send(result);
- }
- }
- }
- }
- impl std::ops::Drop for FolderCommandRunner {
- fn drop(&mut self) {
- tracing::trace!("{} FolderCommandRunner was dropped", self.folder_id);
- }
- }
|