request.rs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. use crate::{
  2. errors::{ErrorCode, ServerError},
  3. response::FlowyResponse,
  4. };
  5. use bytes::Bytes;
  6. use hyper::http;
  7. use protobuf::ProtobufError;
  8. use reqwest::{header::HeaderMap, Client, Method, Response};
  9. use std::{
  10. convert::{TryFrom, TryInto},
  11. sync::Arc,
  12. time::Duration,
  13. };
  14. use tokio::sync::oneshot;
  15. pub trait ResponseMiddleware {
  16. fn receive_response(&self, response: &FlowyResponse);
  17. }
  18. pub struct HttpRequestBuilder {
  19. url: String,
  20. body: Option<Bytes>,
  21. response: Option<Bytes>,
  22. headers: HeaderMap,
  23. method: Method,
  24. middleware: Vec<Arc<dyn ResponseMiddleware + Send + Sync>>,
  25. }
  26. impl HttpRequestBuilder {
  27. pub fn new() -> Self {
  28. Self {
  29. url: "".to_owned(),
  30. body: None,
  31. response: None,
  32. headers: HeaderMap::new(),
  33. method: Method::GET,
  34. middleware: Vec::new(),
  35. }
  36. }
  37. pub fn middleware<T>(mut self, middleware: Arc<T>) -> Self
  38. where
  39. T: 'static + ResponseMiddleware + Send + Sync,
  40. {
  41. self.middleware.push(middleware);
  42. self
  43. }
  44. pub fn get(mut self, url: &str) -> Self {
  45. self.url = url.to_owned();
  46. self.method = Method::GET;
  47. self
  48. }
  49. pub fn post(mut self, url: &str) -> Self {
  50. self.url = url.to_owned();
  51. self.method = Method::POST;
  52. self
  53. }
  54. pub fn patch(mut self, url: &str) -> Self {
  55. self.url = url.to_owned();
  56. self.method = Method::PATCH;
  57. self
  58. }
  59. pub fn delete(mut self, url: &str) -> Self {
  60. self.url = url.to_owned();
  61. self.method = Method::DELETE;
  62. self
  63. }
  64. pub fn header(mut self, key: &'static str, value: &str) -> Self {
  65. self.headers.insert(key, value.parse().unwrap());
  66. self
  67. }
  68. pub fn protobuf<T>(self, body: T) -> Result<Self, ServerError>
  69. where
  70. T: TryInto<Bytes, Error = ProtobufError>,
  71. {
  72. let body: Bytes = body.try_into()?;
  73. self.bytes(body)
  74. }
  75. pub fn bytes(mut self, body: Bytes) -> Result<Self, ServerError> {
  76. self.body = Some(body);
  77. Ok(self)
  78. }
  79. pub async fn send(mut self) -> Result<Self, ServerError> {
  80. let (tx, rx) = oneshot::channel::<Result<Response, _>>();
  81. let url = self.url.clone();
  82. let body = self.body.take();
  83. let method = self.method.clone();
  84. let headers = self.headers.clone();
  85. // reqwest client is not 'Sync' by channel is.
  86. tokio::spawn(async move {
  87. let client = default_client();
  88. let mut builder = client.request(method.clone(), url).headers(headers);
  89. if let Some(body) = body {
  90. builder = builder.body(body);
  91. }
  92. let response = builder.send().await;
  93. match tx.send(response) {
  94. Ok(_) => {},
  95. Err(e) => {
  96. log::error!("[{}] Send http request failed: {:?}", method, e);
  97. },
  98. }
  99. });
  100. let response = rx.await??;
  101. let flowy_response = flowy_response_from(response).await?;
  102. self.middleware.iter().for_each(|middleware| {
  103. middleware.receive_response(&flowy_response);
  104. });
  105. match flowy_response.error {
  106. None => {
  107. self.response = Some(flowy_response.data);
  108. Ok(self)
  109. },
  110. Some(error) => Err(error),
  111. }
  112. }
  113. pub async fn response<T2>(self) -> Result<T2, ServerError>
  114. where
  115. T2: TryFrom<Bytes, Error = ProtobufError>,
  116. {
  117. match self.response {
  118. None => {
  119. let msg = format!("Request: {} receives unexpected empty body", self.url);
  120. Err(ServerError::payload_none().context(msg))
  121. },
  122. Some(data) => Ok(T2::try_from(data)?),
  123. }
  124. }
  125. }
  126. async fn flowy_response_from(original: Response) -> Result<FlowyResponse, ServerError> {
  127. let bytes = original.bytes().await?;
  128. let response: FlowyResponse = serde_json::from_slice(&bytes)?;
  129. Ok(response)
  130. }
  131. #[allow(dead_code)]
  132. async fn get_response_data(original: Response) -> Result<Bytes, ServerError> {
  133. if original.status() == http::StatusCode::OK {
  134. let bytes = original.bytes().await?;
  135. let response: FlowyResponse = serde_json::from_slice(&bytes)?;
  136. match response.error {
  137. None => Ok(response.data),
  138. Some(error) => Err(error),
  139. }
  140. } else {
  141. Err(ServerError::http().context(original))
  142. }
  143. }
  144. fn default_client() -> Client {
  145. let result = reqwest::Client::builder()
  146. .connect_timeout(Duration::from_millis(500))
  147. .timeout(Duration::from_secs(5))
  148. .build();
  149. match result {
  150. Ok(client) => client,
  151. Err(e) => {
  152. log::error!("Create reqwest client failed: {}", e);
  153. reqwest::Client::new()
  154. },
  155. }
  156. }