local_ws_impl.rs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. use bytes::Bytes;
  2. use dashmap::DashMap;
  3. use flowy_collaboration::entities::ws::*;
  4. use flowy_error::{internal_error, FlowyError};
  5. use lib_infra::future::FutureResult;
  6. use lib_ws::{WSConnectState, WSMessageReceiver, WSModule, WebSocketRawMessage};
  7. use crate::services::{
  8. local_ws::local_server::{spawn_server, LocalDocumentServer},
  9. ws_conn::{FlowyRawWebSocket, FlowyWSSender},
  10. };
  11. use std::{convert::TryFrom, sync::Arc};
  12. use tokio::sync::{broadcast, broadcast::Receiver};
  13. pub struct LocalWebSocket {
  14. receivers: Arc<DashMap<WSModule, Arc<dyn WSMessageReceiver>>>,
  15. state_sender: broadcast::Sender<WSConnectState>,
  16. ws_sender: LocalWSSender,
  17. server: Arc<LocalDocumentServer>,
  18. }
  19. impl std::default::Default for LocalWebSocket {
  20. fn default() -> Self {
  21. let (state_sender, _) = broadcast::channel(16);
  22. let ws_sender = LocalWSSender::default();
  23. let receivers = Arc::new(DashMap::new());
  24. let server = spawn_server(receivers.clone());
  25. LocalWebSocket {
  26. receivers,
  27. state_sender,
  28. ws_sender,
  29. server,
  30. }
  31. }
  32. }
  33. impl LocalWebSocket {
  34. fn spawn_client(&self, _addr: String) {
  35. let mut ws_receiver = self.ws_sender.subscribe();
  36. let server = self.server.clone();
  37. tokio::spawn(async move {
  38. loop {
  39. match ws_receiver.recv().await {
  40. Ok(message) => {
  41. let fut = || async {
  42. let bytes = Bytes::from(message.data);
  43. let client_data = DocumentClientWSData::try_from(bytes).map_err(internal_error)?;
  44. let _ = server.handle_client_data(client_data).await?;
  45. Ok::<(), FlowyError>(())
  46. };
  47. match fut().await {
  48. Ok(_) => {},
  49. Err(e) => tracing::error!("[LocalWebSocket] error: {:?}", e),
  50. }
  51. },
  52. Err(e) => tracing::error!("[LocalWebSocket] error: {}", e),
  53. }
  54. }
  55. });
  56. }
  57. }
  58. impl FlowyRawWebSocket for LocalWebSocket {
  59. fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
  60. self.spawn_client(addr);
  61. FutureResult::new(async { Ok(()) })
  62. }
  63. fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
  64. fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
  65. fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
  66. fn add_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
  67. self.receivers.insert(receiver.source(), receiver);
  68. Ok(())
  69. }
  70. fn sender(&self) -> Result<Arc<dyn FlowyWSSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
  71. }
  72. #[derive(Clone)]
  73. struct LocalWSSender(broadcast::Sender<WebSocketRawMessage>);
  74. impl std::default::Default for LocalWSSender {
  75. fn default() -> Self {
  76. let (tx, _) = broadcast::channel(16);
  77. Self(tx)
  78. }
  79. }
  80. impl FlowyWSSender for LocalWSSender {
  81. fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
  82. let _ = self.0.send(msg);
  83. Ok(())
  84. }
  85. }
  86. impl std::ops::Deref for LocalWSSender {
  87. type Target = broadcast::Sender<WebSocketRawMessage>;
  88. fn deref(&self) -> &Self::Target { &self.0 }
  89. }