ws_client.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. use crate::{
  2. config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
  3. services::user::LoggedUser,
  4. web_socket::{
  5. entities::{Connect, Disconnect, Socket},
  6. WsBizHandlers,
  7. WsMessageAdaptor,
  8. WsServer,
  9. },
  10. };
  11. use actix::*;
  12. use actix_web::web::Data;
  13. use actix_web_actors::{ws, ws::Message::Text};
  14. use bytes::Bytes;
  15. use lib_ws::WsMessage;
  16. use std::{convert::TryFrom, sync::Arc, time::Instant};
  17. #[derive(Debug)]
  18. pub struct WsUser {
  19. inner: LoggedUser,
  20. }
  21. impl WsUser {
  22. pub fn new(inner: LoggedUser) -> Self { Self { inner } }
  23. pub fn id(&self) -> &str { &self.inner.user_id }
  24. }
  25. pub struct WsClientData {
  26. pub(crate) user: Arc<WsUser>,
  27. pub(crate) socket: Socket,
  28. pub(crate) data: Bytes,
  29. }
  30. pub struct WsClient {
  31. user: Arc<WsUser>,
  32. server: Addr<WsServer>,
  33. biz_handlers: Data<WsBizHandlers>,
  34. hb: Instant,
  35. }
  36. impl WsClient {
  37. pub fn new(user: WsUser, server: Addr<WsServer>, biz_handlers: Data<WsBizHandlers>) -> Self {
  38. Self {
  39. user: Arc::new(user),
  40. server,
  41. biz_handlers,
  42. hb: Instant::now(),
  43. }
  44. }
  45. fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
  46. ctx.run_interval(HEARTBEAT_INTERVAL, |client, ctx| {
  47. if Instant::now().duration_since(client.hb) > PING_TIMEOUT {
  48. client.server.do_send(Disconnect {
  49. sid: client.user.id().into(),
  50. });
  51. ctx.stop();
  52. } else {
  53. ctx.ping(b"");
  54. }
  55. });
  56. }
  57. fn handle_binary_message(&self, bytes: Bytes, socket: Socket) {
  58. // TODO: ok to unwrap?
  59. let message: WsMessage = WsMessage::try_from(bytes).unwrap();
  60. match self.biz_handlers.get(&message.module) {
  61. None => {
  62. log::error!("Can't find the handler for {:?}", message.module);
  63. },
  64. Some(handler) => {
  65. let client_data = WsClientData {
  66. user: self.user.clone(),
  67. socket,
  68. data: Bytes::from(message.data),
  69. };
  70. handler.receive(client_data);
  71. },
  72. }
  73. }
  74. }
  75. impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsClient {
  76. fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
  77. match msg {
  78. Ok(ws::Message::Ping(msg)) => {
  79. self.hb = Instant::now();
  80. ctx.pong(&msg);
  81. },
  82. Ok(ws::Message::Pong(_msg)) => {
  83. // tracing::debug!("Receive {} pong {:?}", &self.session_id, &msg);
  84. self.hb = Instant::now();
  85. },
  86. Ok(ws::Message::Binary(bytes)) => {
  87. let socket = ctx.address().recipient();
  88. self.handle_binary_message(bytes, socket);
  89. },
  90. Ok(Text(_)) => {
  91. log::warn!("Receive unexpected text message");
  92. },
  93. Ok(ws::Message::Close(reason)) => {
  94. ctx.close(reason);
  95. ctx.stop();
  96. },
  97. Ok(ws::Message::Continuation(_)) => {},
  98. Ok(ws::Message::Nop) => {},
  99. Err(e) => {
  100. log::error!("[{}]: WebSocketStream protocol error {:?}", self.user.id(), e);
  101. ctx.stop();
  102. },
  103. }
  104. }
  105. }
  106. impl Handler<WsMessageAdaptor> for WsClient {
  107. type Result = ();
  108. fn handle(&mut self, msg: WsMessageAdaptor, ctx: &mut Self::Context) { ctx.binary(msg.0); }
  109. }
  110. impl Actor for WsClient {
  111. type Context = ws::WebsocketContext<Self>;
  112. fn started(&mut self, ctx: &mut Self::Context) {
  113. self.hb(ctx);
  114. let socket = ctx.address().recipient();
  115. let connect = Connect {
  116. socket,
  117. sid: self.user.id().into(),
  118. };
  119. self.server
  120. .send(connect)
  121. .into_actor(self)
  122. .then(|res, _client, _ctx| {
  123. match res {
  124. Ok(Ok(_)) => tracing::trace!("Send connect message to server success"),
  125. Ok(Err(e)) => log::error!("Send connect message to server failed: {:?}", e),
  126. Err(e) => log::error!("Send connect message to server failed: {:?}", e),
  127. }
  128. fut::ready(())
  129. })
  130. .wait(ctx);
  131. }
  132. fn stopping(&mut self, _: &mut Self::Context) -> Running {
  133. self.server.do_send(Disconnect {
  134. sid: self.user.id().into(),
  135. });
  136. Running::Stop
  137. }
  138. }