//! lib.rs — server-side revision synchronization: traits for the synchronized object, its
//! persistence and the connected user, plus the `RevisionSynchronizer` that ties them together.
  1. pub mod errors;
  2. pub mod ext;
  3. pub mod util;
  4. use crate::errors::SyncError;
  5. use crate::util::{make_operations_from_revisions, next, pair_rev_id_from_revision_pbs};
  6. use lib_infra::future::BoxResultFuture;
  7. use lib_ot::core::{DeltaOperations, OperationAttributes};
  8. use parking_lot::RwLock;
  9. use revision_model::{Revision, RevisionRange};
  10. use serde::de::DeserializeOwned;
  11. use std::cmp::Ordering;
  12. use std::fmt::Debug;
  13. use std::sync::atomic::AtomicI64;
  14. use std::sync::atomic::Ordering::SeqCst;
  15. use std::sync::Arc;
  16. use std::time::Duration;
  17. use ws_model::ws_revision::{ServerRevisionWSData, ServerRevisionWSDataBuilder};
  18. pub type RevisionOperations<Attribute> = DeltaOperations<Attribute>;
  19. pub trait RevisionUser: Send + Sync + Debug {
  20. fn user_id(&self) -> String;
  21. fn receive(&self, resp: RevisionSyncResponse);
  22. }
  23. pub enum RevisionSyncResponse {
  24. Pull(ServerRevisionWSData),
  25. Push(ServerRevisionWSData),
  26. Ack(ServerRevisionWSData),
  27. }
  28. pub trait RevisionSyncObject<Attribute: OperationAttributes>: Send + Sync + 'static {
  29. fn object_id(&self) -> &str;
  30. fn object_json(&self) -> String;
  31. fn compose(&mut self, other: &RevisionOperations<Attribute>) -> Result<(), SyncError>;
  32. fn transform(
  33. &self,
  34. other: &RevisionOperations<Attribute>,
  35. ) -> Result<(RevisionOperations<Attribute>, RevisionOperations<Attribute>), SyncError>;
  36. fn set_operations(&mut self, operations: RevisionOperations<Attribute>);
  37. }
  38. pub trait RevisionSyncPersistence: Send + Sync + 'static {
  39. fn read_revisions(
  40. &self,
  41. object_id: &str,
  42. rev_ids: Option<Vec<i64>>,
  43. ) -> BoxResultFuture<Vec<Revision>, SyncError>;
  44. fn save_revisions(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), SyncError>;
  45. fn reset_object(
  46. &self,
  47. object_id: &str,
  48. revisions: Vec<Revision>,
  49. ) -> BoxResultFuture<(), SyncError>;
  50. }
  51. impl<T> RevisionSyncPersistence for Arc<T>
  52. where
  53. T: RevisionSyncPersistence + Sized,
  54. {
  55. fn read_revisions(
  56. &self,
  57. object_id: &str,
  58. rev_ids: Option<Vec<i64>>,
  59. ) -> BoxResultFuture<Vec<Revision>, SyncError> {
  60. (**self).read_revisions(object_id, rev_ids)
  61. }
  62. fn save_revisions(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), SyncError> {
  63. (**self).save_revisions(revisions)
  64. }
  65. fn reset_object(
  66. &self,
  67. object_id: &str,
  68. revisions: Vec<Revision>,
  69. ) -> BoxResultFuture<(), SyncError> {
  70. (**self).reset_object(object_id, revisions)
  71. }
  72. }
  73. pub struct RevisionSynchronizer<Attribute: OperationAttributes> {
  74. object_id: String,
  75. rev_id: AtomicI64,
  76. object: Arc<RwLock<dyn RevisionSyncObject<Attribute>>>,
  77. persistence: Arc<dyn RevisionSyncPersistence>,
  78. }
  79. impl<Attribute> RevisionSynchronizer<Attribute>
  80. where
  81. Attribute: OperationAttributes + DeserializeOwned + serde::Serialize + 'static,
  82. {
  83. pub fn new<S, P>(rev_id: i64, sync_object: S, persistence: P) -> RevisionSynchronizer<Attribute>
  84. where
  85. S: RevisionSyncObject<Attribute>,
  86. P: RevisionSyncPersistence,
  87. {
  88. let object = Arc::new(RwLock::new(sync_object));
  89. let persistence = Arc::new(persistence);
  90. let object_id = object.read().object_id().to_owned();
  91. RevisionSynchronizer {
  92. object_id,
  93. rev_id: AtomicI64::new(rev_id),
  94. object,
  95. persistence,
  96. }
  97. }
  98. #[tracing::instrument(level = "trace", skip(self, user, revisions), err)]
  99. pub async fn sync_revisions(
  100. &self,
  101. user: Arc<dyn RevisionUser>,
  102. revisions: Vec<Revision>,
  103. ) -> Result<(), SyncError> {
  104. let object_id = self.object_id.clone();
  105. if revisions.is_empty() {
  106. // Return all the revisions to client
  107. let revisions = self.persistence.read_revisions(&object_id, None).await?;
  108. let data = ServerRevisionWSDataBuilder::build_push_message(&object_id, revisions);
  109. user.receive(RevisionSyncResponse::Push(data));
  110. return Ok(());
  111. }
  112. let server_base_rev_id = self.rev_id.load(SeqCst);
  113. let first_revision = revisions.first().unwrap().clone();
  114. if self
  115. .is_applied_before(&first_revision, &self.persistence)
  116. .await
  117. {
  118. // Server has received this revision before, so ignore the following revisions
  119. return Ok(());
  120. }
  121. match server_base_rev_id.cmp(&first_revision.rev_id) {
  122. Ordering::Less => {
  123. let server_rev_id = next(server_base_rev_id);
  124. if server_base_rev_id == first_revision.base_rev_id
  125. || server_rev_id == first_revision.rev_id
  126. {
  127. // The rev is in the right order, just compose it.
  128. for revision in revisions.iter() {
  129. self.compose_revision(revision)?;
  130. }
  131. self.persistence.save_revisions(revisions).await?;
  132. } else {
  133. // The server ops is outdated, pull the missing revision from the client.
  134. let range = RevisionRange {
  135. start: server_rev_id,
  136. end: first_revision.rev_id,
  137. };
  138. let msg = ServerRevisionWSDataBuilder::build_pull_message(&self.object_id, range);
  139. user.receive(RevisionSyncResponse::Pull(msg));
  140. }
  141. },
  142. Ordering::Equal => {
  143. // Do nothing
  144. tracing::trace!(
  145. "Applied {} revision rev_id is the same as cur_rev_id",
  146. self.object_id
  147. );
  148. },
  149. Ordering::Greater => {
  150. // The client ops is outdated. Transform the client revision ops and then
  151. // send the prime ops to the client. Client should compose the this prime
  152. // ops.
  153. let from_rev_id = first_revision.rev_id;
  154. let to_rev_id = server_base_rev_id;
  155. self
  156. .push_revisions_to_user(user, from_rev_id, to_rev_id)
  157. .await;
  158. },
  159. }
  160. Ok(())
  161. }
  162. #[tracing::instrument(level = "trace", skip(self, user), fields(server_rev_id), err)]
  163. pub async fn pong(
  164. &self,
  165. user: Arc<dyn RevisionUser>,
  166. client_rev_id: i64,
  167. ) -> Result<(), SyncError> {
  168. let object_id = self.object_id.clone();
  169. let server_rev_id = self.rev_id();
  170. tracing::Span::current().record("server_rev_id", &server_rev_id);
  171. match server_rev_id.cmp(&client_rev_id) {
  172. Ordering::Less => {
  173. tracing::trace!(
  174. "Client should not send ping and the server should pull the revisions from the client"
  175. )
  176. },
  177. Ordering::Equal => tracing::trace!("{} is up to date.", object_id),
  178. Ordering::Greater => {
  179. // The client ops is outdated. Transform the client revision ops and then
  180. // send the prime ops to the client. Client should compose the this prime
  181. // ops.
  182. let from_rev_id = client_rev_id;
  183. let to_rev_id = server_rev_id;
  184. tracing::trace!("Push revisions to user");
  185. self
  186. .push_revisions_to_user(user, from_rev_id, to_rev_id)
  187. .await;
  188. },
  189. }
  190. Ok(())
  191. }
  192. #[tracing::instrument(level = "debug", skip(self, revisions), fields(object_id), err)]
  193. pub async fn reset(&self, revisions: Vec<Revision>) -> Result<(), SyncError> {
  194. let object_id = self.object_id.clone();
  195. tracing::Span::current().record("object_id", &object_id.as_str());
  196. let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions);
  197. let operations = make_operations_from_revisions(revisions.clone())?;
  198. self.persistence.reset_object(&object_id, revisions).await?;
  199. self.object.write().set_operations(operations);
  200. let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
  201. Ok(())
  202. }
  203. pub fn object_json(&self) -> String {
  204. self.object.read().object_json()
  205. }
  206. fn compose_revision(&self, revision: &Revision) -> Result<(), SyncError> {
  207. let operations = RevisionOperations::<Attribute>::from_bytes(&revision.bytes)?;
  208. self.compose_operations(operations)?;
  209. let _ = self
  210. .rev_id
  211. .fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
  212. Ok(())
  213. }
  214. #[tracing::instrument(level = "debug", skip(self, revision))]
  215. fn transform_revision(
  216. &self,
  217. revision: &Revision,
  218. ) -> Result<(RevisionOperations<Attribute>, RevisionOperations<Attribute>), SyncError> {
  219. let client_operations = RevisionOperations::<Attribute>::from_bytes(&revision.bytes)?;
  220. let result = self.object.read().transform(&client_operations)?;
  221. Ok(result)
  222. }
  223. fn compose_operations(&self, operations: RevisionOperations<Attribute>) -> Result<(), SyncError> {
  224. if operations.is_empty() {
  225. tracing::warn!("Composed operations is empty");
  226. }
  227. match self.object.try_write_for(Duration::from_millis(300)) {
  228. None => tracing::error!("Failed to acquire write lock of object"),
  229. Some(mut write_guard) => {
  230. write_guard.compose(&operations)?;
  231. },
  232. }
  233. Ok(())
  234. }
  235. pub(crate) fn rev_id(&self) -> i64 {
  236. self.rev_id.load(SeqCst)
  237. }
  238. async fn is_applied_before(
  239. &self,
  240. new_revision: &Revision,
  241. persistence: &Arc<dyn RevisionSyncPersistence>,
  242. ) -> bool {
  243. let rev_ids = Some(vec![new_revision.rev_id]);
  244. if let Ok(revisions) = persistence.read_revisions(&self.object_id, rev_ids).await {
  245. if let Some(revision) = revisions.first() {
  246. if revision.md5 == new_revision.md5 {
  247. return true;
  248. }
  249. }
  250. };
  251. false
  252. }
  253. async fn push_revisions_to_user(&self, user: Arc<dyn RevisionUser>, from: i64, to: i64) {
  254. let rev_ids: Vec<i64> = (from..=to).collect();
  255. tracing::debug!("Push revision: {} -> {} to client", from, to);
  256. match self
  257. .persistence
  258. .read_revisions(&self.object_id, Some(rev_ids.clone()))
  259. .await
  260. {
  261. Ok(revisions) => {
  262. if !rev_ids.is_empty() && revisions.is_empty() {
  263. tracing::trace!(
  264. "{}: can not read the revisions in range {:?}",
  265. self.object_id,
  266. rev_ids
  267. );
  268. // assert_eq!(revisions.is_empty(), rev_ids.is_empty(),);
  269. }
  270. let data = ServerRevisionWSDataBuilder::build_push_message(&self.object_id, revisions);
  271. user.receive(RevisionSyncResponse::Push(data));
  272. },
  273. Err(e) => {
  274. tracing::error!("{:?}", e);
  275. },
  276. };
  277. }
  278. }