123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- use crate::service::{
- util::md5,
- ws::{entities::Socket, WsClientData, WsMessageAdaptor, WsUser},
- };
- use byteorder::{BigEndian, WriteBytesExt};
- use bytes::Bytes;
- use dashmap::DashMap;
- use flowy_document::{
- entities::ws::{WsDataType, WsDocumentData},
- protobuf::{Doc, RevType, Revision, RevisionRange, UpdateDocParams},
- services::doc::Document,
- };
- use flowy_net::errors::{internal_error, ServerError};
- use flowy_ot::{
- core::{Delta, OperationTransformable},
- errors::OTError,
- };
- use flowy_ws::WsMessage;
- use parking_lot::RwLock;
- use protobuf::Message;
- use std::{
- convert::TryInto,
- sync::{
- atomic::{AtomicI64, Ordering::SeqCst},
- Arc,
- },
- time::Duration,
- };
- struct EditUser {
- user: Arc<WsUser>,
- socket: Socket,
- }
- pub struct EditDocContext {
- doc_id: String,
- rev_id: AtomicI64,
- document: Arc<RwLock<Document>>,
- users: DashMap<String, EditUser>,
- }
- impl EditDocContext {
- pub fn new(doc: Doc) -> Result<Self, ServerError> {
- let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?;
- let document = Arc::new(RwLock::new(Document::from_delta(delta)));
- let users = DashMap::new();
- Ok(Self {
- doc_id: doc.id.clone(),
- rev_id: AtomicI64::new(doc.rev_id),
- document,
- users,
- })
- }
- pub fn doc_json(&self) -> String { self.document.read().to_json() }
- #[tracing::instrument(level = "debug", skip(self, client_data, revision))]
- pub async fn apply_revision(&self, client_data: WsClientData, revision: Revision) -> Result<(), ServerError> {
- let _ = self.verify_md5(&revision)?;
- // Opti: find out another way to keep the user socket available.
- let user = EditUser {
- user: client_data.user.clone(),
- socket: client_data.socket.clone(),
- };
- self.users.insert(client_data.user.id().to_owned(), user);
- log::debug!(
- "cur_base_rev_id: {}, expect_base_rev_id: {} rev_id: {}",
- self.rev_id.load(SeqCst),
- revision.base_rev_id,
- revision.rev_id
- );
- let cli_socket = client_data.socket;
- let cur_rev_id = self.rev_id.load(SeqCst);
- if cur_rev_id > revision.rev_id {
- // The client document is outdated. Transform the client revision delta and then
- // send the prime delta to the client. Client should compose the this prime
- // delta.
- let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?;
- let _ = self.update_document_delta(server_prime)?;
- log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json());
- let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
- let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
- cli_socket.do_send(ws_cli_revision).map_err(internal_error)?;
- Ok(())
- } else if cur_rev_id < revision.rev_id {
- if cur_rev_id != revision.base_rev_id {
- // The server document is outdated, try to get the missing revision from the
- // client.
- cli_socket
- .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id))
- .map_err(internal_error)?;
- } else {
- let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
- let _ = self.update_document_delta(delta)?;
- cli_socket
- .do_send(mk_acked_ws_message(&revision))
- .map_err(internal_error)?;
- self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
- let _ = self.save_revision(&revision).await?;
- }
- Ok(())
- } else {
- log::error!("Client rev_id should not equal to server rev_id");
- Ok(())
- }
- }
- fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision {
- let delta_data = delta.to_bytes().to_vec();
- let md5 = md5(&delta_data);
- let revision = Revision {
- base_rev_id,
- rev_id: self.rev_id.load(SeqCst),
- delta_data,
- md5,
- doc_id: self.doc_id.to_string(),
- ty: RevType::Remote,
- ..Default::default()
- };
- revision
- }
- #[tracing::instrument(level = "debug", skip(self, delta_data))]
- fn transform(&self, delta_data: &Vec<u8>) -> Result<(Delta, Delta), OTError> {
- log::debug!("Document: {}", self.document.read().to_json());
- let doc_delta = self.document.read().delta().clone();
- let cli_delta = Delta::from_bytes(delta_data)?;
- log::debug!("Compose delta: {}", cli_delta);
- let (cli_prime, server_prime) = doc_delta.transform(&cli_delta)?;
- Ok((cli_prime, server_prime))
- }
- #[tracing::instrument(level = "debug", skip(self, delta))]
- fn update_document_delta(&self, delta: Delta) -> Result<(), ServerError> {
- // Opti: push each revision into queue and process it one by one.
- match self.document.try_write_for(Duration::from_millis(300)) {
- None => {
- log::error!("Failed to acquire write lock of document");
- },
- Some(mut write_guard) => {
- let _ = write_guard.compose_delta(&delta).map_err(internal_error)?;
- log::debug!("Document: {}", write_guard.to_json());
- },
- }
- Ok(())
- }
- fn verify_md5(&self, revision: &Revision) -> Result<(), ServerError> {
- if md5(&revision.delta_data) != revision.md5 {
- return Err(ServerError::internal().context("Delta md5 not match"));
- }
- Ok(())
- }
- #[tracing::instrument(level = "debug", skip(self, revision))]
- async fn save_revision(&self, revision: &Revision) -> Result<(), ServerError> {
- // Opti: save with multiple revisions
- let mut params = UpdateDocParams::new();
- params.set_doc_id(self.doc_id.clone());
- params.set_data(self.document.read().to_json());
- params.set_rev_id(revision.rev_id);
- // let _ = update_doc(self.pg_pool.get_ref(), params).await?;
- Ok(())
- }
- }
- fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
- let bytes = revision.write_to_bytes().unwrap();
- let data = WsDocumentData {
- id: doc_id.to_string(),
- ty: WsDataType::PushRev,
- data: bytes,
- };
- mk_ws_message(data)
- }
- fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor {
- let range = RevisionRange {
- doc_id: doc_id.to_string(),
- from_rev_id,
- to_rev_id,
- ..Default::default()
- };
- let bytes = range.write_to_bytes().unwrap();
- let data = WsDocumentData {
- id: doc_id.to_string(),
- ty: WsDataType::PullRev,
- data: bytes,
- };
- mk_ws_message(data)
- }
- fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor {
- let mut wtr = vec![];
- let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
- let data = WsDocumentData {
- id: revision.doc_id.clone(),
- ty: WsDataType::Acked,
- data: wtr,
- };
- mk_ws_message(data)
- }
- fn mk_ws_message<T: Into<WsMessage>>(data: T) -> WsMessageAdaptor {
- let msg: WsMessage = data.into();
- let bytes: Bytes = msg.try_into().unwrap();
- WsMessageAdaptor(bytes)
- }
|