From 1691e1adc26941b3089c850224c29f93f78c47b7 Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Sat, 4 Sep 2021 14:56:45 -0500 Subject: [PATCH 1/3] Begin work on multipart forms API --- Cargo.toml | 7 +- src/forms/mod.rs | 7 ++ src/forms/multipart.rs | 278 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 4 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 src/forms/mod.rs create mode 100644 src/forms/multipart.rs diff --git a/Cargo.toml b/Cargo.toml index 03a6c4dd..74b36f5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,10 +19,11 @@ features = ["cookies", "json"] status = "actively-developed" [features] -default = ["http2", "static-curl", "text-decoding"] +default = ["http2", "multipart", "static-curl", "text-decoding"] cookies = ["chrono"] http2 = ["curl/http2"] json = ["serde", "serde_json"] +multipart = ["fastrand"] psl = ["parking_lot", "publicsuffix"] spnego = ["curl-sys/spnego"] static-curl = ["curl/static-curl"] @@ -54,6 +55,10 @@ optional = true version = "0.8" optional = true +[dependencies.fastrand] +version = "1" +optional = true + [dependencies.mime] version = "0.3" optional = true diff --git a/src/forms/mod.rs b/src/forms/mod.rs new file mode 100644 index 00000000..618f9338 --- /dev/null +++ b/src/forms/mod.rs @@ -0,0 +1,7 @@ +//! Helpers for sending form bodies. + +#[cfg(feature = "multipart")] +mod multipart; + +#[cfg(feature = "multipart")] +pub use multipart::*; diff --git a/src/forms/multipart.rs b/src/forms/multipart.rs new file mode 100644 index 00000000..8bcfeea7 --- /dev/null +++ b/src/forms/multipart.rs @@ -0,0 +1,278 @@ +use std::{collections::VecDeque, convert::TryFrom, io, io::Write, iter::repeat_with, pin::Pin, task::{Context, Poll}}; + +use futures_lite::{ + io::{Chain, Cursor}, + ready, + AsyncRead, + AsyncReadExt, +}; +use http::header::{HeaderValue, HeaderName}; + +use crate::AsyncBody; + +type PartReader = Chain>, AsyncBody>, &'static [u8]>; + +/// Builder for constructing a multipart form. +/// +/// Generates a multipart form body as described in [RFC +/// 7578](https://datatracker.ietf.org/doc/html/rfc7578). +#[derive(Debug)] +pub struct FormDataBuilder { + boundary: String, + fields: Vec, +} + +impl FormDataBuilder { + /// Create a new form builder. + pub fn new() -> Self { + Self::with_boundary(generate_boundary()) + } + + /// Specify a boundary to use. A random one will be generated if one is not + /// specified. + fn with_boundary>(boundary: S) -> Self { + Self { + boundary: boundary.into(), + fields: Vec::new(), + } + } + + /// Append a field to this form with a given name and value. + /// + /// Duplicate fields with the same name are allowed and will be preserved in + /// the order they are added. + pub fn field(self, name: N, value: V) -> Self + where + N: Into, + V: Into, + { + self.part(FormPart::new(name, value)) + } + + /// Append a part to this form. + pub fn part(mut self, part: FormPart) -> Self { + self.fields.push(part); + self + } + + /// Build the form. + pub fn build(self) -> FormData { + let boundary = self.boundary; + + let parts = self + .fields + .into_iter() + .map(|field| field.into_writer(boundary.as_str())) + .collect::>(); + + let terminator = Cursor::new(format!("--{}--\r\n", &boundary).into_bytes()); + + // Try to compute the size of the body we will write. This can only be + // determined if all parts contain values that are also sized. + let len = parts + .iter() + .map(|part| { + Some( + part.get_ref().0.get_ref().1.len()? + + part.get_ref().0.get_ref().0.get_ref().len() as u64 + + part.get_ref().1.len() as u64, + ) + }) + .fold(Some(0), |a, b| Some(a? + b?)) + .map(|size| size + terminator.get_ref().len() as u64); + + FormData { + len, + parts, + terminator, + } + } +} + +/// A single part of a multipart form representing a single field. +#[derive(Debug)] +pub struct FormPart { + name: String, + filename: Option, + content_type: Option, + headers: Vec<(HeaderName, HeaderValue)>, + value: AsyncBody, + error: Option, +} + +impl FormPart { + /// Create a new form part with a name and value. + pub fn new(name: N, value: V) -> Self + where + N: Into, + V: Into, + { + FormPart { + name: name.into(), + filename: None, + content_type: None, + headers: Vec::new(), + value: value.into(), + error: None, + } + } + + /// Set the filename of this form part. + pub fn filename(mut self, filename: String) -> Self { + self.filename = Some(filename); + self + } + + /// Set the content type of this form part. + pub fn content_type(mut self, content_type: String) -> Self { + self.content_type = Some(content_type); + self + } + + /// Append a custom header to this form part. + pub fn header(mut self, name: K, value: V) -> Self + where + HeaderName: TryFrom, + >::Error: Into, + HeaderValue: TryFrom, + >::Error: Into, + { + if self.error.is_none() { + let name = >::try_from(name).map_err(Into::into).unwrap(); + let value = >::try_from(value).map_err(Into::into).unwrap(); + + self.headers.push((name, value)); + } + + self + } + + fn into_writer(self, boundary: &str) -> PartReader { + let mut header = Vec::new(); + + write!(header, "--{}\r\n", boundary).unwrap(); + write!( + header, + "Content-Disposition: form-data; name=\"{}\"", + &self.name + ) + .unwrap(); + + if let Some(filename) = self.filename.as_ref() { + write!(header, "; filename=\"{}\"", filename).unwrap(); + } + + header.extend_from_slice(b"\r\n"); + + if let Some(content_type) = self.content_type.as_ref() { + write!(header, "Content-Type: {}\r\n", content_type).unwrap(); + } + + for (name, value) in self.headers { + header.extend_from_slice(name.as_ref()); + header.extend_from_slice(b": "); + header.extend_from_slice(value.as_ref()); + header.extend_from_slice(b"\r\n"); + } + + header.extend_from_slice(b"\r\n"); + + Cursor::new(header).chain(self.value).chain(b"\r\n") + } +} + +/// A multipart form body. +#[derive(Debug)] +pub struct FormData { + len: Option, + parts: VecDeque, + terminator: Cursor>, +} + +impl From for AsyncBody { + fn from(form: FormData) -> Self { + if let Some(len) = form.len { + AsyncBody::from_reader_sized(form, len) + } else { + AsyncBody::from_reader(form) + } + } +} + +impl AsyncRead for FormData { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + while let Some(part) = self.parts.front_mut() { + match ready!(AsyncRead::poll_read(Pin::new(part), cx, buf)) { + Ok(0) => { + // This part has finished being read, discard it and move to + // the next one. + self.parts.pop_front(); + } + result => return Poll::Ready(result), + } + } + + AsyncRead::poll_read(Pin::new(&mut self.terminator), cx, buf) + } +} + +fn generate_boundary() -> String { + repeat_with(fastrand::alphanumeric).take(24).collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_lite::{future::block_on, io::AsyncReadExt}; + + #[test] + fn empty_form() { + let mut form: AsyncBody = FormDataBuilder::with_boundary("boundary").build().into(); + + let expected = "--boundary--\r\n"; + + assert_eq!(form.len(), Some(expected.len() as u64)); + + let contents = block_on(async { + let mut buf = String::new(); + form.read_to_string(&mut buf).await.unwrap(); + buf + }); + + assert_eq!(contents, expected); + } + + #[test] + fn sized_form() { + let mut form: AsyncBody = FormDataBuilder::with_boundary("boundary") + .field("foo", "value1") + .field("bar", "value2") + .build() + .into(); + + let expected = "\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"foo\"\r\n\ + \r\n\ + value1\r\n\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"bar\"\r\n\ + \r\n\ + value2\r\n\ + --boundary--\r\n\ + "; + + let contents = block_on(async { + let mut buf = String::new(); + form.read_to_string(&mut buf).await.unwrap(); + buf + }); + + assert_eq!(contents, expected); + assert_eq!(form.len(), Some(expected.len() as u64)); + } +} diff --git a/src/lib.rs b/src/lib.rs index 236815b3..ba3d53e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -259,6 +259,7 @@ mod trailer; pub mod auth; pub mod config; pub mod error; +pub mod forms; #[cfg(feature = "unstable-interceptors")] pub mod interceptor; From 18abad1447558e5fe24ffd270a158bf842e5c0c5 Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Mon, 14 Feb 2022 22:02:38 -0600 Subject: [PATCH 2/3] Automatic content-type discovery --- examples/multipart_form.rs | 12 +++ src/body/mod.rs | 87 ++++++++++++------ src/body/sync.rs | 79 +++++++++++----- src/client.rs | 7 ++ src/forms/multipart.rs | 181 ++++++++++++++++++++++++++++--------- 5 files changed, 270 insertions(+), 96 deletions(-) create mode 100644 examples/multipart_form.rs diff --git a/examples/multipart_form.rs b/examples/multipart_form.rs new file mode 100644 index 00000000..0eaa8156 --- /dev/null +++ b/examples/multipart_form.rs @@ -0,0 +1,12 @@ +use isahc::{forms::FormDataBuilder, prelude::*, Body}; + +fn main() -> Result<(), isahc::Error> { + let form = FormDataBuilder::::new().field("foo", "bar").build(); + + let mut response = isahc::post("https://httpbin.org/post", form)?; + + println!("{:?}", response); + print!("{}", response.text()?); + + Ok(()) +} diff --git a/src/body/mod.rs b/src/body/mod.rs index 58306213..3c55b996 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -1,6 +1,7 @@ //! Provides types for working with request and response bodies. use futures_lite::io::{AsyncRead, BlockOn}; +use http::HeaderValue; use std::{ borrow::Cow, fmt, @@ -28,10 +29,13 @@ pub use sync::Body; /// implements. /// /// For synchronous requests, use [`Body`] instead. -pub struct AsyncBody(Inner); +pub struct AsyncBody { + pub(crate) content_type: Option, + repr: Repr, +} /// All possible body implementations. -enum Inner { +enum Repr { /// An empty body. Empty, @@ -48,7 +52,10 @@ impl AsyncBody { /// An empty body represents the *absence* of a body, which is semantically /// different than the presence of a body of zero length. pub const fn empty() -> Self { - Self(Inner::Empty) + Self { + content_type: None, + repr: Repr::Empty, + } } /// Create a new body from a potentially static byte buffer. @@ -76,7 +83,10 @@ impl AsyncBody { B: AsRef<[u8]> + 'static, { castaway::match_type!(bytes, { - Cursor> as bytes => Self(Inner::Buffer(bytes)), + Cursor> as bytes => Self { + content_type: None, + repr: Repr::Buffer(bytes), + }, &'static [u8] as bytes => Self::from_static_impl(bytes), &'static str as bytes => Self::from_static_impl(bytes.as_bytes()), Vec as bytes => Self::from(bytes), @@ -87,7 +97,10 @@ impl AsyncBody { #[inline] fn from_static_impl(bytes: &'static [u8]) -> Self { - Self(Inner::Buffer(Cursor::new(Cow::Borrowed(bytes)))) + Self { + content_type: None, + repr: Repr::Buffer(Cursor::new(Cow::Borrowed(bytes))), + } } /// Create a streaming body that reads from the given reader. @@ -100,7 +113,10 @@ impl AsyncBody { where R: AsyncRead + Send + Sync + 'static, { - Self(Inner::Reader(Box::pin(read), None)) + Self { + content_type: None, + repr: Repr::Reader(Box::pin(read), None), + } } /// Create a streaming body with a known length. @@ -116,7 +132,10 @@ impl AsyncBody { where R: AsyncRead + Send + Sync + 'static, { - Self(Inner::Reader(Box::pin(read), Some(length))) + Self { + content_type: None, + repr: Repr::Reader(Box::pin(read), Some(length)), + } } /// Report if this body is empty. @@ -126,8 +145,8 @@ impl AsyncBody { /// difference between the absence of a body and the presence of a /// zero-length body. This method will only return `true` for the former. pub fn is_empty(&self) -> bool { - match self.0 { - Inner::Empty => true, + match self.repr { + Repr::Empty => true, _ => false, } } @@ -146,23 +165,28 @@ impl AsyncBody { /// bytes, even if a value is returned it should not be relied on as always /// being accurate, and should be treated as a "hint". pub fn len(&self) -> Option { - match &self.0 { - Inner::Empty => Some(0), - Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64), - Inner::Reader(_, len) => *len, + match &self.repr { + Repr::Empty => Some(0), + Repr::Buffer(bytes) => Some(bytes.get_ref().len() as u64), + Repr::Reader(_, len) => *len, } } + /// Get the content type of this body, if any. + pub fn content_type(&self) -> Option<&HeaderValue> { + self.content_type.as_ref() + } + /// If this body is repeatable, reset the body stream back to the start of /// the content. Returns `false` if the body cannot be reset. pub fn reset(&mut self) -> bool { - match &mut self.0 { - Inner::Empty => true, - Inner::Buffer(cursor) => { + match &mut self.repr { + Repr::Empty => true, + Repr::Buffer(cursor) => { cursor.set_position(0); true } - Inner::Reader(_, _) => false, + Repr::Reader(_, _) => false, } } @@ -174,14 +198,18 @@ impl AsyncBody { /// generally if the underlying reader only supports blocking under a /// specific runtime. pub(crate) fn into_sync(self) -> sync::Body { - match self.0 { - Inner::Empty => sync::Body::empty(), - Inner::Buffer(cursor) => sync::Body::from_bytes_static(cursor.into_inner()), - Inner::Reader(reader, Some(len)) => { + let mut b = match self.repr { + Repr::Empty => sync::Body::empty(), + Repr::Buffer(cursor) => sync::Body::from_bytes_static(cursor.into_inner()), + Repr::Reader(reader, Some(len)) => { sync::Body::from_reader_sized(BlockOn::new(reader), len) } - Inner::Reader(reader, None) => sync::Body::from_reader(BlockOn::new(reader)), - } + Repr::Reader(reader, None) => sync::Body::from_reader(BlockOn::new(reader)), + }; + + b.content_type = self.content_type; + + b } } @@ -191,10 +219,10 @@ impl AsyncRead for AsyncBody { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - match &mut self.0 { - Inner::Empty => Poll::Ready(Ok(0)), - Inner::Buffer(cursor) => Poll::Ready(cursor.read(buf)), - Inner::Reader(read, _) => AsyncRead::poll_read(read.as_mut(), cx, buf), + match &mut self.repr { + Repr::Empty => Poll::Ready(Ok(0)), + Repr::Buffer(cursor) => Poll::Ready(cursor.read(buf)), + Repr::Reader(read, _) => AsyncRead::poll_read(read.as_mut(), cx, buf), } } } @@ -213,7 +241,10 @@ impl From<()> for AsyncBody { impl From> for AsyncBody { fn from(body: Vec) -> Self { - Self(Inner::Buffer(Cursor::new(Cow::Owned(body)))) + Self { + content_type: None, + repr: Repr::Buffer(Cursor::new(Cow::Owned(body))), + } } } diff --git a/src/body/sync.rs b/src/body/sync.rs index e035deee..06119399 100644 --- a/src/body/sync.rs +++ b/src/body/sync.rs @@ -1,5 +1,6 @@ use super::AsyncBody; use futures_lite::{future::yield_now, io::AsyncWriteExt}; +use http::HeaderValue; use sluice::pipe::{pipe, PipeWriter}; use std::{ borrow::Cow, @@ -17,9 +18,12 @@ use std::{ /// implements [`Read`], which [`Body`] itself also implements. /// /// For asynchronous requests, use [`AsyncBody`] instead. -pub struct Body(Inner); +pub struct Body { + pub(crate) content_type: Option, + repr: Repr, +} -enum Inner { +enum Repr { Empty, Buffer(Cursor>), Reader(Box, Option), @@ -31,7 +35,10 @@ impl Body { /// An empty body represents the *absence* of a body, which is semantically /// different than the presence of a body of zero length. pub const fn empty() -> Self { - Self(Inner::Empty) + Self { + content_type: None, + repr: Repr::Empty, + } } /// Create a new body from a potentially static byte buffer. @@ -59,7 +66,10 @@ impl Body { B: AsRef<[u8]> + 'static, { castaway::match_type!(bytes, { - Cursor> as bytes => Self(Inner::Buffer(bytes)), + Cursor> as bytes => Self { + content_type: None, + repr: Repr::Buffer(bytes), + }, Vec as bytes => Self::from(bytes), String as bytes => Self::from(bytes.into_bytes()), bytes => Self::from(bytes.as_ref().to_vec()), @@ -76,7 +86,10 @@ impl Body { where R: Read + Send + Sync + 'static, { - Self(Inner::Reader(Box::new(reader), None)) + Self { + content_type: None, + repr: Repr::Reader(Box::new(reader), None), + } } /// Create a streaming body with a known length. @@ -92,7 +105,11 @@ impl Body { where R: Read + Send + Sync + 'static, { - Self(Inner::Reader(Box::new(reader), Some(length))) + + Self { + content_type: None, + repr: Repr::Reader(Box::new(reader), Some(length)), + } } /// Report if this body is empty. @@ -102,8 +119,8 @@ impl Body { /// difference between the absence of a body and the presence of a /// zero-length body. This method will only return `true` for the former. pub fn is_empty(&self) -> bool { - match self.0 { - Inner::Empty => true, + match self.repr { + Repr::Empty => true, _ => false, } } @@ -122,19 +139,24 @@ impl Body { /// bytes, even if a value is returned it should not be relied on as always /// being accurate, and should be treated as a "hint". pub fn len(&self) -> Option { - match &self.0 { - Inner::Empty => Some(0), - Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64), - Inner::Reader(_, len) => *len, + match &self.repr { + Repr::Empty => Some(0), + Repr::Buffer(bytes) => Some(bytes.get_ref().len() as u64), + Repr::Reader(_, len) => *len, } } + /// Get the content type of this body, if any. + pub fn content_type(&self) -> Option<&HeaderValue> { + self.content_type.as_ref() + } + /// If this body is repeatable, reset the body stream back to the start of /// the content. Returns `false` if the body cannot be reset. pub fn reset(&mut self) -> bool { - match &mut self.0 { - Inner::Empty => true, - Inner::Buffer(cursor) => { + match &mut self.repr { + Repr::Empty => true, + Repr::Buffer(cursor) => { cursor.set_position(0); true } @@ -154,10 +176,10 @@ impl Body { /// copy the bytes from the reader to the writing half of the pipe in a /// blocking fashion. pub(crate) fn into_async(self) -> (AsyncBody, Option) { - match self.0 { - Inner::Empty => (AsyncBody::empty(), None), - Inner::Buffer(cursor) => (AsyncBody::from_bytes_static(cursor.into_inner()), None), - Inner::Reader(reader, len) => { + let mut b = match self.repr { + Repr::Empty => (AsyncBody::empty(), None), + Repr::Buffer(cursor) => (AsyncBody::from_bytes_static(cursor.into_inner()), None), + Repr::Reader(reader, len) => { let (pipe_reader, writer) = pipe(); ( @@ -172,16 +194,20 @@ impl Body { }), ) } - } + }; + + b.0.content_type = self.content_type; + + b } } impl Read for Body { fn read(&mut self, buf: &mut [u8]) -> Result { - match &mut self.0 { - Inner::Empty => Ok(0), - Inner::Buffer(cursor) => cursor.read(buf), - Inner::Reader(reader, _) => reader.read(buf), + match &mut self.repr { + Repr::Empty => Ok(0), + Repr::Buffer(cursor) => cursor.read(buf), + Repr::Reader(reader, _) => reader.read(buf), } } } @@ -200,7 +226,10 @@ impl From<()> for Body { impl From> for Body { fn from(body: Vec) -> Self { - Self(Inner::Buffer(Cursor::new(Cow::Owned(body)))) + Self { + content_type: None, + repr: Repr::Buffer(Cursor::new(Cow::Owned(body))), + } } } diff --git a/src/client.rs b/src/client.rs index 835b1bfb..d44a1eb2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1175,6 +1175,13 @@ impl crate::interceptor::Invoke for &HttpClient { .entry(http::header::USER_AGENT) .or_insert(USER_AGENT.parse().unwrap()); + // Copy Content-Type from body if not otherwise specified. + if !request.headers().contains_key(http::header::CONTENT_TYPE) { + if let Some(content_type) = request.body().content_type().cloned() { + request.headers_mut().insert(http::header::CONTENT_TYPE, content_type); + } + } + // Check if automatic decompression is enabled; we'll need to know // this later after the response is sent. let is_automatic_decompression = request diff --git a/src/forms/multipart.rs b/src/forms/multipart.rs index 8bcfeea7..4a8526af 100644 --- a/src/forms/multipart.rs +++ b/src/forms/multipart.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, convert::TryFrom, io, io::Write, iter::repeat_with, pin::Pin, task::{Context, Poll}}; +use std::{collections::VecDeque, convert::TryFrom, io, io::{Write, Read}, iter::repeat_with, pin::Pin, task::{Context, Poll}}; use futures_lite::{ io::{Chain, Cursor}, @@ -8,21 +8,19 @@ use futures_lite::{ }; use http::header::{HeaderValue, HeaderName}; -use crate::AsyncBody; - -type PartReader = Chain>, AsyncBody>, &'static [u8]>; +use crate::body::{Body, AsyncBody}; /// Builder for constructing a multipart form. /// /// Generates a multipart form body as described in [RFC /// 7578](https://datatracker.ietf.org/doc/html/rfc7578). #[derive(Debug)] -pub struct FormDataBuilder { +pub struct FormDataBuilder { boundary: String, - fields: Vec, + fields: Vec>, } -impl FormDataBuilder { +impl FormDataBuilder { /// Create a new form builder. pub fn new() -> Self { Self::with_boundary(generate_boundary()) @@ -44,19 +42,63 @@ impl FormDataBuilder { pub fn field(self, name: N, value: V) -> Self where N: Into, - V: Into, + V: Into, { self.part(FormPart::new(name, value)) } /// Append a part to this form. - pub fn part(mut self, part: FormPart) -> Self { + pub fn part(mut self, part: FormPart) -> Self { self.fields.push(part); self } +} + +impl FormDataBuilder { + /// Build the form. + pub fn build(self) -> Body { + let boundary = self.boundary; + + let mut parts = VecDeque::with_capacity(self.fields.len()); + let mut len = Some(boundary.len() as u64); + + for part in self.fields { + let (read, part_size) = part.into_read(boundary.as_str()); + parts.push_back(read); + + if let (Some(a), Some(b)) = (len, part_size) { + len = Some(a + b); + } + else { + len = None; + } + } + + let terminator = std::io::Cursor::new(format!("--{}--\r\n", &boundary).into_bytes()); + + len = len.map(|size| size + terminator.get_ref().len() as u64); + + let parts = MultiChain { + items: parts, + }; + + let full_reader = parts.chain(terminator); + + let mut body = if let Some(len) = len { + Body::from_reader_sized(full_reader, len) + } else { + Body::from_reader(full_reader) + }; + + body.content_type = Some(format!("multipart/form-data; boundary={}", boundary).parse().unwrap()); + + body + } +} +impl FormDataBuilder { /// Build the form. - pub fn build(self) -> FormData { + pub fn build(self) -> AsyncBody { let boundary = self.boundary; let parts = self @@ -81,36 +123,46 @@ impl FormDataBuilder { .fold(Some(0), |a, b| Some(a? + b?)) .map(|size| size + terminator.get_ref().len() as u64); - FormData { - len, - parts, - terminator, - } + let parts = MultiChain { + items: parts, + }; + + let full_reader = parts.chain(terminator); + + let mut body = if let Some(len) = len { + AsyncBody::from_reader_sized(full_reader, len) + } else { + AsyncBody::from_reader(full_reader) + }; + + body.content_type = Some(format!("multipart/form-data; boundary={}", boundary).parse().unwrap()); + + body } } /// A single part of a multipart form representing a single field. #[derive(Debug)] -pub struct FormPart { +pub struct FormPart { name: String, filename: Option, content_type: Option, headers: Vec<(HeaderName, HeaderValue)>, - value: AsyncBody, + value: BODY, error: Option, } -impl FormPart { +impl FormPart { /// Create a new form part with a name and value. pub fn new(name: N, value: V) -> Self where N: Into, - V: Into, + V: Into, { FormPart { name: name.into(), filename: None, - content_type: None, + content_type: Some(String::from("text/plain;charset=UTF-8")), headers: Vec::new(), value: value.into(), error: None, @@ -146,8 +198,48 @@ impl FormPart { self } +} + +impl FormPart { + fn into_read(self, boundary: &str) -> (impl Read, Option) { + let mut header = Vec::new(); + + write!(header, "--{}\r\n", boundary).unwrap(); + write!( + header, + "Content-Disposition: form-data; name=\"{}\"", + &self.name + ) + .unwrap(); + + if let Some(filename) = self.filename.as_ref() { + write!(header, "; filename=\"{}\"", filename).unwrap(); + } + + header.extend_from_slice(b"\r\n"); + + if let Some(content_type) = self.content_type.as_ref() { + write!(header, "Content-Type: {}\r\n", content_type).unwrap(); + } + + for (name, value) in self.headers { + header.extend_from_slice(name.as_ref()); + header.extend_from_slice(b": "); + header.extend_from_slice(value.as_ref()); + header.extend_from_slice(b"\r\n"); + } + + header.extend_from_slice(b"\r\n"); + + let reader = std::io::Cursor::new(header).chain(self.value).chain(std::io::Cursor::new(b"\r\n")); + + (reader, None) + } +} - fn into_writer(self, boundary: &str) -> PartReader { +impl FormPart { + // Chain>, AsyncBody>, &'static [u8]> + fn into_writer(self, boundary: &str) -> Chain>, AsyncBody>, &'static [u8]> { let mut header = Vec::new(); write!(header, "--{}\r\n", boundary).unwrap(); @@ -181,42 +273,46 @@ impl FormPart { } } -/// A multipart form body. -#[derive(Debug)] -pub struct FormData { - len: Option, - parts: VecDeque, - terminator: Cursor>, +/// A chained reader which can chain multiple readers of the same type together. +struct MultiChain { + items: VecDeque, } -impl From for AsyncBody { - fn from(form: FormData) -> Self { - if let Some(len) = form.len { - AsyncBody::from_reader_sized(form, len) - } else { - AsyncBody::from_reader(form) +impl Read for MultiChain { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + while let Some(item) = self.items.front_mut() { + match item.read(buf) { + Ok(0) => { + // This item has finished being read, discard it and move to + // the next one. + self.items.pop_front(); + } + result => return result, + } } + + Ok(0) } } -impl AsyncRead for FormData { +impl AsyncRead for MultiChain { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - while let Some(part) = self.parts.front_mut() { - match ready!(AsyncRead::poll_read(Pin::new(part), cx, buf)) { + while let Some(item) = self.items.front_mut() { + match ready!(AsyncRead::poll_read(Pin::new(item), cx, buf)) { Ok(0) => { - // This part has finished being read, discard it and move to + // This item has finished being read, discard it and move to // the next one. - self.parts.pop_front(); + self.items.pop_front(); } result => return Poll::Ready(result), } } - AsyncRead::poll_read(Pin::new(&mut self.terminator), cx, buf) + Poll::Ready(Ok(0)) } } @@ -231,7 +327,7 @@ mod tests { #[test] fn empty_form() { - let mut form: AsyncBody = FormDataBuilder::with_boundary("boundary").build().into(); + let mut form: AsyncBody = FormDataBuilder::::with_boundary("boundary").build(); let expected = "--boundary--\r\n"; @@ -248,11 +344,10 @@ mod tests { #[test] fn sized_form() { - let mut form: AsyncBody = FormDataBuilder::with_boundary("boundary") + let mut form: AsyncBody = FormDataBuilder::::with_boundary("boundary") .field("foo", "value1") .field("bar", "value2") - .build() - .into(); + .build(); let expected = "\ --boundary\r\n\ From 0c923ae73004208639ff49763398de19334066dc Mon Sep 17 00:00:00 2001 From: "Stephen M. Coakley" Date: Fri, 25 Feb 2022 22:07:12 -0600 Subject: [PATCH 3/3] Keep content type private for now --- src/body/mod.rs | 17 +++++++++-------- src/body/sync.rs | 15 +++++++++------ src/forms/multipart.rs | 4 ++-- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/src/body/mod.rs b/src/body/mod.rs index 3c55b996..378a7c29 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -30,7 +30,7 @@ pub use sync::Body; /// /// For synchronous requests, use [`Body`] instead. pub struct AsyncBody { - pub(crate) content_type: Option, + content_type: Option, repr: Repr, } @@ -138,6 +138,11 @@ impl AsyncBody { } } + pub(crate) fn with_content_type(mut self, content_type: Option) -> Self { + self.content_type = content_type; + self + } + /// Report if this body is empty. /// /// This is not necessarily the same as checking for `self.len() == @@ -173,7 +178,7 @@ impl AsyncBody { } /// Get the content type of this body, if any. - pub fn content_type(&self) -> Option<&HeaderValue> { + pub(crate) fn content_type(&self) -> Option<&HeaderValue> { self.content_type.as_ref() } @@ -198,18 +203,14 @@ impl AsyncBody { /// generally if the underlying reader only supports blocking under a /// specific runtime. pub(crate) fn into_sync(self) -> sync::Body { - let mut b = match self.repr { + match self.repr { Repr::Empty => sync::Body::empty(), Repr::Buffer(cursor) => sync::Body::from_bytes_static(cursor.into_inner()), Repr::Reader(reader, Some(len)) => { sync::Body::from_reader_sized(BlockOn::new(reader), len) } Repr::Reader(reader, None) => sync::Body::from_reader(BlockOn::new(reader)), - }; - - b.content_type = self.content_type; - - b + }.with_content_type(self.content_type) } } diff --git a/src/body/sync.rs b/src/body/sync.rs index 06119399..bb60c633 100644 --- a/src/body/sync.rs +++ b/src/body/sync.rs @@ -19,7 +19,7 @@ use std::{ /// /// For asynchronous requests, use [`AsyncBody`] instead. pub struct Body { - pub(crate) content_type: Option, + content_type: Option, repr: Repr, } @@ -112,6 +112,11 @@ impl Body { } } + pub(crate) fn with_content_type(mut self, content_type: Option) -> Self { + self.content_type = content_type; + self + } + /// Report if this body is empty. /// /// This is not necessarily the same as checking for `self.len() == @@ -147,7 +152,7 @@ impl Body { } /// Get the content type of this body, if any. - pub fn content_type(&self) -> Option<&HeaderValue> { + pub(crate) fn content_type(&self) -> Option<&HeaderValue> { self.content_type.as_ref() } @@ -176,7 +181,7 @@ impl Body { /// copy the bytes from the reader to the writing half of the pipe in a /// blocking fashion. pub(crate) fn into_async(self) -> (AsyncBody, Option) { - let mut b = match self.repr { + let (body, writer) = match self.repr { Repr::Empty => (AsyncBody::empty(), None), Repr::Buffer(cursor) => (AsyncBody::from_bytes_static(cursor.into_inner()), None), Repr::Reader(reader, len) => { @@ -196,9 +201,7 @@ impl Body { } }; - b.0.content_type = self.content_type; - - b + (body.with_content_type(self.content_type), writer) } } diff --git a/src/forms/multipart.rs b/src/forms/multipart.rs index 4a8526af..7eda94a3 100644 --- a/src/forms/multipart.rs +++ b/src/forms/multipart.rs @@ -90,7 +90,7 @@ impl FormDataBuilder { Body::from_reader(full_reader) }; - body.content_type = Some(format!("multipart/form-data; boundary={}", boundary).parse().unwrap()); + body = body.with_content_type(Some(format!("multipart/form-data; boundary={}", boundary).parse().unwrap())); body } @@ -135,7 +135,7 @@ impl FormDataBuilder { AsyncBody::from_reader(full_reader) }; - body.content_type = Some(format!("multipart/form-data; boundary={}", boundary).parse().unwrap()); + body = body.with_content_type(Some(format!("multipart/form-data; boundary={}", boundary).parse().unwrap())); body }