connect.rs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. #![allow(clippy::all)]
  2. use crate::{
  3. errors::{internal_error, WSError},
  4. MsgReceiver, MsgSender,
  5. };
  6. use futures_core::{future::BoxFuture, ready};
  7. use futures_util::{FutureExt, StreamExt};
  8. use pin_project::pin_project;
  9. use std::{
  10. fmt,
  11. future::Future,
  12. pin::Pin,
  13. task::{Context, Poll},
  14. };
  15. use tokio::net::TcpStream;
  16. use tokio_tungstenite::{
  17. connect_async,
  18. tungstenite::{handshake::client::Response, Error, Message},
  19. MaybeTlsStream, WebSocketStream,
  20. };
  21. type WsConnectResult = Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), Error>;
  22. #[pin_project]
  23. pub struct WSConnectionFuture {
  24. msg_tx: Option<MsgSender>,
  25. ws_rx: Option<MsgReceiver>,
  26. #[pin]
  27. fut: Pin<Box<dyn Future<Output = WsConnectResult> + Send + Sync>>,
  28. }
  29. impl WSConnectionFuture {
  30. pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, addr: String) -> Self {
  31. WSConnectionFuture {
  32. msg_tx: Some(msg_tx),
  33. ws_rx: Some(ws_rx),
  34. fut: Box::pin(async move { connect_async(&addr).await }),
  35. }
  36. }
  37. }
  38. impl Future for WSConnectionFuture {
  39. type Output = Result<WSStream, WSError>;
  40. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  41. // [[pin]]
  42. // poll async function. The following methods not work.
  43. // 1.
  44. // let f = connect_async("");
  45. // pin_mut!(f);
  46. // ready!(Pin::new(&mut a).poll(cx))
  47. //
  48. // 2.ready!(Pin::new(&mut Box::pin(connect_async(""))).poll(cx))
  49. //
  50. // An async method calls poll multiple times and might return to the executor. A
  51. // single poll call can only return to the executor once and will get
  52. // resumed through another poll invocation. the connect_async call multiple time
  53. // from the beginning. So I use fut to hold the future and continue to
  54. // poll it. (Fix me if i was wrong)
  55. loop {
  56. return match ready!(self.as_mut().project().fut.poll(cx)) {
  57. Ok((stream, _)) => {
  58. tracing::debug!("[WebSocket]: connect success");
  59. let (msg_tx, ws_rx) = (
  60. self.msg_tx
  61. .take()
  62. .expect("[WebSocket]: WSConnection should be call once "),
  63. self.ws_rx
  64. .take()
  65. .expect("[WebSocket]: WSConnection should be call once "),
  66. );
  67. Poll::Ready(Ok(WSStream::new(msg_tx, ws_rx, stream)))
  68. }
  69. Err(error) => {
  70. tracing::debug!("[WebSocket]: ❌ connect failed: {:?}", error);
  71. Poll::Ready(Err(error.into()))
  72. }
  73. };
  74. }
  75. }
  76. }
  77. type Fut = BoxFuture<'static, Result<(), WSError>>;
  78. #[pin_project]
  79. pub struct WSStream {
  80. #[allow(dead_code)]
  81. msg_tx: MsgSender,
  82. #[pin]
  83. inner: Option<(Fut, Fut)>,
  84. }
  85. impl WSStream {
  86. pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
  87. let (ws_write, ws_read) = stream.split();
  88. Self {
  89. msg_tx: msg_tx.clone(),
  90. inner: Some((
  91. Box::pin(async move {
  92. let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
  93. let read = async {
  94. ws_read
  95. .for_each(|message| async {
  96. match tx.send(send_message(msg_tx.clone(), message)) {
  97. Ok(_) => {}
  98. Err(e) => log::error!("[WebSocket]: WSStream sender closed unexpectedly: {} ", e),
  99. }
  100. })
  101. .await;
  102. Ok(())
  103. };
  104. let read_ret = async {
  105. loop {
  106. match rx.recv().await {
  107. None => {
  108. return Err(WSError::internal()
  109. .context("[WebSocket]: WSStream receiver closed unexpectedly"));
  110. }
  111. Some(result) => {
  112. if result.is_err() {
  113. return result;
  114. }
  115. }
  116. }
  117. }
  118. };
  119. futures::pin_mut!(read);
  120. futures::pin_mut!(read_ret);
  121. return tokio::select! {
  122. result = read => result,
  123. result = read_ret => result,
  124. };
  125. }),
  126. Box::pin(async move {
  127. let result = ws_rx.map(Ok).forward(ws_write).await.map_err(internal_error);
  128. result
  129. }),
  130. )),
  131. }
  132. }
  133. }
  134. impl fmt::Debug for WSStream {
  135. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  136. f.debug_struct("WSStream").finish()
  137. }
  138. }
  139. impl Future for WSStream {
  140. type Output = Result<(), WSError>;
  141. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  142. let (mut ws_read, mut ws_write) = self.inner.take().unwrap();
  143. match ws_read.poll_unpin(cx) {
  144. Poll::Ready(l) => Poll::Ready(l),
  145. Poll::Pending => {
  146. //
  147. match ws_write.poll_unpin(cx) {
  148. Poll::Ready(r) => Poll::Ready(r),
  149. Poll::Pending => {
  150. self.inner = Some((ws_read, ws_write));
  151. Poll::Pending
  152. }
  153. }
  154. }
  155. }
  156. }
  157. }
  158. fn send_message(msg_tx: MsgSender, message: Result<Message, Error>) -> Result<(), WSError> {
  159. match message {
  160. Ok(Message::Binary(bytes)) => msg_tx.unbounded_send(Message::Binary(bytes)).map_err(internal_error),
  161. Ok(_) => Ok(()),
  162. Err(e) => Err(WSError::internal().context(e)),
  163. }
  164. }
  165. #[allow(dead_code)]
  166. pub struct Retry<F> {
  167. f: F,
  168. #[allow(dead_code)]
  169. retry_time: usize,
  170. addr: String,
  171. }
  172. impl<F> Retry<F>
  173. where
  174. F: Fn(&str),
  175. {
  176. #[allow(dead_code)]
  177. pub fn new(addr: &str, f: F) -> Self {
  178. Self {
  179. f,
  180. retry_time: 3,
  181. addr: addr.to_owned(),
  182. }
  183. }
  184. }
  185. impl<F> Future for Retry<F>
  186. where
  187. F: Fn(&str),
  188. {
  189. type Output = ();
  190. fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
  191. (self.f)(&self.addr);
  192. Poll::Ready(())
  193. }
  194. }