123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- use crate::{
- context::FlowyPersistence,
- services::{
- document::{
- persistence::{create_document, read_document, revisions_to_key_value_items, DocumentKVPersistence},
- ws_actor::{DocumentWebSocketActor, WSActorMessage},
- },
- web_socket::{WSClientData, WebSocketReceiver},
- },
- };
- use backend_service::errors::ServerError;
- use flowy_collaboration::{
- entities::doc::DocumentInfo,
- errors::CollaborateError,
- protobuf::{
- CreateDocParams as CreateDocParamsPB,
- DocumentId,
- RepeatedRevision as RepeatedRevisionPB,
- Revision as RevisionPB,
- },
- server_document::{DocumentCloudPersistence, ServerDocumentManager},
- util::repeated_revision_from_repeated_revision_pb,
- };
- use lib_infra::future::BoxResultFuture;
- use std::{
- convert::TryInto,
- fmt::{Debug, Formatter},
- sync::Arc,
- };
- use tokio::sync::{mpsc, oneshot};
- pub fn make_document_ws_receiver(
- persistence: Arc<FlowyPersistence>,
- document_manager: Arc<ServerDocumentManager>,
- ) -> Arc<DocumentWebSocketReceiver> {
- let (ws_sender, rx) = tokio::sync::mpsc::channel(100);
- let actor = DocumentWebSocketActor::new(rx, document_manager);
- tokio::task::spawn(actor.run());
- Arc::new(DocumentWebSocketReceiver::new(persistence, ws_sender))
- }
- pub struct DocumentWebSocketReceiver {
- ws_sender: mpsc::Sender<WSActorMessage>,
- persistence: Arc<FlowyPersistence>,
- }
- impl DocumentWebSocketReceiver {
- pub fn new(persistence: Arc<FlowyPersistence>, ws_sender: mpsc::Sender<WSActorMessage>) -> Self {
- Self { ws_sender, persistence }
- }
- }
- impl WebSocketReceiver for DocumentWebSocketReceiver {
- fn receive(&self, data: WSClientData) {
- let (ret, rx) = oneshot::channel();
- let sender = self.ws_sender.clone();
- let persistence = self.persistence.clone();
- actix_rt::spawn(async move {
- let msg = WSActorMessage::ClientData {
- client_data: data,
- persistence,
- ret,
- };
- match sender.send(msg).await {
- Ok(_) => {},
- Err(e) => log::error!("{}", e),
- }
- match rx.await {
- Ok(_) => {},
- Err(e) => log::error!("{:?}", e),
- };
- });
- }
- }
- pub struct HttpDocumentCloudPersistence(pub Arc<DocumentKVPersistence>);
- impl Debug for HttpDocumentCloudPersistence {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") }
- }
- impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
- fn enable_sync(&self) -> bool { true }
- fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
- let params = DocumentId {
- doc_id: doc_id.to_string(),
- ..Default::default()
- };
- let kv_store = self.0.clone();
- Box::pin(async move {
- let mut pb_doc = read_document(&kv_store, params)
- .await
- .map_err(server_error_to_collaborate_error)?;
- let doc = (&mut pb_doc)
- .try_into()
- .map_err(|e| CollaborateError::internal().context(e))?;
- Ok(doc)
- })
- }
- fn create_document(
- &self,
- doc_id: &str,
- repeated_revision: RepeatedRevisionPB,
- ) -> BoxResultFuture<DocumentInfo, CollaborateError> {
- let kv_store = self.0.clone();
- let doc_id = doc_id.to_owned();
- Box::pin(async move {
- let revisions = repeated_revision_from_repeated_revision_pb(repeated_revision.clone())?.into_inner();
- let doc = DocumentInfo::from_revisions(&doc_id, revisions)?;
- let doc_id = doc_id.to_owned();
- let mut params = CreateDocParamsPB::new();
- params.set_id(doc_id);
- params.set_revisions(repeated_revision);
- let _ = create_document(&kv_store, params)
- .await
- .map_err(server_error_to_collaborate_error)?;
- Ok(doc)
- })
- }
- fn read_revisions(
- &self,
- doc_id: &str,
- rev_ids: Option<Vec<i64>>,
- ) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
- let kv_store = self.0.clone();
- let doc_id = doc_id.to_owned();
- let f = || async move {
- let mut repeated_revision = kv_store.get_revisions(&doc_id, rev_ids).await?;
- Ok(repeated_revision.take_items().into())
- };
- Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
- }
- fn save_revisions(&self, mut repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
- let kv_store = self.0.clone();
- let f = || async move {
- let revisions = repeated_revision.take_items().into();
- let _ = kv_store.set_revision(revisions).await?;
- Ok(())
- };
- Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
- }
- fn reset_document(
- &self,
- doc_id: &str,
- mut repeated_revision: RepeatedRevisionPB,
- ) -> BoxResultFuture<(), CollaborateError> {
- let kv_store = self.0.clone();
- let doc_id = doc_id.to_owned();
- let f = || async move {
- kv_store
- .transaction(|mut transaction| {
- Box::pin(async move {
- let _ = transaction.batch_delete_key_start_with(&doc_id).await?;
- let items = revisions_to_key_value_items(repeated_revision.take_items().into())?;
- let _ = transaction.batch_set(items).await?;
- Ok(())
- })
- })
- .await
- };
- Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
- }
- }
- fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError {
- if error.is_record_not_found() {
- CollaborateError::record_not_found()
- } else {
- CollaborateError::internal().context(error)
- }
- }
|