util.rs 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. use crate::config::MAX_PAYLOAD_SIZE;
  2. use actix_web::web;
  3. use backend_service::errors::{ErrorCode, ServerError};
  4. use futures::StreamExt;
  5. use protobuf::{Message, ProtobufResult};
  6. pub async fn parse_from_payload<T: Message>(payload: web::Payload) -> Result<T, ServerError> {
  7. let bytes = poll_payload(&mut payload.into_inner()).await?;
  8. parse_from_bytes(&bytes)
  9. }
  10. #[allow(dead_code)]
  11. pub async fn parse_from_dev_payload<T: Message>(payload: &mut actix_web::dev::Payload) -> Result<T, ServerError> {
  12. let bytes = poll_payload(payload).await?;
  13. parse_from_bytes(&bytes)
  14. }
  15. #[inline]
  16. pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
  17. let md5 = format!("{:x}", md5::compute(data));
  18. md5
  19. }
  20. pub fn parse_from_bytes<T: Message>(bytes: &[u8]) -> Result<T, ServerError> {
  21. let result: ProtobufResult<T> = Message::parse_from_bytes(&bytes);
  22. match result {
  23. Ok(data) => Ok(data),
  24. Err(e) => Err(e.into()),
  25. }
  26. }
  27. pub async fn poll_payload(payload: &mut actix_web::dev::Payload) -> Result<web::BytesMut, ServerError> {
  28. let mut body = web::BytesMut::new();
  29. while let Some(chunk) = payload.next().await {
  30. let chunk = chunk.map_err(|err| ServerError::internal().context(err))?;
  31. if (body.len() + chunk.len()) > MAX_PAYLOAD_SIZE {
  32. return Err(ServerError::new(
  33. "Payload overflow".to_string(),
  34. ErrorCode::PayloadOverflow,
  35. ));
  36. }
  37. body.extend_from_slice(&chunk);
  38. }
  39. Ok(body)
  40. }