ws_client.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. use crate::{
  2. config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
  3. ws_service::{
  4. entities::{Connect, Disconnect, SessionId},
  5. ClientMessage,
  6. MessageData,
  7. WSServer,
  8. },
  9. };
  10. use actix::{
  11. fut,
  12. Actor,
  13. ActorContext,
  14. ActorFutureExt,
  15. Addr,
  16. AsyncContext,
  17. ContextFutureSpawner,
  18. Handler,
  19. Running,
  20. StreamHandler,
  21. WrapFuture,
  22. };
  23. use actix_web_actors::{ws, ws::Message::Text};
  24. use std::time::Instant;
  25. pub struct WSClient {
  26. sid: SessionId,
  27. server: Addr<WSServer>,
  28. hb: Instant,
  29. }
  30. impl WSClient {
  31. pub fn new(sid: SessionId, server: Addr<WSServer>) -> Self {
  32. Self {
  33. sid,
  34. hb: Instant::now(),
  35. server,
  36. }
  37. }
  38. fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
  39. ctx.run_interval(HEARTBEAT_INTERVAL, |ws_session, ctx| {
  40. if Instant::now().duration_since(ws_session.hb) > PING_TIMEOUT {
  41. ws_session.server.do_send(Disconnect {
  42. sid: ws_session.sid.clone(),
  43. });
  44. ctx.stop();
  45. return;
  46. }
  47. ctx.ping(b"");
  48. });
  49. }
  50. fn send(&self, data: MessageData) {
  51. let msg = ClientMessage::new(self.sid.clone(), data);
  52. self.server.do_send(msg);
  53. }
  54. }
  55. impl Actor for WSClient {
  56. type Context = ws::WebsocketContext<Self>;
  57. fn started(&mut self, ctx: &mut Self::Context) {
  58. self.hb(ctx);
  59. let socket = ctx.address().recipient();
  60. let connect = Connect {
  61. socket,
  62. sid: self.sid.clone(),
  63. };
  64. self.server
  65. .send(connect)
  66. .into_actor(self)
  67. .then(|res, _ws_session, _ctx| {
  68. match res {
  69. Ok(Ok(_)) => {},
  70. Ok(Err(_e)) => {
  71. unimplemented!()
  72. },
  73. Err(_e) => unimplemented!(),
  74. }
  75. fut::ready(())
  76. })
  77. .wait(ctx);
  78. }
  79. fn stopping(&mut self, _: &mut Self::Context) -> Running {
  80. self.server.do_send(Disconnect {
  81. sid: self.sid.clone(),
  82. });
  83. Running::Stop
  84. }
  85. }
  86. impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
  87. fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
  88. match msg {
  89. Ok(ws::Message::Ping(msg)) => {
  90. log::debug!("Receive {} ping {:?}", &self.sid, &msg);
  91. self.hb = Instant::now();
  92. ctx.pong(&msg);
  93. },
  94. Ok(ws::Message::Pong(msg)) => {
  95. log::debug!("Receive {} pong {:?}", &self.sid, &msg);
  96. self.send(MessageData::Connect(self.sid.clone()));
  97. self.hb = Instant::now();
  98. },
  99. Ok(ws::Message::Binary(bin)) => {
  100. log::debug!(" Receive {} binary", &self.sid);
  101. self.send(MessageData::Binary(bin));
  102. },
  103. Ok(ws::Message::Close(reason)) => {
  104. log::debug!("Receive {} close {:?}", &self.sid, &reason);
  105. ctx.close(reason);
  106. ctx.stop();
  107. },
  108. Ok(ws::Message::Continuation(c)) => {
  109. log::debug!("Receive {} continues message {:?}", &self.sid, &c);
  110. },
  111. Ok(ws::Message::Nop) => {
  112. log::debug!("Receive Nop message");
  113. },
  114. Ok(Text(s)) => {
  115. log::debug!("Receive {} text {:?}", &self.sid, &s);
  116. self.send(MessageData::Text(s.to_string()));
  117. },
  118. Err(e) => {
  119. let msg = format!("{} error: {:?}", &self.sid, e);
  120. log::error!("stream {}", msg);
  121. ctx.text(msg);
  122. ctx.stop();
  123. },
  124. }
  125. }
  126. }
  127. impl Handler<ClientMessage> for WSClient {
  128. type Result = ();
  129. fn handle(&mut self, msg: ClientMessage, ctx: &mut Self::Context) {
  130. match msg.data {
  131. MessageData::Text(text) => {
  132. ctx.text(text);
  133. },
  134. MessageData::Binary(binary) => {
  135. ctx.binary(binary);
  136. },
  137. MessageData::Connect(sid) => {
  138. let connect_msg = format!("{} connect", &sid);
  139. ctx.text(connect_msg);
  140. },
  141. MessageData::Disconnect(text) => {
  142. log::debug!("Session start disconnecting {}", self.sid);
  143. ctx.text(text);
  144. ctx.stop();
  145. },
  146. }
  147. }
  148. }