From 0a47aaae14432ddfe7b7e019dc16f6b66449a01e Mon Sep 17 00:00:00 2001 From: Facundo Tuesca Date: Fri, 24 Oct 2025 02:41:32 +0200 Subject: [PATCH 1/2] asn1: Add support for `IMPLICIT` and `EXPLICIT` Signed-off-by: Facundo Tuesca --- src/cryptography/hazmat/asn1/__init__.py | 4 + src/cryptography/hazmat/asn1/asn1.py | 17 ++++- .../bindings/_rust/declarative_asn1.pyi | 8 ++ src/rust/src/declarative_asn1/decode.rs | 68 ++++++++++++----- src/rust/src/declarative_asn1/encode.rs | 38 +++++++--- src/rust/src/declarative_asn1/types.rs | 66 ++++++++++++----- src/rust/src/lib.rs | 4 +- tests/hazmat/asn1/test_serialization.py | 73 +++++++++++++++++++ 8 files changed, 226 insertions(+), 52 deletions(-) diff --git a/src/cryptography/hazmat/asn1/__init__.py b/src/cryptography/hazmat/asn1/__init__.py index e2bfc6537606..b4cb2a61dbc7 100644 --- a/src/cryptography/hazmat/asn1/__init__.py +++ b/src/cryptography/hazmat/asn1/__init__.py @@ -4,7 +4,9 @@ from cryptography.hazmat.asn1.asn1 import ( Default, + Explicit, GeneralizedTime, + Implicit, PrintableString, UtcTime, decode_der, @@ -14,7 +16,9 @@ __all__ = [ "Default", + "Explicit", "GeneralizedTime", + "Implicit", "PrintableString", "UtcTime", "decode_der", diff --git a/src/cryptography/hazmat/asn1/asn1.py b/src/cryptography/hazmat/asn1/asn1.py index 958d3c33fe9e..27e64dc09d14 100644 --- a/src/cryptography/hazmat/asn1/asn1.py +++ b/src/cryptography/hazmat/asn1/asn1.py @@ -60,13 +60,18 @@ def _is_union(field_type: type) -> bool: def _extract_annotation(metadata: tuple) -> declarative_asn1.Annotation: default = None + encoding = None for raw_annotation in metadata: if isinstance(raw_annotation, Default): default = raw_annotation.value + elif isinstance(raw_annotation, Explicit): + encoding = declarative_asn1.Encoding.Explicit(raw_annotation.tag) + elif isinstance(raw_annotation, Implicit): + encoding = declarative_asn1.Encoding.Implicit(raw_annotation.tag) else: raise TypeError(f"unsupported annotation: {raw_annotation}") - return declarative_asn1.Annotation(default=default) + return declarative_asn1.Annotation(default=default, encoding=encoding) def _normalize_field_type( @@ -194,6 +199,16 @@ class Default(typing.Generic[U]): value: U +@dataclasses.dataclass(frozen=True) +class Explicit: + tag: int + + +@dataclasses.dataclass(frozen=True) +class Implicit: + tag: int + + PrintableString = declarative_asn1.PrintableString UtcTime = declarative_asn1.UtcTime GeneralizedTime = declarative_asn1.GeneralizedTime diff --git a/src/cryptography/hazmat/bindings/_rust/declarative_asn1.pyi b/src/cryptography/hazmat/bindings/_rust/declarative_asn1.pyi index fe9256944230..4bddb043294d 100644 --- a/src/cryptography/hazmat/bindings/_rust/declarative_asn1.pyi +++ b/src/cryptography/hazmat/bindings/_rust/declarative_asn1.pyi @@ -21,12 +21,20 @@ class Type: class Annotation: default: typing.Any | None + encoding: Encoding | None def __new__( cls, default: typing.Any | None = None, + encoding: Encoding | None = None, ) -> Annotation: ... def is_empty(self) -> bool: ... +# Encoding is a Rust enum with tuple variants. For now, we express the type +# annotations like this: +class Encoding: + Implicit: typing.ClassVar[type] + Explicit: typing.ClassVar[type] + class AnnotatedType: inner: Type annotation: Annotation diff --git a/src/rust/src/declarative_asn1/decode.rs b/src/rust/src/declarative_asn1/decode.rs index ad850f088e1d..a72b2b739d8e 100644 --- a/src/rust/src/declarative_asn1/decode.rs +++ b/src/rust/src/declarative_asn1/decode.rs @@ -7,25 +7,41 @@ use pyo3::types::PyAnyMethods; use crate::asn1::big_byte_slice_to_py_int; use crate::declarative_asn1::types::{ - type_to_tag, AnnotatedType, GeneralizedTime, PrintableString, Type, UtcTime, + type_to_tag, AnnotatedType, Encoding, GeneralizedTime, PrintableString, Type, UtcTime, }; use crate::error::CryptographyError; type ParseResult = Result; +fn read_value<'a, T: asn1::SimpleAsn1Readable<'a>>( + parser: &mut Parser<'a>, + encoding: &Option>, +) -> ParseResult { + let value = match encoding { + Some(e) => match e.get() { + Encoding::Implicit(n) => parser.read_implicit_element::(*n), + Encoding::Explicit(n) => parser.read_explicit_element::(*n), + }, + None => parser.read_element::(), + }?; + Ok(value) +} + fn decode_pybool<'a>( py: pyo3::Python<'a>, parser: &mut Parser<'a>, + encoding: &Option>, ) -> ParseResult> { - let value = parser.read_element::()?; + let value = read_value::(parser, encoding)?; Ok(pyo3::types::PyBool::new(py, value).to_owned()) } fn decode_pyint<'a>( py: pyo3::Python<'a>, parser: &mut Parser<'a>, + encoding: &Option>, ) -> ParseResult> { - let value = parser.read_element::>()?; + let value = read_value::>(parser, encoding)?; let pyint = big_byte_slice_to_py_int(py, value.as_bytes())?.cast_into::()?; Ok(pyint) @@ -34,24 +50,27 @@ fn decode_pyint<'a>( fn decode_pybytes<'a>( py: pyo3::Python<'a>, parser: &mut Parser<'a>, + encoding: &Option>, ) -> ParseResult> { - let value = parser.read_element::<&[u8]>()?; + let value = read_value::<&[u8]>(parser, encoding)?; Ok(pyo3::types::PyBytes::new(py, value)) } fn decode_pystr<'a>( py: pyo3::Python<'a>, parser: &mut Parser<'a>, + encoding: &Option>, ) -> ParseResult> { - let value = parser.read_element::>()?; + let value = read_value::>(parser, encoding)?; Ok(pyo3::types::PyString::new(py, value.as_str())) } fn decode_printable_string<'a>( py: pyo3::Python<'a>, parser: &mut Parser<'a>, + encoding: &Option>, ) -> ParseResult> { - let value = parser.read_element::>()?.as_str(); + let value = read_value::>(parser, encoding)?.as_str(); let inner = pyo3::types::PyString::new(py, value).unbind(); Ok(pyo3::Bound::new(py, PrintableString { inner })?) } @@ -59,8 +78,9 @@ fn decode_printable_string<'a>( fn decode_utc_time<'a>( py: pyo3::Python<'a>, parser: &mut Parser<'a>, + encoding: &Option>, ) -> ParseResult> { - let value = parser.read_element::()?; + let value = read_value::(parser, encoding)?; let dt = value.as_datetime(); let inner = crate::x509::datetime_to_py_utc(py, dt)? @@ -73,8 +93,9 @@ fn decode_utc_time<'a>( fn decode_generalized_time<'a>( py: pyo3::Python<'a>, parser: &mut Parser<'a>, + encoding: &Option>, ) -> ParseResult> { - let value = parser.read_element::()?; + let value = read_value::(parser, encoding)?; let dt = value.as_datetime(); let microseconds = match value.nanoseconds() { @@ -102,11 +123,12 @@ pub(crate) fn decode_annotated_type<'a>( ann_type: &AnnotatedType, ) -> ParseResult> { let inner = ann_type.inner.get(); + let encoding = &ann_type.annotation.get().encoding; // Handle DEFAULT annotation if field is not present (by // returning the default value) if let Some(default) = &ann_type.annotation.get().default { - let expected_tag = type_to_tag(inner); + let expected_tag = type_to_tag(inner, encoding); let next_tag = parser.peek_tag(); if next_tag != Some(expected_tag) { return Ok(default.clone_ref(py).into_bound(py)); @@ -115,7 +137,7 @@ pub(crate) fn decode_annotated_type<'a>( let decoded = match &inner { Type::Sequence(cls, fields) => { - let seq_parse_result = parser.read_element::>()?; + let seq_parse_result = read_value::>(parser, encoding)?; seq_parse_result.parse(|d| -> ParseResult> { let kwargs = pyo3::types::PyDict::new(py); @@ -130,19 +152,27 @@ pub(crate) fn decode_annotated_type<'a>( })? } Type::Option(cls) => { - let inner_tag = type_to_tag(cls.get().inner.get()); + let inner_tag = type_to_tag(cls.get().inner.get(), encoding); match parser.peek_tag() { - Some(t) if t == inner_tag => decode_annotated_type(py, parser, cls.get())?, + Some(t) if t == inner_tag => { + // Since for optional types the annotations are enforced to be associated with the Option + // (instead of the inner type), when decoding the inner type we add the annotations of the Option + let inner_ann_type = AnnotatedType { + inner: cls.get().inner.clone_ref(py), + annotation: ann_type.annotation.clone_ref(py), + }; + decode_annotated_type(py, parser, &inner_ann_type)? + } _ => pyo3::types::PyNone::get(py).to_owned().into_any(), } } - Type::PyBool() => decode_pybool(py, parser)?.into_any(), - Type::PyInt() => decode_pyint(py, parser)?.into_any(), - Type::PyBytes() => decode_pybytes(py, parser)?.into_any(), - Type::PyStr() => decode_pystr(py, parser)?.into_any(), - Type::PrintableString() => decode_printable_string(py, parser)?.into_any(), - Type::UtcTime() => decode_utc_time(py, parser)?.into_any(), - Type::GeneralizedTime() => decode_generalized_time(py, parser)?.into_any(), + Type::PyBool() => decode_pybool(py, parser, encoding)?.into_any(), + Type::PyInt() => decode_pyint(py, parser, encoding)?.into_any(), + Type::PyBytes() => decode_pybytes(py, parser, encoding)?.into_any(), + Type::PyStr() => decode_pystr(py, parser, encoding)?.into_any(), + Type::PrintableString() => decode_printable_string(py, parser, encoding)?.into_any(), + Type::UtcTime() => decode_utc_time(py, parser, encoding)?.into_any(), + Type::GeneralizedTime() => decode_generalized_time(py, parser, encoding)?.into_any(), }; match &ann_type.annotation.get().default { diff --git a/src/rust/src/declarative_asn1/encode.rs b/src/rust/src/declarative_asn1/encode.rs index 9d15a0fa58af..fee30f18df18 100644 --- a/src/rust/src/declarative_asn1/encode.rs +++ b/src/rust/src/declarative_asn1/encode.rs @@ -6,14 +6,21 @@ use asn1::{SimpleAsn1Writable, Writer}; use pyo3::types::PyAnyMethods; use crate::declarative_asn1::types::{ - AnnotatedType, AnnotatedTypeObject, GeneralizedTime, PrintableString, Type, UtcTime, + AnnotatedType, AnnotatedTypeObject, Encoding, GeneralizedTime, PrintableString, Type, UtcTime, }; fn write_value( writer: &mut Writer<'_>, value: &T, + encoding: &Option>, ) -> Result<(), asn1::WriteError> { - writer.write_element(value) + match encoding { + Some(e) => match e.get() { + Encoding::Implicit(tag) => writer.write_implicit_element(value, *tag), + Encoding::Explicit(tag) => writer.write_explicit_element(value, *tag), + }, + None => writer.write_element(value), + } } impl asn1::Asn1Writable for AnnotatedTypeObject<'_> { @@ -37,6 +44,7 @@ impl asn1::Asn1Writable for AnnotatedTypeObject<'_> { } } + let encoding = &annotated_type.annotation.get().encoding; let inner = annotated_type.inner.get(); match &inner { Type::Sequence(_cls, fields) => write_value( @@ -60,14 +68,20 @@ impl asn1::Asn1Writable for AnnotatedTypeObject<'_> { } Ok(()) }), + encoding, ), Type::Option(cls) => { if !value.is_none() { - let object = AnnotatedTypeObject { - annotated_type: cls.get(), + let inner_object = AnnotatedTypeObject { + annotated_type: &AnnotatedType { + inner: cls.get().inner.clone_ref(py), + // Since for optional types the annotations are enforced to be associated with the Option + // (instead of the inner type), when encoding the inner type we add the annotations of the Option + annotation: annotated_type.annotation.clone_ref(py), + }, value, }; - object.write(writer) + inner_object.write(writer) } else { // Missing OPTIONAL values are omitted from DER encoding Ok(()) @@ -77,26 +91,26 @@ impl asn1::Asn1Writable for AnnotatedTypeObject<'_> { let val: bool = value .extract() .map_err(|_| asn1::WriteError::AllocationError)?; - write_value(writer, &val) + write_value(writer, &val, encoding) } Type::PyInt() => { let val: i64 = value .extract() .map_err(|_| asn1::WriteError::AllocationError)?; - write_value(writer, &val) + write_value(writer, &val, encoding) } Type::PyBytes() => { let val: &[u8] = value .extract() .map_err(|_| asn1::WriteError::AllocationError)?; - write_value(writer, &val) + write_value(writer, &val, encoding) } Type::PyStr() => { let val: pyo3::pybacked::PyBackedStr = value .extract() .map_err(|_| asn1::WriteError::AllocationError)?; let asn1_string: asn1::Utf8String<'_> = asn1::Utf8String::new(&val); - write_value(writer, &asn1_string) + write_value(writer, &asn1_string, encoding) } Type::PrintableString() => { let val: &pyo3::Bound<'_, PrintableString> = value @@ -110,7 +124,7 @@ impl asn1::Asn1Writable for AnnotatedTypeObject<'_> { let printable_string: asn1::PrintableString<'_> = asn1::PrintableString::new(&inner_str) .ok_or(asn1::WriteError::AllocationError)?; - write_value(writer, &printable_string) + write_value(writer, &printable_string, encoding) } Type::UtcTime() => { let val: &pyo3::Bound<'_, UtcTime> = value @@ -121,7 +135,7 @@ impl asn1::Asn1Writable for AnnotatedTypeObject<'_> { .map_err(|_| asn1::WriteError::AllocationError)?; let utc_time = asn1::UtcTime::new(datetime).map_err(|_| asn1::WriteError::AllocationError)?; - write_value(writer, &utc_time) + write_value(writer, &utc_time, encoding) } Type::GeneralizedTime() => { let val: &pyo3::Bound<'_, GeneralizedTime> = value @@ -134,7 +148,7 @@ impl asn1::Asn1Writable for AnnotatedTypeObject<'_> { let nanoseconds = microseconds.map(|m| m * 1000); let generalized_time = asn1::GeneralizedTime::new(datetime, nanoseconds) .map_err(|_| asn1::WriteError::AllocationError)?; - write_value(writer, &generalized_time) + write_value(writer, &generalized_time, encoding) } } } diff --git a/src/rust/src/declarative_asn1/types.rs b/src/rust/src/declarative_asn1/types.rs index c6ed1d4577c4..b50a354c7d40 100644 --- a/src/rust/src/declarative_asn1/types.rs +++ b/src/rust/src/declarative_asn1/types.rs @@ -70,22 +70,33 @@ pub struct AnnotatedTypeObject<'a> { pub struct Annotation { #[pyo3(get)] pub(crate) default: Option>, + #[pyo3(get)] + pub(crate) encoding: Option>, } #[pyo3::pymethods] impl Annotation { #[new] - #[pyo3(signature = (default = None))] - fn new(default: Option>) -> Self { - Self { default } + #[pyo3(signature = (default = None, encoding = None))] + fn new( + default: Option>, + encoding: Option>, + ) -> Self { + Self { default, encoding } } #[pyo3(signature = ())] fn is_empty(&self) -> bool { - self.default.is_none() + self.default.is_none() && self.encoding.is_none() } } +#[pyo3::pyclass(frozen, module = "cryptography.hazmat.bindings._rust.asn1")] +pub enum Encoding { + Implicit(u32), + Explicit(u32), +} + #[derive(pyo3::FromPyObject)] #[pyo3::pyclass(frozen, module = "cryptography.hazmat.bindings._rust.asn1")] pub struct PrintableString { @@ -247,7 +258,12 @@ fn non_root_type_to_annotated<'p>( let inner = non_root_python_to_rust(py, class)?.unbind(); Ok(AnnotatedType { inner, - annotation: Annotation { default: None }.into_pyobject(py)?.unbind(), + annotation: Annotation { + default: None, + encoding: None, + } + .into_pyobject(py)? + .unbind(), }) } @@ -266,10 +282,10 @@ pub(crate) fn python_class_to_annotated<'p>( } } -pub(crate) fn type_to_tag(t: &Type) -> asn1::Tag { - match t { +pub(crate) fn type_to_tag(t: &Type, encoding: &Option>) -> asn1::Tag { + let inner_tag = match t { Type::Sequence(_, _) => asn1::Sequence::TAG, - Type::Option(t) => type_to_tag(t.get().inner.get()), + Type::Option(t) => type_to_tag(t.get().inner.get(), encoding), Type::PyBool() => bool::TAG, Type::PyInt() => asn1::BigInt::TAG, Type::PyBytes() => <&[u8] as SimpleAsn1Readable>::TAG, @@ -277,6 +293,14 @@ pub(crate) fn type_to_tag(t: &Type) -> asn1::Tag { Type::PrintableString() => asn1::PrintableString::TAG, Type::UtcTime() => asn1::UtcTime::TAG, Type::GeneralizedTime() => asn1::GeneralizedTime::TAG, + }; + + match encoding { + Some(e) => match e.get() { + Encoding::Implicit(n) => asn1::implicit_tag(*n, inner_tag), + Encoding::Explicit(n) => asn1::explicit_tag(*n), + }, + None => inner_tag, } } @@ -298,10 +322,13 @@ mod tests { py, AnnotatedType { inner: pyo3::Py::new(py, Type::PyInt()).unwrap(), - annotation: Annotation { default: None } - .into_pyobject(py) - .unwrap() - .unbind(), + annotation: Annotation { + default: None, + encoding: None, + } + .into_pyobject(py) + .unwrap() + .unbind(), }, ) .unwrap(); @@ -309,15 +336,18 @@ mod tests { py, AnnotatedType { inner: pyo3::Py::new(py, Type::Option(ann_type)).unwrap(), - annotation: Annotation { default: None } - .into_pyobject(py) - .unwrap() - .unbind(), + annotation: Annotation { + default: None, + encoding: None, + } + .into_pyobject(py) + .unwrap() + .unbind(), }, ) .unwrap(); - let expected_tag = type_to_tag(&Type::Option(optional_type)); - assert_eq!(expected_tag, type_to_tag(&Type::PyInt())) + let expected_tag = type_to_tag(&Type::Option(optional_type), &None); + assert_eq!(expected_tag, type_to_tag(&Type::PyInt(), &None)) }) } } diff --git a/src/rust/src/lib.rs b/src/rust/src/lib.rs index 119635bfd489..f08747462d62 100644 --- a/src/rust/src/lib.rs +++ b/src/rust/src/lib.rs @@ -151,8 +151,8 @@ mod _rust { #[pymodule_export] use crate::declarative_asn1::types::{ - non_root_python_to_rust, AnnotatedType, Annotation, GeneralizedTime, PrintableString, - Type, UtcTime, + non_root_python_to_rust, AnnotatedType, Annotation, Encoding, GeneralizedTime, + PrintableString, Type, UtcTime, }; } diff --git a/tests/hazmat/asn1/test_serialization.py b/tests/hazmat/asn1/test_serialization.py index b444d260b812..73d501773ead 100644 --- a/tests/hazmat/asn1/test_serialization.py +++ b/tests/hazmat/asn1/test_serialization.py @@ -4,6 +4,7 @@ import dataclasses import datetime +import re import sys import typing @@ -399,3 +400,75 @@ class Example: match="invalid DER: DEFAULT value was explicitly encoded", ): asn1.decode_der(Example, b"\x30\x03\x01\x01\xff") + + def test_ok_optional_fields_with_implicit_encoding(self) -> None: + @asn1.sequence + @_comparable_dataclass + class Example: + a: Annotated[typing.Union[int, None], asn1.Implicit(0)] + b: Annotated[typing.Union[int, None], asn1.Implicit(1)] + + assert_roundtrips( + [ + (Example(a=9, b=9), b"\x30\x06\x80\x01\x09\x81\x01\x09"), + (Example(a=9, b=None), b"\x30\x03\x80\x01\x09"), + (Example(a=None, b=9), b"\x30\x03\x81\x01\x09"), + (Example(a=None, b=None), b"\x30\x00"), + ] + ) + + def test_ok_optional_fields_with_explicit_encoding(self) -> None: + @asn1.sequence + @_comparable_dataclass + class Example: + a: Annotated[typing.Union[int, None], asn1.Explicit(0)] + b: Annotated[typing.Union[int, None], asn1.Explicit(1)] + + assert_roundtrips( + [ + ( + Example(a=9, b=9), + b"\x30\x0a\xa0\x03\x02\x01\x09\xa1\x03\x02\x01\x09", + ), + ( + Example(a=9, b=None), + b"\x30\x05\xa0\x03\x02\x01\x09", + ), + ( + Example(a=None, b=9), + b"\x30\x05\xa1\x03\x02\x01\x09", + ), + ( + Example(a=None, b=None), + b"\x30\x00", + ), + ] + ) + + def test_fail_unexpected_fields_with_implicit_encoding(self) -> None: + @asn1.sequence + @_comparable_dataclass + class Example: + a: Annotated[int, asn1.Implicit(0)] + + with pytest.raises( + ValueError, + match=re.escape( + "error parsing asn1 value: ParseError { kind: UnexpectedTag" + ), + ): + asn1.decode_der(Example, b"\x30\x03\x82\x01\x09") + + def test_fail_unexpected_fields_with_explicit_encoding(self) -> None: + @asn1.sequence + @_comparable_dataclass + class Example: + a: Annotated[int, asn1.Explicit(0)] + + with pytest.raises( + ValueError, + match=re.escape( + "error parsing asn1 value: ParseError { kind: UnexpectedTag" + ), + ): + asn1.decode_der(Example, b"\x30\x05\xa2\x03\x02\x01\x09") From 4b7663097c1da9a639883738bf4550971d8145e5 Mon Sep 17 00:00:00 2001 From: Facundo Tuesca Date: Sat, 25 Oct 2025 01:12:34 +0200 Subject: [PATCH 2/2] asn1: Fix coverage for encoding getters Signed-off-by: Facundo Tuesca --- tests/hazmat/asn1/test_api.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/hazmat/asn1/test_api.py b/tests/hazmat/asn1/test_api.py index ab8d361a20a4..e6cb3eae6815 100644 --- a/tests/hazmat/asn1/test_api.py +++ b/tests/hazmat/asn1/test_api.py @@ -239,3 +239,13 @@ def test_fields_of_variant_type(self) -> None: ) opt = declarative_asn1.Type.Option(ann_type) assert opt._0 == ann_type + + def test_fields_of_variant_encoding(self) -> None: + from cryptography.hazmat.bindings._rust import declarative_asn1 + + # Needed for coverage of the `_0`, `_1`, etc fields generated + # for tuple enum variants + implicit = declarative_asn1.Encoding.Implicit(0) + explicit = declarative_asn1.Encoding.Explicit(0) + assert implicit._0 == 0 + assert explicit._0 == 0