connect.rs 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. use crate::{
  2. errors::{internal_error, WsError},
  3. MsgReceiver,
  4. MsgSender,
  5. };
  6. use futures_core::{future::BoxFuture, ready};
  7. use futures_util::{FutureExt, StreamExt};
  8. use pin_project::pin_project;
  9. use std::{
  10. fmt,
  11. future::Future,
  12. pin::Pin,
  13. task::{Context, Poll},
  14. };
  15. use tokio::net::TcpStream;
  16. use tokio_tungstenite::{
  17. connect_async,
  18. tungstenite::{handshake::client::Response, Error, Message},
  19. MaybeTlsStream,
  20. WebSocketStream,
  21. };
  22. #[pin_project]
  23. pub struct WsConnectionFuture {
  24. msg_tx: Option<MsgSender>,
  25. ws_rx: Option<MsgReceiver>,
  26. #[pin]
  27. fut: Pin<
  28. Box<dyn Future<Output = Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), Error>> + Send + Sync>,
  29. >,
  30. }
  31. impl WsConnectionFuture {
  32. pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, addr: String) -> Self {
  33. WsConnectionFuture {
  34. msg_tx: Some(msg_tx),
  35. ws_rx: Some(ws_rx),
  36. fut: Box::pin(async move { connect_async(&addr).await }),
  37. }
  38. }
  39. }
  40. impl Future for WsConnectionFuture {
  41. type Output = Result<WsStream, WsError>;
  42. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  43. // [[pin]]
  44. // poll async function. The following methods not work.
  45. // 1.
  46. // let f = connect_async("");
  47. // pin_mut!(f);
  48. // ready!(Pin::new(&mut a).poll(cx))
  49. //
  50. // 2.ready!(Pin::new(&mut Box::pin(connect_async(""))).poll(cx))
  51. //
  52. // An async method calls poll multiple times and might return to the executor. A
  53. // single poll call can only return to the executor once and will get
  54. // resumed through another poll invocation. the connect_async call multiple time
  55. // from the beginning. So I use fut to hold the future and continue to
  56. // poll it. (Fix me if i was wrong)
  57. loop {
  58. return match ready!(self.as_mut().project().fut.poll(cx)) {
  59. Ok((stream, _)) => {
  60. tracing::debug!("🐴 ws connect success");
  61. let (msg_tx, ws_rx) = (
  62. self.msg_tx.take().expect("WsConnection should be call once "),
  63. self.ws_rx.take().expect("WsConnection should be call once "),
  64. );
  65. Poll::Ready(Ok(WsStream::new(msg_tx, ws_rx, stream)))
  66. },
  67. Err(error) => {
  68. tracing::debug!("🐴 ws connect failed: {:?}", error);
  69. Poll::Ready(Err(error.into()))
  70. },
  71. };
  72. }
  73. }
  74. }
  75. type Fut = BoxFuture<'static, Result<(), WsError>>;
  76. #[pin_project]
  77. pub struct WsStream {
  78. #[allow(dead_code)]
  79. msg_tx: MsgSender,
  80. #[pin]
  81. inner: Option<(Fut, Fut)>,
  82. }
  83. impl WsStream {
  84. pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
  85. let (ws_write, ws_read) = stream.split();
  86. Self {
  87. msg_tx: msg_tx.clone(),
  88. inner: Some((
  89. Box::pin(async move {
  90. let (tx, mut rx) = tokio::sync::mpsc::channel(100);
  91. let read = async {
  92. ws_read
  93. .for_each(|message| async {
  94. match tx.send(post_message(msg_tx.clone(), message)).await {
  95. Ok(_) => {},
  96. Err(e) => log::error!("WsStream tx closed unexpectedly: {} ", e),
  97. }
  98. })
  99. .await;
  100. Ok(())
  101. };
  102. let ret = async {
  103. loop {
  104. match rx.recv().await {
  105. None => {
  106. return Err(WsError::internal().context("WsStream rx closed unexpectedly"));
  107. },
  108. Some(result) => {
  109. if result.is_err() {
  110. return result;
  111. }
  112. },
  113. }
  114. }
  115. };
  116. futures::pin_mut!(ret);
  117. futures::pin_mut!(read);
  118. tokio::select! {
  119. result = read => {return result},
  120. result = ret => {return result},
  121. };
  122. }),
  123. Box::pin(async move {
  124. let result = ws_rx.map(Ok).forward(ws_write).await.map_err(internal_error);
  125. result
  126. }),
  127. )),
  128. }
  129. }
  130. }
  131. impl fmt::Debug for WsStream {
  132. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WsStream").finish() }
  133. }
  134. impl Future for WsStream {
  135. type Output = Result<(), WsError>;
  136. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  137. let (mut ws_read, mut ws_write) = self.inner.take().unwrap();
  138. match ws_read.poll_unpin(cx) {
  139. Poll::Ready(l) => Poll::Ready(l),
  140. Poll::Pending => {
  141. //
  142. match ws_write.poll_unpin(cx) {
  143. Poll::Ready(r) => Poll::Ready(r),
  144. Poll::Pending => {
  145. self.inner = Some((ws_read, ws_write));
  146. Poll::Pending
  147. },
  148. }
  149. },
  150. }
  151. }
  152. }
  153. fn post_message(tx: MsgSender, message: Result<Message, Error>) -> Result<(), WsError> {
  154. match message {
  155. Ok(Message::Binary(bytes)) => tx.unbounded_send(Message::Binary(bytes)).map_err(internal_error),
  156. Ok(_) => Ok(()),
  157. Err(e) => Err(WsError::internal().context(e)),
  158. }
  159. }
  /// Future that calls the callback `f` with the stored address when polled
  /// and then completes immediately.
  #[allow(dead_code)]
  pub struct Retry<F> {
      /// Callback invoked with `addr` on every poll.
      f: F,
      /// Set to 3 by `new`; not read anywhere in this file.
      #[allow(dead_code)]
      retry_time: usize,
      /// Address passed to `f`.
      addr: String,
  }

  impl<F> Retry<F>
  where
      F: Fn(&str),
  {
      /// Creates a `Retry` that owns a copy of `addr` and will call `f` with it.
      #[allow(dead_code)]
      pub fn new(addr: &str, f: F) -> Self {
          let addr = String::from(addr);
          Self { f, retry_time: 3, addr }
      }
  }

  impl<F> Future for Retry<F>
  where
      F: Fn(&str),
  {
      type Output = ();

      /// Invokes the callback once with the stored address and resolves right away.
      fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
          let Self { f, addr, .. } = &*self;
          f(addr.as_str());
          Poll::Ready(())
      }
  }
  189. }