ws_client.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. use crate::{
  2. config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
  3. service::{
  4. user::LoggedUser,
  5. ws::{
  6. entities::{Connect, Disconnect, Socket},
  7. WsBizHandlers,
  8. WsMessageAdaptor,
  9. WsServer,
  10. },
  11. },
  12. };
  13. use actix::*;
  14. use actix_web::web::Data;
  15. use actix_web_actors::{ws, ws::Message::Text};
  16. use bytes::Bytes;
  17. use lib_ws::WsMessage;
  18. use std::{convert::TryFrom, sync::Arc, time::Instant};
  19. #[derive(Debug)]
  20. pub struct WsUser {
  21. inner: LoggedUser,
  22. }
  23. impl WsUser {
  24. pub fn new(inner: LoggedUser) -> Self { Self { inner } }
  25. pub fn id(&self) -> &str { &self.inner.user_id }
  26. }
  27. pub struct WsClientData {
  28. pub(crate) user: Arc<WsUser>,
  29. pub(crate) socket: Socket,
  30. pub(crate) data: Bytes,
  31. }
  32. pub struct WsClient {
  33. user: Arc<WsUser>,
  34. server: Addr<WsServer>,
  35. biz_handlers: Data<WsBizHandlers>,
  36. hb: Instant,
  37. }
  38. impl WsClient {
  39. pub fn new(user: WsUser, server: Addr<WsServer>, biz_handlers: Data<WsBizHandlers>) -> Self {
  40. Self {
  41. user: Arc::new(user),
  42. server,
  43. biz_handlers,
  44. hb: Instant::now(),
  45. }
  46. }
  47. fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
  48. ctx.run_interval(HEARTBEAT_INTERVAL, |client, ctx| {
  49. if Instant::now().duration_since(client.hb) > PING_TIMEOUT {
  50. client.server.do_send(Disconnect {
  51. sid: client.user.id().into(),
  52. });
  53. ctx.stop();
  54. } else {
  55. ctx.ping(b"");
  56. }
  57. });
  58. }
  59. fn handle_binary_message(&self, bytes: Bytes, socket: Socket) {
  60. // TODO: ok to unwrap?
  61. let message: WsMessage = WsMessage::try_from(bytes).unwrap();
  62. match self.biz_handlers.get(&message.module) {
  63. None => {
  64. log::error!("Can't find the handler for {:?}", message.module);
  65. },
  66. Some(handler) => {
  67. let client_data = WsClientData {
  68. user: self.user.clone(),
  69. socket,
  70. data: Bytes::from(message.data),
  71. };
  72. handler.receive_data(client_data);
  73. },
  74. }
  75. }
  76. }
  77. impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsClient {
  78. fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
  79. match msg {
  80. Ok(ws::Message::Ping(msg)) => {
  81. self.hb = Instant::now();
  82. ctx.pong(&msg);
  83. },
  84. Ok(ws::Message::Pong(_msg)) => {
  85. // tracing::debug!("Receive {} pong {:?}", &self.session_id, &msg);
  86. self.hb = Instant::now();
  87. },
  88. Ok(ws::Message::Binary(bytes)) => {
  89. let socket = ctx.address().recipient();
  90. self.handle_binary_message(bytes, socket);
  91. },
  92. Ok(Text(_)) => {
  93. log::warn!("Receive unexpected text message");
  94. },
  95. Ok(ws::Message::Close(reason)) => {
  96. ctx.close(reason);
  97. ctx.stop();
  98. },
  99. Ok(ws::Message::Continuation(_)) => {},
  100. Ok(ws::Message::Nop) => {},
  101. Err(e) => {
  102. log::error!("[{}]: WebSocketStream protocol error {:?}", self.user.id(), e);
  103. ctx.stop();
  104. },
  105. }
  106. }
  107. }
  108. impl Handler<WsMessageAdaptor> for WsClient {
  109. type Result = ();
  110. fn handle(&mut self, msg: WsMessageAdaptor, ctx: &mut Self::Context) { ctx.binary(msg.0); }
  111. }
  112. impl Actor for WsClient {
  113. type Context = ws::WebsocketContext<Self>;
  114. fn started(&mut self, ctx: &mut Self::Context) {
  115. self.hb(ctx);
  116. let socket = ctx.address().recipient();
  117. let connect = Connect {
  118. socket,
  119. sid: self.user.id().into(),
  120. };
  121. self.server
  122. .send(connect)
  123. .into_actor(self)
  124. .then(|res, _client, _ctx| {
  125. match res {
  126. Ok(Ok(_)) => tracing::trace!("Send connect message to server success"),
  127. Ok(Err(e)) => log::error!("Send connect message to server failed: {:?}", e),
  128. Err(e) => log::error!("Send connect message to server failed: {:?}", e),
  129. }
  130. fut::ready(())
  131. })
  132. .wait(ctx);
  133. }
  134. fn stopping(&mut self, _: &mut Self::Context) -> Running {
  135. self.server.do_send(Disconnect {
  136. sid: self.user.id().into(),
  137. });
  138. Running::Stop
  139. }
  140. }