ws_local.rs 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. use crate::mock::server::MockDocServer;
  2. use bytes::Bytes;
  3. use dashmap::DashMap;
  4. use flowy_collaboration::entities::ws::*;
  5. use flowy_net::services::ws::*;
  6. use lib_infra::future::FutureResult;
  7. use lib_ws::{WSModule, WebSocketRawMessage};
  8. use parking_lot::RwLock;
  9. use std::{convert::TryFrom, sync::Arc};
  10. use tokio::sync::{broadcast, broadcast::Receiver};
  11. pub struct MockWebSocket {
  12. receivers: Arc<DashMap<WSModule, Arc<dyn WSMessageReceiver>>>,
  13. state_sender: broadcast::Sender<WSConnectState>,
  14. ws_sender: MockWSSender,
  15. is_stop: Arc<RwLock<bool>>,
  16. server: Arc<MockDocServer>,
  17. }
  18. impl std::default::Default for MockWebSocket {
  19. fn default() -> Self {
  20. let (state_sender, _) = broadcast::channel(16);
  21. let (ws_sender, _) = broadcast::channel(16);
  22. let server = Arc::new(MockDocServer::default());
  23. MockWebSocket {
  24. receivers: Arc::new(DashMap::new()),
  25. state_sender,
  26. ws_sender: MockWSSender(ws_sender),
  27. is_stop: Arc::new(RwLock::new(false)),
  28. server,
  29. }
  30. }
  31. }
  32. impl FlowyWebSocket for MockWebSocket {
  33. fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> {
  34. *self.is_stop.write() = false;
  35. let mut ws_receiver = self.ws_sender.subscribe();
  36. let receivers = self.receivers.clone();
  37. let is_stop = self.is_stop.clone();
  38. let server = self.server.clone();
  39. tokio::spawn(async move {
  40. while let Ok(message) = ws_receiver.recv().await {
  41. if *is_stop.read() {
  42. // do nothing
  43. } else {
  44. let ws_data = DocumentClientWSData::try_from(Bytes::from(message.data.clone())).unwrap();
  45. if let Some(mut rx) = server.handle_client_data(ws_data).await {
  46. let new_ws_message = rx.recv().await.unwrap();
  47. match receivers.get(&new_ws_message.module) {
  48. None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message),
  49. Some(handler) => handler.receive_message(new_ws_message.clone()),
  50. }
  51. }
  52. }
  53. }
  54. });
  55. FutureResult::new(async { Ok(()) })
  56. }
  57. fn stop_connect(&self) -> FutureResult<(), FlowyError> {
  58. *self.is_stop.write() = true;
  59. FutureResult::new(async { Ok(()) })
  60. }
  61. fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
  62. fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
  63. fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
  64. self.receivers.insert(handler.source(), handler);
  65. Ok(())
  66. }
  67. fn ws_sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
  68. }
  69. #[derive(Clone)]
  70. pub struct MockWSSender(broadcast::Sender<WebSocketRawMessage>);
  71. impl FlowyWSSender for MockWSSender {
  72. fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
  73. let _ = self.0.send(msg);
  74. Ok(())
  75. }
  76. }
  77. impl std::ops::Deref for MockWSSender {
  78. type Target = broadcast::Sender<WebSocketRawMessage>;
  79. fn deref(&self) -> &Self::Target { &self.0 }
  80. }