Skip to content

Commit 06c207f

Browse files
committed
Started porting #252 back to main
1 parent 079f6c6 commit 06c207f

File tree

13 files changed

+448
-63
lines changed

13 files changed

+448
-63
lines changed

Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ jsonwebtoken = { version = "9.3.1", default-features = false }
4848
lazy_static = "1.5.0"
4949
lh_eth2_keystore = { package = "eth2_keystore", git = "https://github.com/sigp/lighthouse", tag = "v7.1.0" }
5050
lh_types = { package = "types", git = "https://github.com/sigp/lighthouse", tag = "v7.1.0" }
51+
mediatype = "0.20.0"
5152
parking_lot = "0.12.3"
5253
pbkdf2 = "0.12.2"
5354
prometheus = "0.13.4"

crates/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ futures.workspace = true
2525
jsonwebtoken.workspace = true
2626
lh_eth2_keystore.workspace = true
2727
lh_types.workspace = true
28+
mediatype.workspace = true
2829
pbkdf2.workspace = true
2930
rand.workspace = true
3031
rayon.workspace = true

crates/common/src/signer/store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ mod test {
709709
.join(consensus_signer.pubkey().to_string())
710710
.join("TEST_MODULE")
711711
.join("bls")
712-
.join(format!("{}.sig", proxy_signer.pubkey().to_string()))
712+
.join(format!("{}.sig", proxy_signer.pubkey()))
713713
)
714714
.unwrap()
715715
)

crates/common/src/utils.rs

Lines changed: 197 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,27 @@
11
#[cfg(test)]
22
use std::cell::Cell;
33
use std::{
4+
fmt,
45
net::Ipv4Addr,
6+
str::FromStr,
57
time::{SystemTime, UNIX_EPOCH},
68
};
79

810
use alloy::{hex, primitives::U256};
9-
use axum::http::HeaderValue;
11+
use axum::{
12+
extract::{FromRequest, Request},
13+
http::HeaderValue,
14+
response::{IntoResponse, Response as AxumResponse},
15+
};
16+
use bytes::Bytes;
1017
use futures::StreamExt;
1118
use lh_types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
19+
use mediatype::{MediaType, MediaTypeList, names};
1220
use rand::{Rng, distr::Alphanumeric};
13-
use reqwest::{Response, header::HeaderMap};
21+
use reqwest::{
22+
Response, StatusCode,
23+
header::{ACCEPT, CONTENT_TYPE, HeaderMap},
24+
};
1425
use serde::{Serialize, de::DeserializeOwned};
1526
use serde_json::Value;
1627
use ssz::{Decode, Encode};
@@ -31,6 +42,7 @@ use crate::{
3142
};
3243

3344
const MILLIS_PER_SECOND: u64 = 1_000;
45+
pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version";
3446

3547
#[derive(Debug, Error)]
3648
pub enum ResponseReadError {
@@ -408,6 +420,189 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result<Head
408420
Ok(HeaderValue::from_str(&format!("commit-boost/{HEADER_VERSION_VALUE} {ua}"))?)
409421
}
410422

423+
/// Parse ACCEPT header, default to JSON if missing or mal-formatted
424+
pub fn get_accept_header(req_headers: &HeaderMap) -> Accept {
425+
Accept::from_str(
426+
req_headers.get(ACCEPT).and_then(|value| value.to_str().ok()).unwrap_or("application/json"),
427+
)
428+
.unwrap_or(Accept::Json)
429+
}
430+
431+
/// Parse CONTENT TYPE header, default to JSON if missing or mal-formatted
432+
pub fn get_content_type_header(req_headers: &HeaderMap) -> ContentType {
433+
ContentType::from_str(
434+
req_headers
435+
.get(CONTENT_TYPE)
436+
.and_then(|value| value.to_str().ok())
437+
.unwrap_or("application/json"),
438+
)
439+
.unwrap_or(ContentType::Json)
440+
}
441+
442+
/// Parse CONSENSUS_VERSION header
443+
pub fn get_consensus_version_header(req_headers: &HeaderMap) -> Option<ForkName> {
444+
ForkName::from_str(
445+
req_headers
446+
.get(CONSENSUS_VERSION_HEADER)
447+
.and_then(|value| value.to_str().ok())
448+
.unwrap_or(""),
449+
)
450+
.ok()
451+
}
452+
453+
#[derive(Debug, Clone, Copy, PartialEq)]
454+
pub enum ForkName {
455+
Electra,
456+
}
457+
458+
impl std::fmt::Display for ForkName {
459+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
460+
match self {
461+
ForkName::Electra => write!(f, "electra"),
462+
}
463+
}
464+
}
465+
466+
impl FromStr for ForkName {
467+
type Err = String;
468+
fn from_str(value: &str) -> Result<Self, Self::Err> {
469+
match value {
470+
"electra" => Ok(ForkName::Electra),
471+
_ => Err(format!("Invalid fork name {}", value)),
472+
}
473+
}
474+
}
475+
476+
#[derive(Debug, Clone, Copy, PartialEq)]
477+
pub enum ContentType {
478+
Json,
479+
Ssz,
480+
}
481+
482+
impl std::fmt::Display for ContentType {
483+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
484+
match self {
485+
ContentType::Json => write!(f, "application/json"),
486+
ContentType::Ssz => write!(f, "application/octet-stream"),
487+
}
488+
}
489+
}
490+
491+
impl FromStr for ContentType {
492+
type Err = String;
493+
fn from_str(value: &str) -> Result<Self, Self::Err> {
494+
match value {
495+
"application/json" => Ok(ContentType::Json),
496+
"application/octet-stream" => Ok(ContentType::Ssz),
497+
_ => Ok(ContentType::Json),
498+
}
499+
}
500+
}
501+
502+
#[derive(Debug, Clone, Copy, PartialEq)]
503+
pub enum Accept {
504+
Json,
505+
Ssz,
506+
Any,
507+
}
508+
509+
impl fmt::Display for Accept {
510+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
511+
match self {
512+
Accept::Ssz => write!(f, "application/octet-stream"),
513+
Accept::Json => write!(f, "application/json"),
514+
Accept::Any => write!(f, "*/*"),
515+
}
516+
}
517+
}
518+
519+
impl FromStr for Accept {
520+
type Err = String;
521+
522+
fn from_str(s: &str) -> Result<Self, Self::Err> {
523+
let media_type_list = MediaTypeList::new(s);
524+
525+
// [q-factor weighting]: https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
526+
// find the highest q-factor supported accept type
527+
let mut highest_q = 0_u16;
528+
let mut accept_type = None;
529+
530+
const APPLICATION: &str = names::APPLICATION.as_str();
531+
const OCTET_STREAM: &str = names::OCTET_STREAM.as_str();
532+
const JSON: &str = names::JSON.as_str();
533+
const STAR: &str = names::_STAR.as_str();
534+
const Q: &str = names::Q.as_str();
535+
536+
media_type_list.into_iter().for_each(|item| {
537+
if let Ok(MediaType { ty, subty, suffix: _, params }) = item {
538+
let q_accept = match (ty.as_str(), subty.as_str()) {
539+
(APPLICATION, OCTET_STREAM) => Some(Accept::Ssz),
540+
(APPLICATION, JSON) => Some(Accept::Json),
541+
(STAR, STAR) => Some(Accept::Any),
542+
_ => None,
543+
}
544+
.map(|item_accept_type| {
545+
let q_val = params
546+
.iter()
547+
.find_map(|(n, v)| match n.as_str() {
548+
Q => {
549+
Some((v.as_str().parse::<f32>().unwrap_or(0_f32) * 1000_f32) as u16)
550+
}
551+
_ => None,
552+
})
553+
.or(Some(1000_u16));
554+
555+
(q_val.unwrap(), item_accept_type)
556+
});
557+
558+
match q_accept {
559+
Some((q, accept)) if q > highest_q => {
560+
highest_q = q;
561+
accept_type = Some(accept);
562+
}
563+
_ => (),
564+
}
565+
}
566+
});
567+
accept_type.ok_or_else(|| "accept header is not supported".to_string())
568+
}
569+
}
570+
571+
#[must_use]
572+
#[derive(Debug, Clone, Copy, Default)]
573+
pub struct JsonOrSsz<T>(pub T);
574+
575+
impl<T, S> FromRequest<S> for JsonOrSsz<T>
576+
where
577+
T: serde::de::DeserializeOwned + ssz::Decode + 'static,
578+
S: Send + Sync,
579+
{
580+
type Rejection = AxumResponse;
581+
582+
async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
583+
let headers = req.headers().clone();
584+
let content_type = headers.get(CONTENT_TYPE).and_then(|value| value.to_str().ok());
585+
586+
let bytes = Bytes::from_request(req, _state).await.map_err(IntoResponse::into_response)?;
587+
588+
if let Some(content_type) = content_type {
589+
if content_type.starts_with(&ContentType::Json.to_string()) {
590+
let payload: T = serde_json::from_slice(&bytes)
591+
.map_err(|_| StatusCode::BAD_REQUEST.into_response())?;
592+
return Ok(Self(payload));
593+
}
594+
595+
if content_type.starts_with(&ContentType::Ssz.to_string()) {
596+
let payload = T::from_ssz_bytes(&bytes)
597+
.map_err(|_| StatusCode::BAD_REQUEST.into_response())?;
598+
return Ok(Self(payload));
599+
}
600+
}
601+
602+
Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
603+
}
604+
}
605+
411606
#[cfg(unix)]
412607
pub async fn wait_for_signal() -> eyre::Result<()> {
413608
use tokio::signal::unix::{SignalKind, signal};

crates/pbs/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ axum.workspace = true
1212
axum-extra.workspace = true
1313
cb-common.workspace = true
1414
cb-metrics.workspace = true
15+
ethereum_ssz.workspace = true
1516
eyre.workspace = true
1617
futures.workspace = true
1718
lazy_static.workspace = true

crates/pbs/src/routes/get_header.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
use alloy::primitives::utils::format_ether;
22
use axum::{
33
extract::{Path, State},
4-
http::HeaderMap,
4+
http::{HeaderMap, HeaderValue},
55
response::IntoResponse,
66
};
77
use cb_common::{
8-
pbs::GetHeaderParams,
9-
utils::{get_user_agent, ms_into_slot},
8+
pbs::{GetHeaderParams, VersionedResponse},
9+
utils::{Accept, CONSENSUS_VERSION_HEADER, get_accept_header, get_user_agent, ms_into_slot},
1010
};
11-
use reqwest::StatusCode;
11+
use reqwest::{StatusCode, header::CONTENT_TYPE};
12+
use ssz::Encode;
1213
use tracing::{error, info};
1314

1415
use crate::{
@@ -32,6 +33,7 @@ pub async fn handle_get_header<S: BuilderApiState, A: BuilderApi<S>>(
3233

3334
let ua = get_user_agent(&req_headers);
3435
let ms_into_slot = ms_into_slot(params.slot, state.config.chain);
36+
let accept_header = get_accept_header(&req_headers);
3537

3638
info!(ua, ms_into_slot, "new request");
3739

@@ -41,7 +43,35 @@ pub async fn handle_get_header<S: BuilderApiState, A: BuilderApi<S>>(
4143
info!(value_eth = format_ether(max_bid.value()), block_hash =% max_bid.block_hash(), "received header");
4244

4345
BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc();
44-
Ok((StatusCode::OK, axum::Json(max_bid)).into_response())
46+
let response = match accept_header {
47+
Accept::Ssz => {
48+
let mut res = match &max_bid {
49+
VersionedResponse::Electra(max_bid) => {
50+
(StatusCode::OK, max_bid.as_ssz_bytes()).into_response()
51+
}
52+
};
53+
let Ok(consensus_version_header) = HeaderValue::from_str(max_bid.version())
54+
else {
55+
info!("sending response as JSON");
56+
return Ok((StatusCode::OK, axum::Json(max_bid)).into_response());
57+
};
58+
let Ok(content_type_header) =
59+
HeaderValue::from_str(&format!("{}", Accept::Ssz))
60+
else {
61+
info!("sending response as JSON");
62+
return Ok((StatusCode::OK, axum::Json(max_bid)).into_response());
63+
};
64+
res.headers_mut()
65+
.insert(CONSENSUS_VERSION_HEADER, consensus_version_header);
66+
res.headers_mut().insert(CONTENT_TYPE, content_type_header);
67+
info!("sending response as SSZ");
68+
res
69+
}
70+
Accept::Json | Accept::Any => {
71+
(StatusCode::OK, axum::Json(max_bid)).into_response()
72+
}
73+
};
74+
Ok(response)
4575
} else {
4676
// spec: return 204 if request is valid but no bid available
4777
info!("no header available for slot");

0 commit comments

Comments
 (0)