|
@@ -9,7 +9,7 @@ use flowy_error::{FlowyError, FlowyResult};
|
|
|
use futures_util::stream::StreamExt;
|
|
|
use lib_infra::future::{BoxResultFuture, FutureResult};
|
|
|
use lib_ws::WSConnectState;
|
|
|
-use std::{collections::VecDeque, convert::TryFrom, sync::Arc};
|
|
|
+use std::{collections::VecDeque, convert::TryFrom, fmt::Formatter, sync::Arc};
|
|
|
use tokio::{
|
|
|
sync::{
|
|
|
broadcast,
|
|
@@ -41,6 +41,7 @@ pub trait RevisionWebSocket: Send + Sync + 'static {
|
|
|
}
|
|
|
|
|
|
pub struct RevisionWebSocketManager {
|
|
|
+ pub object_name: String,
|
|
|
pub object_id: String,
|
|
|
sink_provider: Arc<dyn RevisionWSSinkDataProvider>,
|
|
|
stream_consumer: Arc<dyn RevisionWSSteamConsumer>,
|
|
@@ -51,8 +52,14 @@ pub struct RevisionWebSocketManager {
|
|
|
stop_sync_tx: SinkStopTx,
|
|
|
}
|
|
|
|
|
|
+impl std::fmt::Display for RevisionWebSocketManager {
|
|
|
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
|
|
+ f.write_fmt(format_args!("{}RevisionWebSocketManager", self.object_name))
|
|
|
+ }
|
|
|
+}
|
|
|
impl RevisionWebSocketManager {
|
|
|
pub fn new(
|
|
|
+ object_name: &str,
|
|
|
object_id: &str,
|
|
|
web_socket: Arc<dyn RevisionWebSocket>,
|
|
|
sink_provider: Arc<dyn RevisionWSSinkDataProvider>,
|
|
@@ -62,9 +69,11 @@ impl RevisionWebSocketManager {
|
|
|
let (ws_passthrough_tx, ws_passthrough_rx) = mpsc::channel(1000);
|
|
|
let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2);
|
|
|
let object_id = object_id.to_string();
|
|
|
+ let object_name = object_name.to_string();
|
|
|
let (state_passthrough_tx, _) = broadcast::channel(2);
|
|
|
let mut manager = RevisionWebSocketManager {
|
|
|
object_id,
|
|
|
+ object_name,
|
|
|
sink_provider,
|
|
|
stream_consumer,
|
|
|
web_socket,
|
|
@@ -81,12 +90,14 @@ impl RevisionWebSocketManager {
|
|
|
let ws_msg_rx = self.ws_passthrough_rx.take().expect("Only take once");
|
|
|
let sink = RevisionWSSink::new(
|
|
|
&self.object_id,
|
|
|
+ &self.object_name,
|
|
|
self.sink_provider.clone(),
|
|
|
self.web_socket.clone(),
|
|
|
self.stop_sync_tx.subscribe(),
|
|
|
ping_duration,
|
|
|
);
|
|
|
let stream = RevisionWSStream::new(
|
|
|
+ &self.object_name,
|
|
|
&self.object_id,
|
|
|
self.stream_consumer.clone(),
|
|
|
ws_msg_rx,
|
|
@@ -106,28 +117,37 @@ impl RevisionWebSocketManager {
|
|
|
}
|
|
|
|
|
|
impl std::ops::Drop for RevisionWebSocketManager {
|
|
|
- fn drop(&mut self) { tracing::trace!("{} RevisionWebSocketManager was dropped", self.object_id) }
|
|
|
+ fn drop(&mut self) { tracing::trace!("{} was dropped", self) }
|
|
|
}
|
|
|
|
|
|
pub struct RevisionWSStream {
|
|
|
+ object_name: String,
|
|
|
object_id: String,
|
|
|
consumer: Arc<dyn RevisionWSSteamConsumer>,
|
|
|
ws_msg_rx: Option<mpsc::Receiver<ServerRevisionWSData>>,
|
|
|
stop_rx: Option<SinkStopRx>,
|
|
|
}
|
|
|
|
|
|
+impl std::fmt::Display for RevisionWSStream {
|
|
|
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
|
|
+ f.write_fmt(format_args!("{}RevisionWSStream", self.object_name))
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
impl std::ops::Drop for RevisionWSStream {
|
|
|
- fn drop(&mut self) { tracing::trace!("{} RevisionWSStream was dropped", self.object_id) }
|
|
|
+ fn drop(&mut self) { tracing::trace!("{} was dropped", self) }
|
|
|
}
|
|
|
|
|
|
impl RevisionWSStream {
|
|
|
pub fn new(
|
|
|
+ object_name: &str,
|
|
|
object_id: &str,
|
|
|
consumer: Arc<dyn RevisionWSSteamConsumer>,
|
|
|
ws_msg_rx: mpsc::Receiver<ServerRevisionWSData>,
|
|
|
stop_rx: SinkStopRx,
|
|
|
) -> Self {
|
|
|
RevisionWSStream {
|
|
|
+ object_name: object_name.to_string(),
|
|
|
object_id: object_id.to_owned(),
|
|
|
consumer,
|
|
|
ws_msg_rx: Some(ws_msg_rx),
|
|
@@ -139,6 +159,7 @@ impl RevisionWSStream {
|
|
|
let mut receiver = self.ws_msg_rx.take().expect("Only take once");
|
|
|
let mut stop_rx = self.stop_rx.take().expect("Only take once");
|
|
|
let object_id = self.object_id.clone();
|
|
|
+ let name = format!("{}", &self);
|
|
|
let stream = stream! {
|
|
|
loop {
|
|
|
tokio::select! {
|
|
@@ -148,13 +169,13 @@ impl RevisionWSStream {
|
|
|
yield msg
|
|
|
},
|
|
|
None => {
|
|
|
- tracing::debug!("[RevisionWSStream]:{} loop exit", object_id);
|
|
|
+ tracing::debug!("[{}]:{} loop exit", name, object_id);
|
|
|
break;
|
|
|
},
|
|
|
}
|
|
|
},
|
|
|
_ = stop_rx.recv() => {
|
|
|
- tracing::debug!("[RevisionWSStream]:{} loop exit", object_id);
|
|
|
+ tracing::debug!("[{}]:{} loop exit", name, object_id);
|
|
|
break
|
|
|
},
|
|
|
};
|
|
@@ -165,7 +186,7 @@ impl RevisionWSStream {
|
|
|
.for_each(|msg| async {
|
|
|
match self.handle_message(msg).await {
|
|
|
Ok(_) => {},
|
|
|
- Err(e) => tracing::error!("[RevisionWSStream]:{} error: {}", self.object_id, e),
|
|
|
+ Err(e) => tracing::error!("[{}]:{} error: {}", &self, self.object_id, e),
|
|
|
}
|
|
|
})
|
|
|
.await;
|
|
@@ -174,7 +195,7 @@ impl RevisionWSStream {
|
|
|
async fn handle_message(&self, msg: ServerRevisionWSData) -> FlowyResult<()> {
|
|
|
let ServerRevisionWSData { object_id, ty, data } = msg;
|
|
|
let bytes = Bytes::from(data);
|
|
|
- tracing::trace!("[RevisionWSStream]: new message: {}:{:?}", object_id, ty);
|
|
|
+ tracing::trace!("[{}]: new message: {}:{:?}", self, object_id, ty);
|
|
|
match ty {
|
|
|
ServerRevisionWSDataType::ServerPushRev => {
|
|
|
let _ = self.consumer.receive_push_revision(bytes).await?;
|
|
@@ -199,26 +220,29 @@ impl RevisionWSStream {
|
|
|
type SinkStopRx = broadcast::Receiver<()>;
|
|
|
type SinkStopTx = broadcast::Sender<()>;
|
|
|
pub struct RevisionWSSink {
|
|
|
+ object_id: String,
|
|
|
+ object_name: String,
|
|
|
provider: Arc<dyn RevisionWSSinkDataProvider>,
|
|
|
ws_sender: Arc<dyn RevisionWebSocket>,
|
|
|
stop_rx: Option<SinkStopRx>,
|
|
|
- object_id: String,
|
|
|
ping_duration: Duration,
|
|
|
}
|
|
|
|
|
|
impl RevisionWSSink {
|
|
|
pub fn new(
|
|
|
object_id: &str,
|
|
|
+ object_name: &str,
|
|
|
provider: Arc<dyn RevisionWSSinkDataProvider>,
|
|
|
ws_sender: Arc<dyn RevisionWebSocket>,
|
|
|
stop_rx: SinkStopRx,
|
|
|
ping_duration: Duration,
|
|
|
) -> Self {
|
|
|
Self {
|
|
|
+ object_id: object_id.to_owned(),
|
|
|
+ object_name: object_name.to_owned(),
|
|
|
provider,
|
|
|
ws_sender,
|
|
|
stop_rx: Some(stop_rx),
|
|
|
- object_id: object_id.to_owned(),
|
|
|
ping_duration,
|
|
|
}
|
|
|
}
|
|
@@ -228,6 +252,7 @@ impl RevisionWSSink {
|
|
|
let mut stop_rx = self.stop_rx.take().expect("Only take once");
|
|
|
let object_id = self.object_id.clone();
|
|
|
tokio::spawn(tick(tx, self.ping_duration));
|
|
|
+ let name = format!("{}", self);
|
|
|
let stream = stream! {
|
|
|
loop {
|
|
|
tokio::select! {
|
|
@@ -238,7 +263,7 @@ impl RevisionWSSink {
|
|
|
}
|
|
|
},
|
|
|
_ = stop_rx.recv() => {
|
|
|
- tracing::trace!("[RevisionWSSink:{}] loop exit", object_id);
|
|
|
+ tracing::trace!("[{}]:{} loop exit", name, object_id);
|
|
|
break
|
|
|
},
|
|
|
};
|
|
@@ -248,7 +273,7 @@ impl RevisionWSSink {
|
|
|
.for_each(|_| async {
|
|
|
match self.send_next_revision().await {
|
|
|
Ok(_) => {},
|
|
|
- Err(e) => tracing::error!("[RevisionWSSink] send failed, {:?}", e),
|
|
|
+ Err(e) => tracing::error!("[{}] send failed, {:?}", self, e),
|
|
|
}
|
|
|
})
|
|
|
.await;
|
|
@@ -257,19 +282,25 @@ impl RevisionWSSink {
|
|
|
async fn send_next_revision(&self) -> FlowyResult<()> {
|
|
|
match self.provider.next().await? {
|
|
|
None => {
|
|
|
- tracing::trace!("Finish synchronizing revisions");
|
|
|
+ tracing::trace!("[{}]: Finish synchronizing revisions", self);
|
|
|
Ok(())
|
|
|
},
|
|
|
Some(data) => {
|
|
|
- tracing::trace!("[RevisionWSSink] send: {}:{}-{:?}", data.object_id, data.id(), data.ty);
|
|
|
+ tracing::trace!("[{}]: send {}:{}-{:?}", self, data.object_id, data.id(), data.ty);
|
|
|
self.ws_sender.send(data)
|
|
|
},
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+impl std::fmt::Display for RevisionWSSink {
|
|
|
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
|
|
+ f.write_fmt(format_args!("{}RevisionWSSink", self.object_name))
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
impl std::ops::Drop for RevisionWSSink {
|
|
|
- fn drop(&mut self) { tracing::trace!("{} RevisionWSSink was dropped", self.object_id) }
|
|
|
+ fn drop(&mut self) { tracing::trace!("{} was dropped", self) }
|
|
|
}
|
|
|
|
|
|
async fn tick(sender: mpsc::Sender<()>, duration: Duration) {
|
|
@@ -330,11 +361,6 @@ impl CompositeWSSinkDataProvider {
|
|
|
}
|
|
|
},
|
|
|
};
|
|
|
-
|
|
|
- if let Ok(Some(data)) = &data {
|
|
|
- tracing::trace!("[CompositeWSSinkDataProvider]: {}:{:?}", data.object_id, data.ty);
|
|
|
- }
|
|
|
-
|
|
|
data
|
|
|
}
|
|
|
|