edit_doc.rs 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. use crate::service::{
  2. util::md5,
  3. ws::{entities::Socket, WsClientData, WsMessageAdaptor, WsUser},
  4. };
  5. use byteorder::{BigEndian, WriteBytesExt};
  6. use bytes::Bytes;
  7. use dashmap::DashMap;
  8. use flowy_document::{
  9. entities::ws::{WsDataType, WsDocumentData},
  10. protobuf::{Doc, RevType, Revision, RevisionRange, UpdateDocParams},
  11. services::doc::Document,
  12. };
  13. use flowy_net::errors::{internal_error, ServerError};
  14. use flowy_ot::{
  15. core::{Delta, OperationTransformable},
  16. errors::OTError,
  17. };
  18. use flowy_ws::WsMessage;
  19. use parking_lot::RwLock;
  20. use protobuf::Message;
  21. use std::{
  22. convert::TryInto,
  23. sync::{
  24. atomic::{AtomicI64, Ordering::SeqCst},
  25. Arc,
  26. },
  27. time::Duration,
  28. };
  29. struct EditUser {
  30. user: Arc<WsUser>,
  31. socket: Socket,
  32. }
  33. pub struct EditDocContext {
  34. doc_id: String,
  35. rev_id: AtomicI64,
  36. document: Arc<RwLock<Document>>,
  37. users: DashMap<String, EditUser>,
  38. }
  39. impl EditDocContext {
  40. pub fn new(doc: Doc) -> Result<Self, ServerError> {
  41. let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?;
  42. let document = Arc::new(RwLock::new(Document::from_delta(delta)));
  43. let users = DashMap::new();
  44. Ok(Self {
  45. doc_id: doc.id.clone(),
  46. rev_id: AtomicI64::new(doc.rev_id),
  47. document,
  48. users,
  49. })
  50. }
  51. pub fn doc_json(&self) -> String { self.document.read().to_json() }
  52. #[tracing::instrument(level = "debug", skip(self, client_data, revision))]
  53. pub async fn apply_revision(&self, client_data: WsClientData, revision: Revision) -> Result<(), ServerError> {
  54. let _ = self.verify_md5(&revision)?;
  55. // Opti: find out another way to keep the user socket available.
  56. let user = EditUser {
  57. user: client_data.user.clone(),
  58. socket: client_data.socket.clone(),
  59. };
  60. self.users.insert(client_data.user.id().to_owned(), user);
  61. log::debug!(
  62. "cur_base_rev_id: {}, expect_base_rev_id: {} rev_id: {}",
  63. self.rev_id.load(SeqCst),
  64. revision.base_rev_id,
  65. revision.rev_id
  66. );
  67. let cli_socket = client_data.socket;
  68. let cur_rev_id = self.rev_id.load(SeqCst);
  69. if cur_rev_id > revision.rev_id {
  70. // The client document is outdated. Transform the client revision delta and then
  71. // send the prime delta to the client. Client should compose the this prime
  72. // delta.
  73. let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?;
  74. let _ = self.update_document_delta(server_prime)?;
  75. log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json());
  76. let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
  77. let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
  78. cli_socket.do_send(ws_cli_revision).map_err(internal_error)?;
  79. Ok(())
  80. } else if cur_rev_id < revision.rev_id {
  81. if cur_rev_id != revision.base_rev_id {
  82. // The server document is outdated, try to get the missing revision from the
  83. // client.
  84. cli_socket
  85. .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id))
  86. .map_err(internal_error)?;
  87. } else {
  88. let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
  89. let _ = self.update_document_delta(delta)?;
  90. cli_socket
  91. .do_send(mk_acked_ws_message(&revision))
  92. .map_err(internal_error)?;
  93. self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
  94. let _ = self.save_revision(&revision).await?;
  95. }
  96. Ok(())
  97. } else {
  98. log::error!("Client rev_id should not equal to server rev_id");
  99. Ok(())
  100. }
  101. }
  102. fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision {
  103. let delta_data = delta.to_bytes().to_vec();
  104. let md5 = md5(&delta_data);
  105. let revision = Revision {
  106. base_rev_id,
  107. rev_id: self.rev_id.load(SeqCst),
  108. delta_data,
  109. md5,
  110. doc_id: self.doc_id.to_string(),
  111. ty: RevType::Remote,
  112. ..Default::default()
  113. };
  114. revision
  115. }
  116. #[tracing::instrument(level = "debug", skip(self, delta_data))]
  117. fn transform(&self, delta_data: &Vec<u8>) -> Result<(Delta, Delta), OTError> {
  118. log::debug!("Document: {}", self.document.read().to_json());
  119. let doc_delta = self.document.read().delta().clone();
  120. let cli_delta = Delta::from_bytes(delta_data)?;
  121. log::debug!("Compose delta: {}", cli_delta);
  122. let (cli_prime, server_prime) = doc_delta.transform(&cli_delta)?;
  123. Ok((cli_prime, server_prime))
  124. }
  125. #[tracing::instrument(level = "debug", skip(self, delta))]
  126. fn update_document_delta(&self, delta: Delta) -> Result<(), ServerError> {
  127. // Opti: push each revision into queue and process it one by one.
  128. match self.document.try_write_for(Duration::from_millis(300)) {
  129. None => {
  130. log::error!("Failed to acquire write lock of document");
  131. },
  132. Some(mut write_guard) => {
  133. let _ = write_guard.compose_delta(&delta).map_err(internal_error)?;
  134. log::debug!("Document: {}", write_guard.to_json());
  135. },
  136. }
  137. Ok(())
  138. }
  139. fn verify_md5(&self, revision: &Revision) -> Result<(), ServerError> {
  140. if md5(&revision.delta_data) != revision.md5 {
  141. return Err(ServerError::internal().context("Delta md5 not match"));
  142. }
  143. Ok(())
  144. }
  145. #[tracing::instrument(level = "debug", skip(self, revision))]
  146. async fn save_revision(&self, revision: &Revision) -> Result<(), ServerError> {
  147. // Opti: save with multiple revisions
  148. let mut params = UpdateDocParams::new();
  149. params.set_doc_id(self.doc_id.clone());
  150. params.set_data(self.document.read().to_json());
  151. params.set_rev_id(revision.rev_id);
  152. // let _ = update_doc(self.pg_pool.get_ref(), params).await?;
  153. Ok(())
  154. }
  155. }
  156. fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
  157. let bytes = revision.write_to_bytes().unwrap();
  158. let data = WsDocumentData {
  159. id: doc_id.to_string(),
  160. ty: WsDataType::PushRev,
  161. data: bytes,
  162. };
  163. mk_ws_message(data)
  164. }
  165. fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor {
  166. let range = RevisionRange {
  167. doc_id: doc_id.to_string(),
  168. from_rev_id,
  169. to_rev_id,
  170. ..Default::default()
  171. };
  172. let bytes = range.write_to_bytes().unwrap();
  173. let data = WsDocumentData {
  174. id: doc_id.to_string(),
  175. ty: WsDataType::PullRev,
  176. data: bytes,
  177. };
  178. mk_ws_message(data)
  179. }
  180. fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor {
  181. let mut wtr = vec![];
  182. let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
  183. let data = WsDocumentData {
  184. id: revision.doc_id.clone(),
  185. ty: WsDataType::Acked,
  186. data: wtr,
  187. };
  188. mk_ws_message(data)
  189. }
  190. fn mk_ws_message<T: Into<WsMessage>>(data: T) -> WsMessageAdaptor {
  191. let msg: WsMessage = data.into();
  192. let bytes: Bytes = msg.try_into().unwrap();
  193. WsMessageAdaptor(bytes)
  194. }