utils.rs 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. use crate::config::MAX_PAYLOAD_SIZE;
  2. use actix_web::web;
  3. use flowy_net::errors::{ErrorCode, ServerError};
  4. use futures::StreamExt;
  5. use protobuf::{Message, ProtobufResult};
  6. pub async fn parse_from_payload<T: Message>(payload: web::Payload) -> Result<T, ServerError> {
  7. let bytes = poll_payload(&mut payload.into_inner()).await?;
  8. parse_from_bytes(&bytes)
  9. }
  10. #[allow(dead_code)]
  11. pub async fn parse_from_dev_payload<T: Message>(
  12. payload: &mut actix_web::dev::Payload,
  13. ) -> Result<T, ServerError> {
  14. let bytes = poll_payload(payload).await?;
  15. parse_from_bytes(&bytes)
  16. }
  17. pub fn parse_from_bytes<T: Message>(bytes: &[u8]) -> Result<T, ServerError> {
  18. let result: ProtobufResult<T> = Message::parse_from_bytes(&bytes);
  19. match result {
  20. Ok(data) => Ok(data),
  21. Err(e) => Err(e.into()),
  22. }
  23. }
  24. pub async fn poll_payload(
  25. payload: &mut actix_web::dev::Payload,
  26. ) -> Result<web::BytesMut, ServerError> {
  27. let mut body = web::BytesMut::new();
  28. while let Some(chunk) = payload.next().await {
  29. let chunk = chunk.map_err(|err| ServerError::internal().context(err))?;
  30. if (body.len() + chunk.len()) > MAX_PAYLOAD_SIZE {
  31. return Err(ServerError::new(
  32. "Payload overflow".to_string(),
  33. ErrorCode::PayloadOverflow,
  34. ));
  35. }
  36. body.extend_from_slice(&chunk);
  37. }
  38. Ok(body)
  39. }