3
3
import dataclasses
4
4
import json
5
5
import logging
6
- from copy import deepcopy
7
6
from typing import TYPE_CHECKING , Any , Callable , Mapping , MutableMapping , Sequence
8
7
from urllib .parse import parse_qs
9
8
15
14
_normalize_errors ,
16
15
_regenerate_error_with_loc ,
17
16
get_missing_field_error ,
17
+ is_sequence_field ,
18
18
)
19
19
from aws_lambda_powertools .event_handler .openapi .dependant import is_scalar_field
20
20
from aws_lambda_powertools .event_handler .openapi .encoders import jsonable_encoder
@@ -150,11 +150,10 @@ def _parse_form_data(self, app: EventHandlerInstance) -> dict[str, Any]:
150
150
"""Parse URL-encoded form data from the request body."""
151
151
try :
152
152
body = app .current_event .decoded_body or ""
153
- # parse_qs returns dict[str, list[str]], but we want dict[str, str] for single values
153
+ # NOTE: Keep values as lists; we'll normalize per-field later based on the expected type.
154
+ # This avoids breaking List[...] fields when only a single value is provided.
154
155
parsed = parse_qs (body , keep_blank_values = True )
155
-
156
- result : dict [str , Any ] = {key : values [0 ] if len (values ) == 1 else values for key , values in parsed .items ()}
157
- return result
156
+ return parsed
158
157
159
158
except Exception as e : # pragma: no cover
160
159
raise RequestValidationError ( # pragma: no cover
@@ -314,12 +313,12 @@ def _prepare_response_content(
314
313
def _request_params_to_args (
315
314
required_params : Sequence [ModelField ],
316
315
received_params : Mapping [str , Any ],
317
- ) -> tuple [dict [str , Any ], list [Any ]]:
316
+ ) -> tuple [dict [str , Any ], list [dict [ str , Any ] ]]:
318
317
"""
319
318
Convert the request params to a dictionary of values using validation, and returns a list of errors.
320
319
"""
321
- values = {}
322
- errors = []
320
+ values : dict [ str , Any ] = {}
321
+ errors : list [ dict [ str , Any ]] = []
323
322
324
323
for field in required_params :
325
324
field_info = field .field_info
@@ -328,16 +327,12 @@ def _request_params_to_args(
328
327
if not isinstance (field_info , Param ):
329
328
raise AssertionError (f"Expected Param field_info, got { field_info } " )
330
329
331
- value = received_params .get (field .alias )
332
-
333
330
loc = (field_info .in_ .value , field .alias )
331
+ value = received_params .get (field .alias )
334
332
335
333
# If we don't have a value, see if it's required or has a default
336
334
if value is None :
337
- if field .required :
338
- errors .append (get_missing_field_error (loc = loc ))
339
- else :
340
- values [field .name ] = deepcopy (field .default )
335
+ _handle_missing_field_value (field , values , errors , loc )
341
336
continue
342
337
343
338
# Finally, validate the value
@@ -363,39 +358,64 @@ def _request_body_to_args(
363
358
)
364
359
365
360
for field in required_params :
366
- # This sets the location to:
367
- # { "user": { object } } if field.alias == user
368
- # { { object } if field_alias is omitted
369
- loc : tuple [str , ...] = ("body" , field .alias )
370
- if field_alias_omitted :
371
- loc = ("body" ,)
361
+ loc = _get_body_field_location (field , field_alias_omitted )
362
+ value = _extract_field_value_from_body (field , received_body , loc , errors )
372
363
373
- value : Any | None = None
374
-
375
- # Now that we know what to look for, try to get the value from the received body
376
- if received_body is not None :
377
- try :
378
- value = received_body .get (field .alias )
379
- except AttributeError :
380
- errors .append (get_missing_field_error (loc ))
381
- continue
382
-
383
- # Determine if the field is required
364
+ # If we don't have a value, see if it's required or has a default
384
365
if value is None :
385
- if field .required :
386
- errors .append (get_missing_field_error (loc ))
387
- else :
388
- values [field .name ] = deepcopy (field .default )
366
+ _handle_missing_field_value (field , values , errors , loc )
389
367
continue
390
368
391
- # MAINTENANCE: Handle byte and file fields
392
-
393
- # Finally, validate the value
369
+ value = _normalize_field_value (field , value )
394
370
values [field .name ] = _validate_field (field = field , value = value , loc = loc , existing_errors = errors )
395
371
396
372
return values , errors
397
373
398
374
375
+ def _get_body_field_location (field : ModelField , field_alias_omitted : bool ) -> tuple [str , ...]:
376
+ """Get the location tuple for a body field based on whether the field alias is omitted."""
377
+ if field_alias_omitted :
378
+ return ("body" ,)
379
+ return ("body" , field .alias )
380
+
381
+
382
+ def _extract_field_value_from_body (
383
+ field : ModelField ,
384
+ received_body : dict [str , Any ] | None ,
385
+ loc : tuple [str , ...],
386
+ errors : list [dict [str , Any ]],
387
+ ) -> Any | None :
388
+ """Extract field value from the received body, handling potential AttributeError."""
389
+ if received_body is None :
390
+ return None
391
+
392
+ try :
393
+ return received_body .get (field .alias )
394
+ except AttributeError :
395
+ errors .append (get_missing_field_error (loc ))
396
+ return None
397
+
398
+
399
+ def _handle_missing_field_value (
400
+ field : ModelField ,
401
+ values : dict [str , Any ],
402
+ errors : list [dict [str , Any ]],
403
+ loc : tuple [str , ...],
404
+ ) -> None :
405
+ """Handle the case when a field value is missing."""
406
+ if field .required :
407
+ errors .append (get_missing_field_error (loc ))
408
+ else :
409
+ values [field .name ] = field .get_default ()
410
+
411
+
412
+ def _normalize_field_value (field : ModelField , value : Any ) -> Any :
413
+ """Normalize field value, converting lists to single values for non-sequence fields."""
414
+ if isinstance (value , list ) and not is_sequence_field (field ):
415
+ return value [0 ]
416
+ return value
417
+
418
+
399
419
def _validate_field (
400
420
* ,
401
421
field : ModelField ,
0 commit comments