diff --git a/Cargo.toml b/Cargo.toml index 7a09d9ee..d2b82e38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ default = ["http2", "static-curl", "text-decoding"] cookies = ["httpdate"] http2 = ["curl/http2"] json = ["serde", "serde_json"] +multipart = ["fastrand"] nightly = [] psl = ["httpdate", "parking_lot", "publicsuffix"] spnego = ["curl-sys/spnego"] @@ -53,6 +54,10 @@ waker-fn = "1" version = "0.8" optional = true +[dependencies.fastrand] +version = "1" +optional = true + [dependencies.httpdate] version = "1" optional = true diff --git a/examples/multipart_form.rs b/examples/multipart_form.rs new file mode 100644 index 00000000..0eaa8156 --- /dev/null +++ b/examples/multipart_form.rs @@ -0,0 +1,12 @@ +use isahc::{forms::FormDataBuilder, prelude::*, Body}; + +fn main() -> Result<(), isahc::Error> { + let form = FormDataBuilder::::new().field("foo", "bar").build(); + + let mut response = isahc::post("https://httpbin.org/post", form)?; + + println!("{:?}", response); + print!("{}", response.text()?); + + Ok(()) +} diff --git a/src/body/mod.rs b/src/body/mod.rs index 58306213..378a7c29 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -1,6 +1,7 @@ //! Provides types for working with request and response bodies. use futures_lite::io::{AsyncRead, BlockOn}; +use http::HeaderValue; use std::{ borrow::Cow, fmt, @@ -28,10 +29,13 @@ pub use sync::Body; /// implements. /// /// For synchronous requests, use [`Body`] instead. -pub struct AsyncBody(Inner); +pub struct AsyncBody { + content_type: Option, + repr: Repr, +} /// All possible body implementations. -enum Inner { +enum Repr { /// An empty body. Empty, @@ -48,7 +52,10 @@ impl AsyncBody { /// An empty body represents the *absence* of a body, which is semantically /// different than the presence of a body of zero length. pub const fn empty() -> Self { - Self(Inner::Empty) + Self { + content_type: None, + repr: Repr::Empty, + } } /// Create a new body from a potentially static byte buffer. @@ -76,7 +83,10 @@ impl AsyncBody { B: AsRef<[u8]> + 'static, { castaway::match_type!(bytes, { - Cursor> as bytes => Self(Inner::Buffer(bytes)), + Cursor> as bytes => Self { + content_type: None, + repr: Repr::Buffer(bytes), + }, &'static [u8] as bytes => Self::from_static_impl(bytes), &'static str as bytes => Self::from_static_impl(bytes.as_bytes()), Vec as bytes => Self::from(bytes), @@ -87,7 +97,10 @@ impl AsyncBody { #[inline] fn from_static_impl(bytes: &'static [u8]) -> Self { - Self(Inner::Buffer(Cursor::new(Cow::Borrowed(bytes)))) + Self { + content_type: None, + repr: Repr::Buffer(Cursor::new(Cow::Borrowed(bytes))), + } } /// Create a streaming body that reads from the given reader. @@ -100,7 +113,10 @@ impl AsyncBody { where R: AsyncRead + Send + Sync + 'static, { - Self(Inner::Reader(Box::pin(read), None)) + Self { + content_type: None, + repr: Repr::Reader(Box::pin(read), None), + } } /// Create a streaming body with a known length. @@ -116,7 +132,15 @@ impl AsyncBody { where R: AsyncRead + Send + Sync + 'static, { - Self(Inner::Reader(Box::pin(read), Some(length))) + Self { + content_type: None, + repr: Repr::Reader(Box::pin(read), Some(length)), + } + } + + pub(crate) fn with_content_type(mut self, content_type: Option) -> Self { + self.content_type = content_type; + self } /// Report if this body is empty. @@ -126,8 +150,8 @@ impl AsyncBody { /// difference between the absence of a body and the presence of a /// zero-length body. This method will only return `true` for the former. pub fn is_empty(&self) -> bool { - match self.0 { - Inner::Empty => true, + match self.repr { + Repr::Empty => true, _ => false, } } @@ -146,23 +170,28 @@ impl AsyncBody { /// bytes, even if a value is returned it should not be relied on as always /// being accurate, and should be treated as a "hint". pub fn len(&self) -> Option { - match &self.0 { - Inner::Empty => Some(0), - Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64), - Inner::Reader(_, len) => *len, + match &self.repr { + Repr::Empty => Some(0), + Repr::Buffer(bytes) => Some(bytes.get_ref().len() as u64), + Repr::Reader(_, len) => *len, } } + /// Get the content type of this body, if any. + pub(crate) fn content_type(&self) -> Option<&HeaderValue> { + self.content_type.as_ref() + } + /// If this body is repeatable, reset the body stream back to the start of /// the content. Returns `false` if the body cannot be reset. pub fn reset(&mut self) -> bool { - match &mut self.0 { - Inner::Empty => true, - Inner::Buffer(cursor) => { + match &mut self.repr { + Repr::Empty => true, + Repr::Buffer(cursor) => { cursor.set_position(0); true } - Inner::Reader(_, _) => false, + Repr::Reader(_, _) => false, } } @@ -174,14 +203,14 @@ impl AsyncBody { /// generally if the underlying reader only supports blocking under a /// specific runtime. pub(crate) fn into_sync(self) -> sync::Body { - match self.0 { - Inner::Empty => sync::Body::empty(), - Inner::Buffer(cursor) => sync::Body::from_bytes_static(cursor.into_inner()), - Inner::Reader(reader, Some(len)) => { + match self.repr { + Repr::Empty => sync::Body::empty(), + Repr::Buffer(cursor) => sync::Body::from_bytes_static(cursor.into_inner()), + Repr::Reader(reader, Some(len)) => { sync::Body::from_reader_sized(BlockOn::new(reader), len) } - Inner::Reader(reader, None) => sync::Body::from_reader(BlockOn::new(reader)), - } + Repr::Reader(reader, None) => sync::Body::from_reader(BlockOn::new(reader)), + }.with_content_type(self.content_type) } } @@ -191,10 +220,10 @@ impl AsyncRead for AsyncBody { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - match &mut self.0 { - Inner::Empty => Poll::Ready(Ok(0)), - Inner::Buffer(cursor) => Poll::Ready(cursor.read(buf)), - Inner::Reader(read, _) => AsyncRead::poll_read(read.as_mut(), cx, buf), + match &mut self.repr { + Repr::Empty => Poll::Ready(Ok(0)), + Repr::Buffer(cursor) => Poll::Ready(cursor.read(buf)), + Repr::Reader(read, _) => AsyncRead::poll_read(read.as_mut(), cx, buf), } } } @@ -213,7 +242,10 @@ impl From<()> for AsyncBody { impl From> for AsyncBody { fn from(body: Vec) -> Self { - Self(Inner::Buffer(Cursor::new(Cow::Owned(body)))) + Self { + content_type: None, + repr: Repr::Buffer(Cursor::new(Cow::Owned(body))), + } } } diff --git a/src/body/sync.rs b/src/body/sync.rs index e035deee..bb60c633 100644 --- a/src/body/sync.rs +++ b/src/body/sync.rs @@ -1,5 +1,6 @@ use super::AsyncBody; use futures_lite::{future::yield_now, io::AsyncWriteExt}; +use http::HeaderValue; use sluice::pipe::{pipe, PipeWriter}; use std::{ borrow::Cow, @@ -17,9 +18,12 @@ use std::{ /// implements [`Read`], which [`Body`] itself also implements. /// /// For asynchronous requests, use [`AsyncBody`] instead. -pub struct Body(Inner); +pub struct Body { + content_type: Option, + repr: Repr, +} -enum Inner { +enum Repr { Empty, Buffer(Cursor>), Reader(Box, Option), @@ -31,7 +35,10 @@ impl Body { /// An empty body represents the *absence* of a body, which is semantically /// different than the presence of a body of zero length. pub const fn empty() -> Self { - Self(Inner::Empty) + Self { + content_type: None, + repr: Repr::Empty, + } } /// Create a new body from a potentially static byte buffer. @@ -59,7 +66,10 @@ impl Body { B: AsRef<[u8]> + 'static, { castaway::match_type!(bytes, { - Cursor> as bytes => Self(Inner::Buffer(bytes)), + Cursor> as bytes => Self { + content_type: None, + repr: Repr::Buffer(bytes), + }, Vec as bytes => Self::from(bytes), String as bytes => Self::from(bytes.into_bytes()), bytes => Self::from(bytes.as_ref().to_vec()), @@ -76,7 +86,10 @@ impl Body { where R: Read + Send + Sync + 'static, { - Self(Inner::Reader(Box::new(reader), None)) + Self { + content_type: None, + repr: Repr::Reader(Box::new(reader), None), + } } /// Create a streaming body with a known length. @@ -92,7 +105,16 @@ impl Body { where R: Read + Send + Sync + 'static, { - Self(Inner::Reader(Box::new(reader), Some(length))) + + Self { + content_type: None, + repr: Repr::Reader(Box::new(reader), Some(length)), + } + } + + pub(crate) fn with_content_type(mut self, content_type: Option) -> Self { + self.content_type = content_type; + self } /// Report if this body is empty. @@ -102,8 +124,8 @@ impl Body { /// difference between the absence of a body and the presence of a /// zero-length body. This method will only return `true` for the former. pub fn is_empty(&self) -> bool { - match self.0 { - Inner::Empty => true, + match self.repr { + Repr::Empty => true, _ => false, } } @@ -122,19 +144,24 @@ impl Body { /// bytes, even if a value is returned it should not be relied on as always /// being accurate, and should be treated as a "hint". pub fn len(&self) -> Option { - match &self.0 { - Inner::Empty => Some(0), - Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64), - Inner::Reader(_, len) => *len, + match &self.repr { + Repr::Empty => Some(0), + Repr::Buffer(bytes) => Some(bytes.get_ref().len() as u64), + Repr::Reader(_, len) => *len, } } + /// Get the content type of this body, if any. + pub(crate) fn content_type(&self) -> Option<&HeaderValue> { + self.content_type.as_ref() + } + /// If this body is repeatable, reset the body stream back to the start of /// the content. Returns `false` if the body cannot be reset. pub fn reset(&mut self) -> bool { - match &mut self.0 { - Inner::Empty => true, - Inner::Buffer(cursor) => { + match &mut self.repr { + Repr::Empty => true, + Repr::Buffer(cursor) => { cursor.set_position(0); true } @@ -154,10 +181,10 @@ impl Body { /// copy the bytes from the reader to the writing half of the pipe in a /// blocking fashion. pub(crate) fn into_async(self) -> (AsyncBody, Option) { - match self.0 { - Inner::Empty => (AsyncBody::empty(), None), - Inner::Buffer(cursor) => (AsyncBody::from_bytes_static(cursor.into_inner()), None), - Inner::Reader(reader, len) => { + let (body, writer) = match self.repr { + Repr::Empty => (AsyncBody::empty(), None), + Repr::Buffer(cursor) => (AsyncBody::from_bytes_static(cursor.into_inner()), None), + Repr::Reader(reader, len) => { let (pipe_reader, writer) = pipe(); ( @@ -172,16 +199,18 @@ impl Body { }), ) } - } + }; + + (body.with_content_type(self.content_type), writer) } } impl Read for Body { fn read(&mut self, buf: &mut [u8]) -> Result { - match &mut self.0 { - Inner::Empty => Ok(0), - Inner::Buffer(cursor) => cursor.read(buf), - Inner::Reader(reader, _) => reader.read(buf), + match &mut self.repr { + Repr::Empty => Ok(0), + Repr::Buffer(cursor) => cursor.read(buf), + Repr::Reader(reader, _) => reader.read(buf), } } } @@ -200,7 +229,10 @@ impl From<()> for Body { impl From> for Body { fn from(body: Vec) -> Self { - Self(Inner::Buffer(Cursor::new(Cow::Owned(body)))) + Self { + content_type: None, + repr: Repr::Buffer(Cursor::new(Cow::Owned(body))), + } } } diff --git a/src/client.rs b/src/client.rs index 835b1bfb..d44a1eb2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1175,6 +1175,13 @@ impl crate::interceptor::Invoke for &HttpClient { .entry(http::header::USER_AGENT) .or_insert(USER_AGENT.parse().unwrap()); + // Copy Content-Type from body if not otherwise specified. + if !request.headers().contains_key(http::header::CONTENT_TYPE) { + if let Some(content_type) = request.body().content_type().cloned() { + request.headers_mut().insert(http::header::CONTENT_TYPE, content_type); + } + } + // Check if automatic decompression is enabled; we'll need to know // this later after the response is sent. let is_automatic_decompression = request diff --git a/src/forms/mod.rs b/src/forms/mod.rs new file mode 100644 index 00000000..618f9338 --- /dev/null +++ b/src/forms/mod.rs @@ -0,0 +1,7 @@ +//! Helpers for sending form bodies. + +#[cfg(feature = "multipart")] +mod multipart; + +#[cfg(feature = "multipart")] +pub use multipart::*; diff --git a/src/forms/multipart.rs b/src/forms/multipart.rs new file mode 100644 index 00000000..7eda94a3 --- /dev/null +++ b/src/forms/multipart.rs @@ -0,0 +1,373 @@ +use std::{collections::VecDeque, convert::TryFrom, io, io::{Write, Read}, iter::repeat_with, pin::Pin, task::{Context, Poll}}; + +use futures_lite::{ + io::{Chain, Cursor}, + ready, + AsyncRead, + AsyncReadExt, +}; +use http::header::{HeaderValue, HeaderName}; + +use crate::body::{Body, AsyncBody}; + +/// Builder for constructing a multipart form. +/// +/// Generates a multipart form body as described in [RFC +/// 7578](https://datatracker.ietf.org/doc/html/rfc7578). +#[derive(Debug)] +pub struct FormDataBuilder { + boundary: String, + fields: Vec>, +} + +impl FormDataBuilder { + /// Create a new form builder. + pub fn new() -> Self { + Self::with_boundary(generate_boundary()) + } + + /// Specify a boundary to use. A random one will be generated if one is not + /// specified. + fn with_boundary>(boundary: S) -> Self { + Self { + boundary: boundary.into(), + fields: Vec::new(), + } + } + + /// Append a field to this form with a given name and value. + /// + /// Duplicate fields with the same name are allowed and will be preserved in + /// the order they are added. + pub fn field(self, name: N, value: V) -> Self + where + N: Into, + V: Into, + { + self.part(FormPart::new(name, value)) + } + + /// Append a part to this form. + pub fn part(mut self, part: FormPart) -> Self { + self.fields.push(part); + self + } +} + +impl FormDataBuilder { + /// Build the form. + pub fn build(self) -> Body { + let boundary = self.boundary; + + let mut parts = VecDeque::with_capacity(self.fields.len()); + let mut len = Some(boundary.len() as u64); + + for part in self.fields { + let (read, part_size) = part.into_read(boundary.as_str()); + parts.push_back(read); + + if let (Some(a), Some(b)) = (len, part_size) { + len = Some(a + b); + } + else { + len = None; + } + } + + let terminator = std::io::Cursor::new(format!("--{}--\r\n", &boundary).into_bytes()); + + len = len.map(|size| size + terminator.get_ref().len() as u64); + + let parts = MultiChain { + items: parts, + }; + + let full_reader = parts.chain(terminator); + + let mut body = if let Some(len) = len { + Body::from_reader_sized(full_reader, len) + } else { + Body::from_reader(full_reader) + }; + + body = body.with_content_type(Some(format!("multipart/form-data; boundary={}", boundary).parse().unwrap())); + + body + } +} + +impl FormDataBuilder { + /// Build the form. + pub fn build(self) -> AsyncBody { + let boundary = self.boundary; + + let parts = self + .fields + .into_iter() + .map(|field| field.into_writer(boundary.as_str())) + .collect::>(); + + let terminator = Cursor::new(format!("--{}--\r\n", &boundary).into_bytes()); + + // Try to compute the size of the body we will write. This can only be + // determined if all parts contain values that are also sized. + let len = parts + .iter() + .map(|part| { + Some( + part.get_ref().0.get_ref().1.len()? + + part.get_ref().0.get_ref().0.get_ref().len() as u64 + + part.get_ref().1.len() as u64, + ) + }) + .fold(Some(0), |a, b| Some(a? + b?)) + .map(|size| size + terminator.get_ref().len() as u64); + + let parts = MultiChain { + items: parts, + }; + + let full_reader = parts.chain(terminator); + + let mut body = if let Some(len) = len { + AsyncBody::from_reader_sized(full_reader, len) + } else { + AsyncBody::from_reader(full_reader) + }; + + body = body.with_content_type(Some(format!("multipart/form-data; boundary={}", boundary).parse().unwrap())); + + body + } +} + +/// A single part of a multipart form representing a single field. +#[derive(Debug)] +pub struct FormPart { + name: String, + filename: Option, + content_type: Option, + headers: Vec<(HeaderName, HeaderValue)>, + value: BODY, + error: Option, +} + +impl FormPart { + /// Create a new form part with a name and value. + pub fn new(name: N, value: V) -> Self + where + N: Into, + V: Into, + { + FormPart { + name: name.into(), + filename: None, + content_type: Some(String::from("text/plain;charset=UTF-8")), + headers: Vec::new(), + value: value.into(), + error: None, + } + } + + /// Set the filename of this form part. + pub fn filename(mut self, filename: String) -> Self { + self.filename = Some(filename); + self + } + + /// Set the content type of this form part. + pub fn content_type(mut self, content_type: String) -> Self { + self.content_type = Some(content_type); + self + } + + /// Append a custom header to this form part. + pub fn header(mut self, name: K, value: V) -> Self + where + HeaderName: TryFrom, + >::Error: Into, + HeaderValue: TryFrom, + >::Error: Into, + { + if self.error.is_none() { + let name = >::try_from(name).map_err(Into::into).unwrap(); + let value = >::try_from(value).map_err(Into::into).unwrap(); + + self.headers.push((name, value)); + } + + self + } +} + +impl FormPart { + fn into_read(self, boundary: &str) -> (impl Read, Option) { + let mut header = Vec::new(); + + write!(header, "--{}\r\n", boundary).unwrap(); + write!( + header, + "Content-Disposition: form-data; name=\"{}\"", + &self.name + ) + .unwrap(); + + if let Some(filename) = self.filename.as_ref() { + write!(header, "; filename=\"{}\"", filename).unwrap(); + } + + header.extend_from_slice(b"\r\n"); + + if let Some(content_type) = self.content_type.as_ref() { + write!(header, "Content-Type: {}\r\n", content_type).unwrap(); + } + + for (name, value) in self.headers { + header.extend_from_slice(name.as_ref()); + header.extend_from_slice(b": "); + header.extend_from_slice(value.as_ref()); + header.extend_from_slice(b"\r\n"); + } + + header.extend_from_slice(b"\r\n"); + + let reader = std::io::Cursor::new(header).chain(self.value).chain(std::io::Cursor::new(b"\r\n")); + + (reader, None) + } +} + +impl FormPart { + // Chain>, AsyncBody>, &'static [u8]> + fn into_writer(self, boundary: &str) -> Chain>, AsyncBody>, &'static [u8]> { + let mut header = Vec::new(); + + write!(header, "--{}\r\n", boundary).unwrap(); + write!( + header, + "Content-Disposition: form-data; name=\"{}\"", + &self.name + ) + .unwrap(); + + if let Some(filename) = self.filename.as_ref() { + write!(header, "; filename=\"{}\"", filename).unwrap(); + } + + header.extend_from_slice(b"\r\n"); + + if let Some(content_type) = self.content_type.as_ref() { + write!(header, "Content-Type: {}\r\n", content_type).unwrap(); + } + + for (name, value) in self.headers { + header.extend_from_slice(name.as_ref()); + header.extend_from_slice(b": "); + header.extend_from_slice(value.as_ref()); + header.extend_from_slice(b"\r\n"); + } + + header.extend_from_slice(b"\r\n"); + + Cursor::new(header).chain(self.value).chain(b"\r\n") + } +} + +/// A chained reader which can chain multiple readers of the same type together. +struct MultiChain { + items: VecDeque, +} + +impl Read for MultiChain { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + while let Some(item) = self.items.front_mut() { + match item.read(buf) { + Ok(0) => { + // This item has finished being read, discard it and move to + // the next one. + self.items.pop_front(); + } + result => return result, + } + } + + Ok(0) + } +} + +impl AsyncRead for MultiChain { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + while let Some(item) = self.items.front_mut() { + match ready!(AsyncRead::poll_read(Pin::new(item), cx, buf)) { + Ok(0) => { + // This item has finished being read, discard it and move to + // the next one. + self.items.pop_front(); + } + result => return Poll::Ready(result), + } + } + + Poll::Ready(Ok(0)) + } +} + +fn generate_boundary() -> String { + repeat_with(fastrand::alphanumeric).take(24).collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_lite::{future::block_on, io::AsyncReadExt}; + + #[test] + fn empty_form() { + let mut form: AsyncBody = FormDataBuilder::::with_boundary("boundary").build(); + + let expected = "--boundary--\r\n"; + + assert_eq!(form.len(), Some(expected.len() as u64)); + + let contents = block_on(async { + let mut buf = String::new(); + form.read_to_string(&mut buf).await.unwrap(); + buf + }); + + assert_eq!(contents, expected); + } + + #[test] + fn sized_form() { + let mut form: AsyncBody = FormDataBuilder::::with_boundary("boundary") + .field("foo", "value1") + .field("bar", "value2") + .build(); + + let expected = "\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"foo\"\r\n\ + \r\n\ + value1\r\n\ + --boundary\r\n\ + Content-Disposition: form-data; name=\"bar\"\r\n\ + \r\n\ + value2\r\n\ + --boundary--\r\n\ + "; + + let contents = block_on(async { + let mut buf = String::new(); + form.read_to_string(&mut buf).await.unwrap(); + buf + }); + + assert_eq!(contents, expected); + assert_eq!(form.len(), Some(expected.len() as u64)); + } +} diff --git a/src/lib.rs b/src/lib.rs index c416cbc6..a3004702 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -269,6 +269,7 @@ mod trailer; pub mod auth; pub mod config; pub mod error; +pub mod forms; #[cfg(feature = "unstable-interceptors")] pub mod interceptor;