editor.rs 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. use crate::service::{
  2. doc::{edit::edit_actor::EditUser, update_doc},
  3. util::md5,
  4. ws::{entities::Socket, WsMessageAdaptor},
  5. };
  6. use actix_web::web::Data;
  7. use backend_service::errors::{internal_error, ServerError};
  8. use dashmap::DashMap;
  9. use flowy_document_infra::{
  10. core::Document,
  11. entities::ws::{WsDataType, WsDocumentData},
  12. protobuf::{Doc, RevId, RevType, Revision, RevisionRange, UpdateDocParams},
  13. };
  14. use lib_ot::core::{Delta, OperationTransformable};
  15. use parking_lot::RwLock;
  16. use protobuf::Message;
  17. use sqlx::PgPool;
  18. use std::{
  19. cmp::Ordering,
  20. sync::{
  21. atomic::{AtomicI64, Ordering::SeqCst},
  22. Arc,
  23. },
  24. time::Duration,
  25. };
  26. pub struct ServerDocEditor {
  27. pub doc_id: String,
  28. pub rev_id: AtomicI64,
  29. document: Arc<RwLock<Document>>,
  30. users: DashMap<String, EditUser>,
  31. }
  32. impl ServerDocEditor {
  33. pub fn new(doc: Doc) -> Result<Self, ServerError> {
  34. let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?;
  35. let document = Arc::new(RwLock::new(Document::from_delta(delta)));
  36. let users = DashMap::new();
  37. Ok(Self {
  38. doc_id: doc.id.clone(),
  39. rev_id: AtomicI64::new(doc.rev_id),
  40. document,
  41. users,
  42. })
  43. }
  44. #[tracing::instrument(
  45. level = "debug",
  46. skip(self, user),
  47. fields(
  48. user_id = %user.id(),
  49. rev_id = %rev_id,
  50. )
  51. )]
  52. pub async fn new_doc_user(&self, user: EditUser, rev_id: i64) -> Result<(), ServerError> {
  53. self.users.insert(user.id(), user.clone());
  54. let cur_rev_id = self.rev_id.load(SeqCst);
  55. match cur_rev_id.cmp(&rev_id) {
  56. Ordering::Less => {
  57. user.socket
  58. .do_send(mk_pull_message(&self.doc_id, next(cur_rev_id), rev_id))
  59. .map_err(internal_error)?;
  60. },
  61. Ordering::Equal => {},
  62. Ordering::Greater => {
  63. let doc_delta = self.document.read().delta().clone();
  64. let cli_revision = self.mk_revision(rev_id, doc_delta);
  65. let ws_cli_revision = mk_push_message(&self.doc_id, cli_revision);
  66. user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
  67. },
  68. }
  69. Ok(())
  70. }
  71. #[tracing::instrument(
  72. level = "debug",
  73. skip(self, user, pg_pool, revision),
  74. fields(
  75. cur_rev_id = %self.rev_id.load(SeqCst),
  76. base_rev_id = %revision.base_rev_id,
  77. rev_id = %revision.rev_id,
  78. ),
  79. err
  80. )]
  81. pub async fn apply_revision(
  82. &self,
  83. user: EditUser,
  84. revision: Revision,
  85. pg_pool: Data<PgPool>,
  86. ) -> Result<(), ServerError> {
  87. self.users.insert(user.id(), user.clone());
  88. let cur_rev_id = self.rev_id.load(SeqCst);
  89. match cur_rev_id.cmp(&revision.rev_id) {
  90. Ordering::Less => {
  91. let next_rev_id = next(cur_rev_id);
  92. if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id {
  93. // The rev is in the right order, just compose it.
  94. let _ = self.compose_revision(&revision, pg_pool).await?;
  95. let _ = send_acked_msg(&user.socket, &revision)?;
  96. } else {
  97. // The server document is outdated, pull the missing revision from the client.
  98. let _ = send_pull_message(&user.socket, &self.doc_id, next_rev_id, revision.rev_id)?;
  99. }
  100. },
  101. Ordering::Equal => {
  102. // Do nothing
  103. log::warn!("Applied revision rev_id is the same as cur_rev_id");
  104. },
  105. Ordering::Greater => {
  106. // The client document is outdated. Transform the client revision delta and then
  107. // send the prime delta to the client. Client should compose the this prime
  108. // delta.
  109. let cli_revision = self.transform_revision(&revision)?;
  110. let _ = send_push_message(&user.socket, &self.doc_id, cli_revision)?;
  111. },
  112. }
  113. Ok(())
  114. }
  115. pub fn document_json(&self) -> String { self.document.read().to_json() }
  116. async fn compose_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
  117. let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
  118. let _ = self.compose_delta(delta)?;
  119. let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
  120. let _ = self.save_revision(&revision, pg_pool).await?;
  121. Ok(())
  122. }
  123. #[tracing::instrument(level = "debug", skip(self, revision))]
  124. fn transform_revision(&self, revision: &Revision) -> Result<Revision, ServerError> {
  125. let cli_delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
  126. let (cli_prime, server_prime) = self
  127. .document
  128. .read()
  129. .delta()
  130. .transform(&cli_delta)
  131. .map_err(internal_error)?;
  132. let _ = self.compose_delta(server_prime)?;
  133. let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
  134. Ok(cli_revision)
  135. }
  136. fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision {
  137. let delta_data = delta.to_bytes().to_vec();
  138. let md5 = md5(&delta_data);
  139. Revision {
  140. base_rev_id,
  141. rev_id: self.rev_id.load(SeqCst),
  142. delta_data,
  143. md5,
  144. doc_id: self.doc_id.to_string(),
  145. ty: RevType::Remote,
  146. ..Default::default()
  147. }
  148. }
  149. #[tracing::instrument(
  150. level = "debug",
  151. skip(self, delta),
  152. fields(
  153. revision_delta = %delta.to_json(),
  154. result,
  155. )
  156. )]
  157. fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> {
  158. if delta.is_empty() {
  159. log::warn!("Composed delta is empty");
  160. }
  161. match self.document.try_write_for(Duration::from_millis(300)) {
  162. None => {
  163. log::error!("Failed to acquire write lock of document");
  164. },
  165. Some(mut write_guard) => {
  166. let _ = write_guard.compose_delta(delta).map_err(internal_error)?;
  167. tracing::Span::current().record("result", &write_guard.to_json().as_str());
  168. },
  169. }
  170. Ok(())
  171. }
  172. #[tracing::instrument(level = "debug", skip(self, revision, pg_pool), err)]
  173. async fn save_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
  174. // Opti: save with multiple revisions
  175. let mut params = UpdateDocParams::new();
  176. params.set_doc_id(self.doc_id.clone());
  177. params.set_data(self.document.read().to_json());
  178. params.set_rev_id(revision.rev_id);
  179. let _ = update_doc(pg_pool.get_ref(), params).await?;
  180. Ok(())
  181. }
  182. }
  183. #[tracing::instrument(level = "debug", skip(socket, doc_id, revision), err)]
  184. fn send_push_message(socket: &Socket, doc_id: &str, revision: Revision) -> Result<(), ServerError> {
  185. let msg = mk_push_message(doc_id, revision);
  186. socket.try_send(msg).map_err(internal_error)
  187. }
  188. fn mk_push_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
  189. let bytes = revision.write_to_bytes().unwrap();
  190. let data = WsDocumentData {
  191. doc_id: doc_id.to_string(),
  192. ty: WsDataType::PushRev,
  193. data: bytes,
  194. };
  195. data.into()
  196. }
  197. #[tracing::instrument(level = "debug", skip(socket, doc_id), err)]
  198. fn send_pull_message(socket: &Socket, doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> Result<(), ServerError> {
  199. let msg = mk_pull_message(doc_id, from_rev_id, to_rev_id);
  200. socket.try_send(msg).map_err(internal_error)
  201. }
  202. fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor {
  203. let range = RevisionRange {
  204. doc_id: doc_id.to_string(),
  205. start: from_rev_id,
  206. end: to_rev_id,
  207. ..Default::default()
  208. };
  209. let bytes = range.write_to_bytes().unwrap();
  210. let data = WsDocumentData {
  211. doc_id: doc_id.to_string(),
  212. ty: WsDataType::PullRev,
  213. data: bytes,
  214. };
  215. data.into()
  216. }
  217. #[tracing::instrument(level = "debug", skip(socket, revision), err)]
  218. fn send_acked_msg(socket: &Socket, revision: &Revision) -> Result<(), ServerError> {
  219. let msg = mk_acked_message(revision);
  220. socket.try_send(msg).map_err(internal_error)
  221. }
  222. fn mk_acked_message(revision: &Revision) -> WsMessageAdaptor {
  223. // let mut wtr = vec![];
  224. // let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
  225. let mut rev_id = RevId::new();
  226. rev_id.set_value(revision.rev_id);
  227. let data = rev_id.write_to_bytes().unwrap();
  228. let data = WsDocumentData {
  229. doc_id: revision.doc_id.clone(),
  230. ty: WsDataType::Acked,
  231. data,
  232. };
  233. data.into()
  234. }
  235. #[inline]
  236. fn next(rev_id: i64) -> i64 { rev_id + 1 }