editor.rs 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. use crate::{
  2. services::{
  3. doc::{edit::edit_actor::EditUser, update_doc},
  4. util::md5,
  5. },
  6. web_socket::{entities::Socket, WsMessageAdaptor},
  7. };
  8. use actix_web::web::Data;
  9. use backend_service::errors::{internal_error, ServerError};
  10. use dashmap::DashMap;
  11. use flowy_document_infra::{
  12. core::Document,
  13. entities::ws::{WsDataType, WsDocumentData},
  14. protobuf::{Doc, RevId, RevType, Revision, RevisionRange, UpdateDocParams},
  15. };
  16. use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
  17. use parking_lot::RwLock;
  18. use protobuf::Message;
  19. use sqlx::PgPool;
  20. use std::{
  21. cmp::Ordering,
  22. sync::{
  23. atomic::{AtomicI64, Ordering::SeqCst},
  24. Arc,
  25. },
  26. time::Duration,
  27. };
  28. pub struct ServerDocEditor {
  29. pub doc_id: String,
  30. pub rev_id: AtomicI64,
  31. document: Arc<RwLock<Document>>,
  32. users: DashMap<String, EditUser>,
  33. }
  34. impl ServerDocEditor {
  35. pub fn new(doc: Doc) -> Result<Self, ServerError> {
  36. let delta = RichTextDelta::from_bytes(&doc.data).map_err(internal_error)?;
  37. let document = Arc::new(RwLock::new(Document::from_delta(delta)));
  38. let users = DashMap::new();
  39. Ok(Self {
  40. doc_id: doc.id.clone(),
  41. rev_id: AtomicI64::new(doc.rev_id),
  42. document,
  43. users,
  44. })
  45. }
  46. #[tracing::instrument(
  47. level = "debug",
  48. skip(self, user),
  49. fields(
  50. user_id = %user.id(),
  51. rev_id = %rev_id,
  52. )
  53. )]
  54. pub async fn new_doc_user(&self, user: EditUser, rev_id: i64) -> Result<(), ServerError> {
  55. self.users.insert(user.id(), user.clone());
  56. let cur_rev_id = self.rev_id.load(SeqCst);
  57. match cur_rev_id.cmp(&rev_id) {
  58. Ordering::Less => {
  59. user.socket
  60. .do_send(mk_pull_message(&self.doc_id, next(cur_rev_id), rev_id))
  61. .map_err(internal_error)?;
  62. },
  63. Ordering::Equal => {},
  64. Ordering::Greater => {
  65. let doc_delta = self.document.read().delta().clone();
  66. let cli_revision = self.mk_revision(rev_id, doc_delta);
  67. let ws_cli_revision = mk_push_message(&self.doc_id, cli_revision);
  68. user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
  69. },
  70. }
  71. Ok(())
  72. }
  73. #[tracing::instrument(
  74. level = "debug",
  75. skip(self, user, pg_pool, revision),
  76. fields(
  77. cur_rev_id = %self.rev_id.load(SeqCst),
  78. base_rev_id = %revision.base_rev_id,
  79. rev_id = %revision.rev_id,
  80. ),
  81. err
  82. )]
  83. pub async fn apply_revision(
  84. &self,
  85. user: EditUser,
  86. revision: Revision,
  87. pg_pool: Data<PgPool>,
  88. ) -> Result<(), ServerError> {
  89. self.users.insert(user.id(), user.clone());
  90. let cur_rev_id = self.rev_id.load(SeqCst);
  91. match cur_rev_id.cmp(&revision.rev_id) {
  92. Ordering::Less => {
  93. let next_rev_id = next(cur_rev_id);
  94. if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id {
  95. // The rev is in the right order, just compose it.
  96. let _ = self.compose_revision(&revision, pg_pool).await?;
  97. let _ = send_acked_msg(&user.socket, &revision)?;
  98. } else {
  99. // The server document is outdated, pull the missing revision from the client.
  100. let _ = send_pull_message(&user.socket, &self.doc_id, next_rev_id, revision.rev_id)?;
  101. }
  102. },
  103. Ordering::Equal => {
  104. // Do nothing
  105. log::warn!("Applied revision rev_id is the same as cur_rev_id");
  106. },
  107. Ordering::Greater => {
  108. // The client document is outdated. Transform the client revision delta and then
  109. // send the prime delta to the client. Client should compose the this prime
  110. // delta.
  111. let cli_revision = self.transform_revision(&revision)?;
  112. let _ = send_push_message(&user.socket, &self.doc_id, cli_revision)?;
  113. },
  114. }
  115. Ok(())
  116. }
  117. pub fn document_json(&self) -> String { self.document.read().to_json() }
  118. async fn compose_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
  119. let delta = RichTextDelta::from_bytes(&revision.delta_data).map_err(internal_error)?;
  120. let _ = self.compose_delta(delta)?;
  121. let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
  122. let _ = self.save_revision(&revision, pg_pool).await?;
  123. Ok(())
  124. }
  125. #[tracing::instrument(level = "debug", skip(self, revision))]
  126. fn transform_revision(&self, revision: &Revision) -> Result<Revision, ServerError> {
  127. let cli_delta = RichTextDelta::from_bytes(&revision.delta_data).map_err(internal_error)?;
  128. let (cli_prime, server_prime) = self
  129. .document
  130. .read()
  131. .delta()
  132. .transform(&cli_delta)
  133. .map_err(internal_error)?;
  134. let _ = self.compose_delta(server_prime)?;
  135. let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
  136. Ok(cli_revision)
  137. }
  138. fn mk_revision(&self, base_rev_id: i64, delta: RichTextDelta) -> Revision {
  139. let delta_data = delta.to_bytes().to_vec();
  140. let md5 = md5(&delta_data);
  141. Revision {
  142. base_rev_id,
  143. rev_id: self.rev_id.load(SeqCst),
  144. delta_data,
  145. md5,
  146. doc_id: self.doc_id.to_string(),
  147. ty: RevType::Remote,
  148. ..Default::default()
  149. }
  150. }
  151. #[tracing::instrument(
  152. level = "debug",
  153. skip(self, delta),
  154. fields(
  155. revision_delta = %delta.to_json(),
  156. result,
  157. )
  158. )]
  159. fn compose_delta(&self, delta: RichTextDelta) -> Result<(), ServerError> {
  160. if delta.is_empty() {
  161. log::warn!("Composed delta is empty");
  162. }
  163. match self.document.try_write_for(Duration::from_millis(300)) {
  164. None => {
  165. log::error!("Failed to acquire write lock of document");
  166. },
  167. Some(mut write_guard) => {
  168. let _ = write_guard.compose_delta(delta).map_err(internal_error)?;
  169. tracing::Span::current().record("result", &write_guard.to_json().as_str());
  170. },
  171. }
  172. Ok(())
  173. }
  174. #[tracing::instrument(level = "debug", skip(self, revision, pg_pool), err)]
  175. async fn save_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
  176. // Opti: save with multiple revisions
  177. let mut params = UpdateDocParams::new();
  178. params.set_doc_id(self.doc_id.clone());
  179. params.set_data(self.document.read().to_json());
  180. params.set_rev_id(revision.rev_id);
  181. let _ = update_doc(pg_pool.get_ref(), params).await?;
  182. Ok(())
  183. }
  184. }
  185. #[tracing::instrument(level = "debug", skip(socket, doc_id, revision), err)]
  186. fn send_push_message(socket: &Socket, doc_id: &str, revision: Revision) -> Result<(), ServerError> {
  187. let msg = mk_push_message(doc_id, revision);
  188. socket.try_send(msg).map_err(internal_error)
  189. }
  190. fn mk_push_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
  191. let bytes = revision.write_to_bytes().unwrap();
  192. let data = WsDocumentData {
  193. doc_id: doc_id.to_string(),
  194. ty: WsDataType::PushRev,
  195. data: bytes,
  196. };
  197. data.into()
  198. }
  199. #[tracing::instrument(level = "debug", skip(socket, doc_id), err)]
  200. fn send_pull_message(socket: &Socket, doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> Result<(), ServerError> {
  201. let msg = mk_pull_message(doc_id, from_rev_id, to_rev_id);
  202. socket.try_send(msg).map_err(internal_error)
  203. }
  204. fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor {
  205. let range = RevisionRange {
  206. doc_id: doc_id.to_string(),
  207. start: from_rev_id,
  208. end: to_rev_id,
  209. ..Default::default()
  210. };
  211. let bytes = range.write_to_bytes().unwrap();
  212. let data = WsDocumentData {
  213. doc_id: doc_id.to_string(),
  214. ty: WsDataType::PullRev,
  215. data: bytes,
  216. };
  217. data.into()
  218. }
  219. #[tracing::instrument(level = "debug", skip(socket, revision), err)]
  220. fn send_acked_msg(socket: &Socket, revision: &Revision) -> Result<(), ServerError> {
  221. let msg = mk_acked_message(revision);
  222. socket.try_send(msg).map_err(internal_error)
  223. }
  224. fn mk_acked_message(revision: &Revision) -> WsMessageAdaptor {
  225. // let mut wtr = vec![];
  226. // let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
  227. let mut rev_id = RevId::new();
  228. rev_id.set_value(revision.rev_id);
  229. let data = rev_id.write_to_bytes().unwrap();
  230. let data = WsDocumentData {
  231. doc_id: revision.doc_id.clone(),
  232. ty: WsDataType::Acked,
  233. data,
  234. };
  235. data.into()
  236. }
  237. #[inline]
  238. fn next(rev_id: i64) -> i64 { rev_id + 1 }