edit_doc.rs 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. use crate::service::{
  2. doc::{edit::edit_actor::EditUser, update_doc},
  3. util::md5,
  4. ws::WsMessageAdaptor,
  5. };
  6. use actix_web::web::Data;
  7. use bytes::Bytes;
  8. use dashmap::DashMap;
  9. use flowy_document::{
  10. entities::ws::{WsDataType, WsDocumentData},
  11. protobuf::{Doc, RevId, RevType, Revision, RevisionRange, UpdateDocParams},
  12. services::doc::Document,
  13. };
  14. use flowy_net::errors::{internal_error, ServerError};
  15. use flowy_ot::core::{Delta, OperationTransformable};
  16. use flowy_ws::WsMessage;
  17. use parking_lot::RwLock;
  18. use protobuf::Message;
  19. use sqlx::PgPool;
  20. use std::{
  21. cmp::Ordering,
  22. convert::TryInto,
  23. sync::{
  24. atomic::{AtomicI64, Ordering::SeqCst},
  25. Arc,
  26. },
  27. time::Duration,
  28. };
  29. pub struct ServerEditDoc {
  30. pub doc_id: String,
  31. pub rev_id: AtomicI64,
  32. document: Arc<RwLock<Document>>,
  33. users: DashMap<String, EditUser>,
  34. }
  35. impl ServerEditDoc {
  36. pub fn new(doc: Doc) -> Result<Self, ServerError> {
  37. let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?;
  38. let document = Arc::new(RwLock::new(Document::from_delta(delta)));
  39. let users = DashMap::new();
  40. Ok(Self {
  41. doc_id: doc.id.clone(),
  42. rev_id: AtomicI64::new(doc.rev_id),
  43. document,
  44. users,
  45. })
  46. }
  47. pub fn document_json(&self) -> String { self.document.read().to_json() }
  48. #[tracing::instrument(
  49. level = "debug",
  50. skip(self, user),
  51. fields(
  52. user_id = %user.id(),
  53. rev_id = %rev_id,
  54. )
  55. )]
  56. pub async fn new_doc_user(&self, user: EditUser, rev_id: i64) -> Result<(), ServerError> {
  57. self.users.insert(user.id(), user.clone());
  58. let cur_rev_id = self.rev_id.load(SeqCst);
  59. match cur_rev_id.cmp(&rev_id) {
  60. Ordering::Less => {
  61. user.socket
  62. .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, rev_id))
  63. .map_err(internal_error)?;
  64. },
  65. Ordering::Equal => {},
  66. Ordering::Greater => {
  67. let doc_delta = self.document.read().delta().clone();
  68. let cli_revision = self.mk_revision(rev_id, doc_delta);
  69. let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
  70. user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
  71. },
  72. }
  73. Ok(())
  74. }
  75. #[tracing::instrument(
  76. level = "debug",
  77. skip(self, user, pg_pool),
  78. fields(
  79. rev_id = %self.rev_id.load(SeqCst),
  80. revision_rev_id = %revision.rev_id,
  81. revision_base_rev_id = %revision.base_rev_id
  82. )
  83. )]
  84. pub async fn apply_revision(
  85. &self,
  86. user: EditUser,
  87. revision: Revision,
  88. pg_pool: Data<PgPool>,
  89. ) -> Result<(), ServerError> {
  90. // Opti: find out another way to keep the user socket available.
  91. self.users.insert(user.id(), user.clone());
  92. let cur_rev_id = self.rev_id.load(SeqCst);
  93. match cur_rev_id.cmp(&revision.rev_id) {
  94. Ordering::Less => {
  95. if cur_rev_id != revision.base_rev_id {
  96. // The server document is outdated, try to get the missing revision from the
  97. // client.
  98. user.socket
  99. .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id))
  100. .map_err(internal_error)?;
  101. } else {
  102. let _ = self.compose_revision(&revision, pg_pool).await?;
  103. user.socket
  104. .do_send(mk_acked_ws_message(&revision))
  105. .map_err(internal_error)?;
  106. }
  107. },
  108. Ordering::Equal => {},
  109. Ordering::Greater => {
  110. // The client document is outdated. Transform the client revision delta and then
  111. // send the prime delta to the client. Client should compose the this prime
  112. // delta.
  113. let cli_revision = self.transform_revision(&revision)?;
  114. let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
  115. user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
  116. },
  117. }
  118. Ok(())
  119. }
  120. async fn compose_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
  121. let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
  122. let _ = self.compose_delta(delta)?;
  123. let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
  124. let _ = self.save_revision(&revision, pg_pool).await?;
  125. Ok(())
  126. }
  127. #[tracing::instrument(level = "debug", skip(self, revision))]
  128. fn transform_revision(&self, revision: &Revision) -> Result<Revision, ServerError> {
  129. let cli_delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
  130. let (cli_prime, server_prime) = self
  131. .document
  132. .read()
  133. .delta()
  134. .transform(&cli_delta)
  135. .map_err(internal_error)?;
  136. let _ = self.compose_delta(server_prime)?;
  137. let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
  138. Ok(cli_revision)
  139. }
  140. fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision {
  141. let delta_data = delta.to_bytes().to_vec();
  142. let md5 = md5(&delta_data);
  143. let revision = Revision {
  144. base_rev_id,
  145. rev_id: self.rev_id.load(SeqCst),
  146. delta_data,
  147. md5,
  148. doc_id: self.doc_id.to_string(),
  149. ty: RevType::Remote,
  150. ..Default::default()
  151. };
  152. revision
  153. }
  154. #[tracing::instrument(
  155. level = "debug",
  156. skip(self, delta),
  157. fields(
  158. delta = %delta.to_json(),
  159. result,
  160. )
  161. )]
  162. fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> {
  163. // Opti: push each revision into queue and process it one by one.
  164. match self.document.try_write_for(Duration::from_millis(300)) {
  165. None => {
  166. log::error!("Failed to acquire write lock of document");
  167. },
  168. Some(mut write_guard) => {
  169. let _ = write_guard.compose_delta(&delta).map_err(internal_error)?;
  170. tracing::Span::current().record("result", &write_guard.to_json().as_str());
  171. },
  172. }
  173. Ok(())
  174. }
  175. #[tracing::instrument(level = "debug", skip(self, pg_pool), err)]
  176. async fn save_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
  177. // Opti: save with multiple revisions
  178. let mut params = UpdateDocParams::new();
  179. params.set_doc_id(self.doc_id.clone());
  180. params.set_data(self.document.read().to_json());
  181. params.set_rev_id(revision.rev_id);
  182. let _ = update_doc(pg_pool.get_ref(), params).await?;
  183. Ok(())
  184. }
  185. }
  186. fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
  187. let bytes = revision.write_to_bytes().unwrap();
  188. let data = WsDocumentData {
  189. doc_id: doc_id.to_string(),
  190. ty: WsDataType::PushRev,
  191. data: bytes,
  192. };
  193. mk_ws_message(data)
  194. }
  195. fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor {
  196. let range = RevisionRange {
  197. doc_id: doc_id.to_string(),
  198. from_rev_id,
  199. to_rev_id,
  200. ..Default::default()
  201. };
  202. let bytes = range.write_to_bytes().unwrap();
  203. let data = WsDocumentData {
  204. doc_id: doc_id.to_string(),
  205. ty: WsDataType::PullRev,
  206. data: bytes,
  207. };
  208. mk_ws_message(data)
  209. }
  210. fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor {
  211. // let mut wtr = vec![];
  212. // let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
  213. let mut rev_id = RevId::new();
  214. rev_id.set_value(revision.rev_id);
  215. let data = rev_id.write_to_bytes().unwrap();
  216. let data = WsDocumentData {
  217. doc_id: revision.doc_id.clone(),
  218. ty: WsDataType::Acked,
  219. data,
  220. };
  221. mk_ws_message(data)
  222. }
  223. fn mk_ws_message<T: Into<WsMessage>>(data: T) -> WsMessageAdaptor {
  224. let msg: WsMessage = data.into();
  225. let bytes: Bytes = msg.try_into().unwrap();
  226. WsMessageAdaptor(bytes)
  227. }