edit_doc.rs 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. use crate::service::{
  2. doc::{edit::edit_actor::EditUser, update_doc},
  3. util::md5,
  4. ws::WsMessageAdaptor,
  5. };
  6. use actix_web::web::Data;
  7. use bytes::Bytes;
  8. use dashmap::DashMap;
  9. use flowy_document::{
  10. entities::ws::{WsDataType, WsDocumentData},
  11. protobuf::{Doc, RevId, RevType, Revision, RevisionRange, UpdateDocParams},
  12. services::doc::Document,
  13. };
  14. use flowy_net::errors::{internal_error, ServerError};
  15. use flowy_ot::{
  16. core::{Delta, OperationTransformable},
  17. errors::OTError,
  18. };
  19. use flowy_ws::WsMessage;
  20. use parking_lot::RwLock;
  21. use protobuf::Message;
  22. use sqlx::PgPool;
  23. use std::{
  24. cmp::Ordering,
  25. convert::TryInto,
  26. sync::{
  27. atomic::{AtomicI64, Ordering::SeqCst},
  28. Arc,
  29. },
  30. time::Duration,
  31. };
  32. pub struct ServerEditDoc {
  33. doc_id: String,
  34. rev_id: AtomicI64,
  35. document: Arc<RwLock<Document>>,
  36. users: DashMap<String, EditUser>,
  37. }
  38. impl ServerEditDoc {
  39. pub fn new(doc: Doc) -> Result<Self, ServerError> {
  40. let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?;
  41. let document = Arc::new(RwLock::new(Document::from_delta(delta)));
  42. let users = DashMap::new();
  43. Ok(Self {
  44. doc_id: doc.id.clone(),
  45. rev_id: AtomicI64::new(doc.rev_id),
  46. document,
  47. users,
  48. })
  49. }
  50. pub fn document_json(&self) -> String { self.document.read().to_json() }
  51. #[tracing::instrument(
  52. level = "debug",
  53. skip(self, user),
  54. fields(
  55. user_id = %user.id(),
  56. rev_id = %rev_id,
  57. )
  58. )]
  59. pub async fn new_doc_user(&self, user: EditUser, rev_id: i64) -> Result<(), ServerError> {
  60. self.users.insert(user.id(), user.clone());
  61. let cur_rev_id = self.rev_id.load(SeqCst);
  62. if cur_rev_id > rev_id {
  63. let doc_delta = self.document.read().delta().clone();
  64. let cli_revision = self.mk_revision(rev_id, doc_delta);
  65. let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
  66. user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
  67. }
  68. Ok(())
  69. }
  70. #[tracing::instrument(
  71. level = "debug",
  72. skip(self, user, pg_pool),
  73. fields(
  74. rev_id = %self.rev_id.load(SeqCst),
  75. revision_rev_id = %revision.rev_id,
  76. revision_base_rev_id = %revision.base_rev_id
  77. )
  78. )]
  79. pub async fn apply_revision(
  80. &self,
  81. user: EditUser,
  82. revision: Revision,
  83. pg_pool: Data<PgPool>,
  84. ) -> Result<(), ServerError> {
  85. // Opti: find out another way to keep the user socket available.
  86. self.users.insert(user.id(), user.clone());
  87. let cur_rev_id = self.rev_id.load(SeqCst);
  88. match cur_rev_id.cmp(&revision.rev_id) {
  89. Ordering::Less => {
  90. if cur_rev_id != revision.base_rev_id {
  91. // The server document is outdated, try to get the missing revision from the
  92. // client.
  93. user.socket
  94. .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id))
  95. .map_err(internal_error)?;
  96. } else {
  97. let _ = self.compose_revision(&revision, pg_pool).await?;
  98. user.socket
  99. .do_send(mk_acked_ws_message(&revision))
  100. .map_err(internal_error)?;
  101. }
  102. },
  103. Ordering::Equal => {},
  104. Ordering::Greater => {
  105. // The client document is outdated. Transform the client revision delta and then
  106. // send the prime delta to the client. Client should compose the this prime
  107. // delta.
  108. let cli_revision = self.transform_client_revision(&revision)?;
  109. let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
  110. user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
  111. },
  112. }
  113. Ok(())
  114. }
  115. async fn compose_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
  116. let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
  117. let _ = self.compose_delta(delta)?;
  118. let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
  119. let _ = self.save_revision(&revision, pg_pool).await?;
  120. Ok(())
  121. }
  122. fn transform_client_revision(&self, revision: &Revision) -> Result<Revision, ServerError> {
  123. let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?;
  124. let _ = self.compose_delta(server_prime)?;
  125. let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
  126. Ok(cli_revision)
  127. }
  128. fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision {
  129. let delta_data = delta.to_bytes().to_vec();
  130. let md5 = md5(&delta_data);
  131. let revision = Revision {
  132. base_rev_id,
  133. rev_id: self.rev_id.load(SeqCst),
  134. delta_data,
  135. md5,
  136. doc_id: self.doc_id.to_string(),
  137. ty: RevType::Remote,
  138. ..Default::default()
  139. };
  140. revision
  141. }
  142. #[tracing::instrument(level = "debug", skip(self, delta_data))]
  143. fn transform(&self, delta_data: &Vec<u8>) -> Result<(Delta, Delta), OTError> {
  144. log::debug!("Document: {}", self.document.read().to_json());
  145. let doc_delta = self.document.read().delta().clone();
  146. let cli_delta = Delta::from_bytes(delta_data)?;
  147. log::debug!("Compose delta: {}", cli_delta);
  148. let (cli_prime, server_prime) = doc_delta.transform(&cli_delta)?;
  149. Ok((cli_prime, server_prime))
  150. }
  151. #[tracing::instrument(level = "debug", skip(self), err)]
  152. fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> {
  153. // Opti: push each revision into queue and process it one by one.
  154. match self.document.try_write_for(Duration::from_millis(300)) {
  155. None => {
  156. log::error!("Failed to acquire write lock of document");
  157. },
  158. Some(mut write_guard) => {
  159. let _ = write_guard.compose_delta(&delta).map_err(internal_error)?;
  160. log::debug!("Document: {}", write_guard.to_json());
  161. },
  162. }
  163. Ok(())
  164. }
  165. #[tracing::instrument(level = "debug", skip(self, pg_pool), err)]
  166. async fn save_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
  167. // Opti: save with multiple revisions
  168. let mut params = UpdateDocParams::new();
  169. params.set_doc_id(self.doc_id.clone());
  170. params.set_data(self.document.read().to_json());
  171. params.set_rev_id(revision.rev_id);
  172. let _ = update_doc(pg_pool.get_ref(), params).await?;
  173. Ok(())
  174. }
  175. }
  176. fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
  177. let bytes = revision.write_to_bytes().unwrap();
  178. let data = WsDocumentData {
  179. doc_id: doc_id.to_string(),
  180. ty: WsDataType::PushRev,
  181. data: bytes,
  182. };
  183. mk_ws_message(data)
  184. }
  185. fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor {
  186. let range = RevisionRange {
  187. doc_id: doc_id.to_string(),
  188. from_rev_id,
  189. to_rev_id,
  190. ..Default::default()
  191. };
  192. let bytes = range.write_to_bytes().unwrap();
  193. let data = WsDocumentData {
  194. doc_id: doc_id.to_string(),
  195. ty: WsDataType::PullRev,
  196. data: bytes,
  197. };
  198. mk_ws_message(data)
  199. }
  200. fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor {
  201. // let mut wtr = vec![];
  202. // let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
  203. let mut rev_id = RevId::new();
  204. rev_id.set_inner(revision.rev_id);
  205. let data = rev_id.write_to_bytes().unwrap();
  206. let data = WsDocumentData {
  207. doc_id: revision.doc_id.clone(),
  208. ty: WsDataType::Acked,
  209. data,
  210. };
  211. mk_ws_message(data)
  212. }
  213. fn mk_ws_message<T: Into<WsMessage>>(data: T) -> WsMessageAdaptor {
  214. let msg: WsMessage = data.into();
  215. let bytes: Bytes = msg.try_into().unwrap();
  216. WsMessageAdaptor(bytes)
  217. }