sync.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. use crate::{
  2. errors::{internal_error, DocResult},
  3. services::{
  4. doc::{
  5. edit::ClientDocEditor,
  6. revision::{RevisionIterator, RevisionManager},
  7. },
  8. ws::DocumentWebSocket,
  9. },
  10. };
  11. use async_stream::stream;
  12. use bytes::Bytes;
  13. use flowy_document_infra::entities::ws::{WsDataType, WsDocumentData};
  14. use futures::stream::StreamExt;
  15. use lib_ot::revision::{RevId, RevisionRange};
  16. use std::{convert::TryFrom, sync::Arc};
  17. use tokio::{
  18. sync::{broadcast, mpsc},
  19. task::spawn_blocking,
  20. time::{interval, Duration},
  21. };
  22. pub(crate) struct RevisionDownStream {
  23. editor: Arc<ClientDocEditor>,
  24. rev_manager: Arc<RevisionManager>,
  25. ws_msg_rx: Option<mpsc::UnboundedReceiver<WsDocumentData>>,
  26. ws_sender: Arc<dyn DocumentWebSocket>,
  27. stop_rx: Option<SteamStopRx>,
  28. }
  29. impl RevisionDownStream {
  30. pub(crate) fn new(
  31. editor: Arc<ClientDocEditor>,
  32. rev_manager: Arc<RevisionManager>,
  33. ws_msg_rx: mpsc::UnboundedReceiver<WsDocumentData>,
  34. ws_sender: Arc<dyn DocumentWebSocket>,
  35. stop_rx: SteamStopRx,
  36. ) -> Self {
  37. RevisionDownStream {
  38. editor,
  39. rev_manager,
  40. ws_msg_rx: Some(ws_msg_rx),
  41. ws_sender,
  42. stop_rx: Some(stop_rx),
  43. }
  44. }
  45. pub async fn run(mut self) {
  46. let mut receiver = self.ws_msg_rx.take().expect("Only take once");
  47. let mut stop_rx = self.stop_rx.take().expect("Only take once");
  48. let doc_id = self.editor.doc_id.clone();
  49. let stream = stream! {
  50. loop {
  51. tokio::select! {
  52. result = receiver.recv() => {
  53. match result {
  54. Some(msg) => yield msg,
  55. None => {},
  56. }
  57. },
  58. _ = stop_rx.recv() => {
  59. tracing::debug!("[RevisionDownStream:{}] loop exit", doc_id);
  60. break
  61. },
  62. };
  63. }
  64. };
  65. stream
  66. .for_each(|msg| async {
  67. match self.handle_message(msg).await {
  68. Ok(_) => {},
  69. Err(e) => log::error!("[RevisionDownStream:{}] error: {}", self.editor.doc_id, e),
  70. }
  71. })
  72. .await;
  73. }
  74. async fn handle_message(&self, msg: WsDocumentData) -> DocResult<()> {
  75. let WsDocumentData { doc_id: _, ty, data } = msg;
  76. let bytes = spawn_blocking(move || Bytes::from(data))
  77. .await
  78. .map_err(internal_error)?;
  79. log::debug!("[RevisionDownStream]: receives new message: {:?}", ty);
  80. match ty {
  81. WsDataType::PushRev => {
  82. let _ = self.editor.handle_push_rev(bytes).await?;
  83. },
  84. WsDataType::PullRev => {
  85. let range = RevisionRange::try_from(bytes)?;
  86. let revision = self.rev_manager.mk_revisions(range).await?;
  87. let _ = self.ws_sender.send(revision.into());
  88. },
  89. WsDataType::Acked => {
  90. let rev_id = RevId::try_from(bytes)?;
  91. let _ = self.rev_manager.ack_revision(rev_id).await?;
  92. },
  93. WsDataType::Conflict => {},
  94. WsDataType::NewDocUser => {},
  95. }
  96. Ok(())
  97. }
  98. }
  99. // RevisionUpStream
  100. pub(crate) enum UpStreamMsg {
  101. Tick,
  102. }
  103. pub type SteamStopRx = broadcast::Receiver<()>;
  104. pub type SteamStopTx = broadcast::Sender<()>;
  105. pub(crate) struct RevisionUpStream {
  106. revisions: Arc<dyn RevisionIterator>,
  107. ws_sender: Arc<dyn DocumentWebSocket>,
  108. stop_rx: Option<SteamStopRx>,
  109. doc_id: String,
  110. }
  111. impl RevisionUpStream {
  112. pub(crate) fn new(
  113. doc_id: &str,
  114. revisions: Arc<dyn RevisionIterator>,
  115. ws_sender: Arc<dyn DocumentWebSocket>,
  116. stop_rx: SteamStopRx,
  117. ) -> Self {
  118. Self {
  119. revisions,
  120. ws_sender,
  121. stop_rx: Some(stop_rx),
  122. doc_id: doc_id.to_owned(),
  123. }
  124. }
  125. pub async fn run(mut self) {
  126. let (tx, mut rx) = mpsc::unbounded_channel();
  127. let mut stop_rx = self.stop_rx.take().expect("Only take once");
  128. let doc_id = self.doc_id.clone();
  129. tokio::spawn(tick(tx));
  130. let stream = stream! {
  131. loop {
  132. tokio::select! {
  133. result = rx.recv() => {
  134. match result {
  135. Some(msg) => yield msg,
  136. None => {},
  137. }
  138. },
  139. _ = stop_rx.recv() => {
  140. tracing::debug!("[RevisionUpStream:{}] loop exit", doc_id);
  141. break
  142. },
  143. };
  144. }
  145. };
  146. stream
  147. .for_each(|msg| async {
  148. match self.handle_msg(msg).await {
  149. Ok(_) => {},
  150. Err(e) => log::error!("{:?}", e),
  151. }
  152. })
  153. .await;
  154. }
  155. async fn handle_msg(&self, msg: UpStreamMsg) -> DocResult<()> {
  156. match msg {
  157. UpStreamMsg::Tick => self.send_next_revision().await,
  158. }
  159. }
  160. async fn send_next_revision(&self) -> DocResult<()> {
  161. match self.revisions.next().await? {
  162. None => Ok(()),
  163. Some(record) => {
  164. tracing::debug!(
  165. "[RevisionUpStream]: processes revision: {}:{:?}",
  166. record.revision.doc_id,
  167. record.revision.rev_id
  168. );
  169. let _ = self.ws_sender.send(record.revision.into()).map_err(internal_error);
  170. // let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
  171. Ok(())
  172. },
  173. }
  174. }
  175. }
  176. async fn tick(sender: mpsc::UnboundedSender<UpStreamMsg>) {
  177. let mut i = interval(Duration::from_secs(2));
  178. loop {
  179. match sender.send(UpStreamMsg::Tick) {
  180. Ok(_) => {},
  181. Err(e) => log::error!("RevisionUploadStream tick error: {}", e),
  182. }
  183. i.tick().await;
  184. }
  185. }