diff --git a/benefits/core/admin/enrollment.py b/benefits/core/admin/enrollment.py index 309cdce994..8f189e2ecb 100644 --- a/benefits/core/admin/enrollment.py +++ b/benefits/core/admin/enrollment.py @@ -46,6 +46,23 @@ def has_view_permission(self, request: HttpRequest, obj=None): return False +@admin.register(models.EligibilityApiVerificationRequest) +class EligibilityApiVerificationRequestAdmin(admin.ModelAdmin): + list_display = ("label", "api_url") + + def has_view_permission(self, request: HttpRequest, obj=None): + if request.user and request.user.is_superuser: + return True + else: + return False + + def has_add_permission(self, request: HttpRequest, obj=None): + if request.user and request.user.is_superuser: + return True + else: + return False + + class EnrollmentFlowForm(forms.ModelForm): def has_field(self, field_name): return self.fields.get(field_name) is not None @@ -81,24 +98,10 @@ def clean(self): if transit_agency: # these fields might not be on the form, so use helper method to correctly get the value - eligibility_api_url = self.get(cleaned_data, "eligibility_api_url") - - if eligibility_api_url: - message = "Required for Eligibility API verification." - needed = dict( - eligibility_api_auth_header=self.get(cleaned_data, "eligibility_api_auth_header"), - eligibility_api_auth_key_secret_name=self.get(cleaned_data, "eligibility_api_auth_key_secret_name"), - eligibility_api_jwe_cek_enc=self.get(cleaned_data, "eligibility_api_jwe_cek_enc"), - eligibility_api_jwe_encryption_alg=self.get(cleaned_data, "eligibility_api_jwe_encryption_alg"), - eligibility_api_jws_signing_alg=self.get(cleaned_data, "eligibility_api_jws_signing_alg"), - eligibility_api_public_key=self.get(cleaned_data, "eligibility_api_public_key"), - ) - for k, v in needed.items(): - if self.has_field(k) and not v: - field_errors.update({k: ValidationError(f"{message}.")}) - elif not v: - non_field_errors.append(ValidationError(f"{message}: {k}")) - elif not cleaned_data.get("claims_request"): + eligibility_api_request = self.get(cleaned_data, "api_request") + claims_request = self.get(cleaned_data, "claims_request") + + if not (claims_request or eligibility_api_request): message = ( "Must configure either claims verification or Eligibility API verification before" + " adding to a transit agency." @@ -116,30 +119,12 @@ class SortableEnrollmentFlowAdmin(SortableAdminMixin, admin.ModelAdmin): list_display = ("label", "transit_agency", "supported_enrollment_methods") form = EnrollmentFlowForm - def get_exclude(self, request, obj=None): - fields = [] - - if not request.user.is_superuser: - fields.extend( - [ - "eligibility_api_auth_header", - "eligibility_api_auth_key_secret_name", - "eligibility_api_public_key", - "eligibility_api_jwe_cek_enc", - "eligibility_api_jwe_encryption_alg", - "eligibility_api_jws_signing_alg", - ] - ) - - return fields or super().get_exclude(request, obj) - def get_readonly_fields(self, request, obj=None): fields = [] if not request.user.is_superuser: fields.extend( [ - "eligibility_api_url", "selection_label_template_override", ] ) diff --git a/benefits/core/admin/transit.py b/benefits/core/admin/transit.py index b7e03220c5..01b8d157e5 100644 --- a/benefits/core/admin/transit.py +++ b/benefits/core/admin/transit.py @@ -6,6 +6,21 @@ from .users import is_staff_member_or_superuser +@admin.register(models.EligibilityApiConfig) +class EligibilityApiConfigAdmin(admin.ModelAdmin): + def has_add_permission(self, request): + if request.user and request.user.is_superuser: + return True + else: + return False + + def has_view_permission(self, request, *args, **kwargs): + if request.user and request.user.is_superuser: + return True + else: + return False + + @admin.register(models.TransitAgency) class TransitAgencyAdmin(admin.ModelAdmin): def get_exclude(self, request, obj=None): @@ -14,26 +29,12 @@ def get_exclude(self, request, obj=None): if not request.user.is_superuser: fields.extend( [ - "eligibility_api_private_key", - "eligibility_api_public_key", "sso_domain", ] ) return fields or super().get_exclude(request, obj) - def get_readonly_fields(self, request, obj=None): - fields = [] - - if not request.user.is_superuser: - fields.extend( - [ - "eligibility_api_id", - ] - ) - - return fields or super().get_readonly_fields(request, obj) - def has_add_permission(self, request): if settings.RUNTIME_ENVIRONMENT() != settings.RUNTIME_ENVS.PROD: return True diff --git a/benefits/core/migrations/0065_refactor_eligibilityapiverificationrequest.py b/benefits/core/migrations/0065_refactor_eligibilityapiverificationrequest.py new file mode 100644 index 0000000000..4a1178912c --- /dev/null +++ b/benefits/core/migrations/0065_refactor_eligibilityapiverificationrequest.py @@ -0,0 +1,205 @@ +# Generated by Django 5.2.7 on 2025-10-08 17:40 + +import benefits.core.models.common +import benefits.secrets +import django.db.models.deletion +from django.db import migrations, models + + +def migrate_agency_data(apps, schema_editor): + TransitAgency = apps.get_model("core", "TransitAgency") + EligibilityApiConfig = apps.get_model("core", "EligibilityApiConfig") + + agencies = ["cst", "mst", "sbmtd"] + + for agency in TransitAgency.objects.all(): + if agency.slug in agencies: + config = EligibilityApiConfig.objects.create( + api_id=agency.eligibility_api_id, + api_private_key=agency.eligibility_api_private_key, + api_public_key=agency.eligibility_api_public_key, + ) + config.save() + + agency.eligibility_api_config = config + agency.save() + + +def migrate_flow_data(apps, schema_editor): + EnrollmentFlow = apps.get_model("core", "EnrollmentFlow") + EligibilityApiVerificationRequest = apps.get_model("core", "EligibilityApiVerificationRequest") + + api_systems = ["agency_card", "courtesy_card", "mobility_pass"] + + for flow in EnrollmentFlow.objects.all(): + if flow.system_name in api_systems: + api_request = EligibilityApiVerificationRequest.objects.create( + label=flow.system_name, + api_url=flow.eligibility_api_url, + api_auth_header=flow.eligibility_api_auth_header, + api_auth_key_secret_name=flow.eligibility_api_auth_key_secret_name, + api_jwe_cek_enc=flow.eligibility_api_jwe_cek_enc, + api_jwe_encryption_alg=flow.eligibility_api_jwe_encryption_alg, + api_jws_signing_alg=flow.eligibility_api_jws_signing_alg, + api_public_key=flow.eligibility_api_public_key, + ) + api_request.save() + + flow.api_request = api_request + flow.save() + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0064_transitagency_supported_card_schemes"), + ] + + operations = [ + migrations.CreateModel( + name="EligibilityApiVerificationRequest", + fields=[ + ("id", models.AutoField(primary_key=True, serialize=False)), + ( + "label", + models.SlugField(help_text="A human readable label, used as the display text in Admin."), + ), + ( + "api_url", + models.URLField(help_text="Fully qualified URL for an Eligibility API server."), + ), + ( + "api_auth_header", + models.CharField(help_text="The auth header to send in Eligibility API requests.", max_length=50), + ), + ( + "api_auth_key_secret_name", + benefits.core.models.common.SecretNameField( + help_text="The name of a secret containing the value of the auth header to send in Eligibility API requests.", # noqa: E501 + max_length=127, + validators=[benefits.secrets.SecretNameValidator()], + ), + ), + ( + "api_jwe_cek_enc", + models.CharField( + help_text="The JWE-compatible Content Encryption Key (CEK) key-length and mode to use in Eligibility API requests.", # noqa: E501 + max_length=50, + ), + ), + ( + "api_jwe_encryption_alg", + models.CharField( + help_text="The JWE-compatible encryption algorithm to use in Eligibility API requests.", + max_length=50, + ), + ), + ( + "api_jws_signing_alg", + models.CharField( + help_text="The JWS-compatible signing algorithm to use in Eligibility API requests.", + max_length=50, + ), + ), + ( + "api_public_key", + models.ForeignKey( + help_text="The public key used to encrypt Eligibility API requests and to verify signed Eligibility API responses.", # noqa: E501 + on_delete=django.db.models.deletion.PROTECT, + related_name="+", + to="core.pemdata", + ), + ), + ], + ), + migrations.AddField( + model_name="enrollmentflow", + name="api_request", + field=models.ForeignKey( + blank=True, + help_text="The Eligibility API request details for this flow.", + null=True, + on_delete=django.db.models.deletion.PROTECT, + to="core.eligibilityapiverificationrequest", + ), + ), + migrations.RunPython(migrate_flow_data), + migrations.RemoveField( + model_name="enrollmentflow", + name="eligibility_api_auth_header", + ), + migrations.RemoveField( + model_name="enrollmentflow", + name="eligibility_api_auth_key_secret_name", + ), + migrations.RemoveField( + model_name="enrollmentflow", + name="eligibility_api_jwe_cek_enc", + ), + migrations.RemoveField( + model_name="enrollmentflow", + name="eligibility_api_jwe_encryption_alg", + ), + migrations.RemoveField( + model_name="enrollmentflow", + name="eligibility_api_jws_signing_alg", + ), + migrations.RemoveField( + model_name="enrollmentflow", + name="eligibility_api_public_key", + ), + migrations.RemoveField( + model_name="enrollmentflow", + name="eligibility_api_url", + ), + migrations.CreateModel( + name="EligibilityApiConfig", + fields=[ + ("id", models.AutoField(primary_key=True, serialize=False)), + ("api_id", models.SlugField(help_text="The identifier for this agency used in Eligibility API calls.")), + ( + "api_private_key", + models.ForeignKey( + help_text="Private key used to sign Eligibility API tokens created on behalf of this Agency.", + on_delete=django.db.models.deletion.PROTECT, + related_name="+", + to="core.pemdata", + ), + ), + ( + "api_public_key", + models.ForeignKey( + help_text="Public key corresponding to the agency's private key, used by Eligibility Verification servers to encrypt responses.", # noqa: E501 + on_delete=django.db.models.deletion.PROTECT, + related_name="+", + to="core.pemdata", + ), + ), + ], + ), + migrations.AddField( + model_name="transitagency", + name="eligibility_api_config", + field=models.ForeignKey( + blank=True, + default=None, + help_text="The Eligibility API configuration for this transit agency.", + null=True, + on_delete=django.db.models.deletion.PROTECT, + to="core.eligibilityapiconfig", + ), + ), + migrations.RunPython(migrate_agency_data), + migrations.RemoveField( + model_name="transitagency", + name="eligibility_api_id", + ), + migrations.RemoveField( + model_name="transitagency", + name="eligibility_api_private_key", + ), + migrations.RemoveField( + model_name="transitagency", + name="eligibility_api_public_key", + ), + ] diff --git a/benefits/core/migrations/local_fixtures.json b/benefits/core/migrations/local_fixtures.json index bd7e549242..cc3c3aaa10 100644 --- a/benefits/core/migrations/local_fixtures.json +++ b/benefits/core/migrations/local_fixtures.json @@ -142,6 +142,15 @@ "private_key": 6 } }, + { + "model": "core.eligibilityapiconfig", + "pk": 1, + "fields": { + "api_id": "cst", + "api_private_key": 2, + "api_public_key": 3 + } + }, { "model": "core.transitagency", "pk": 1, @@ -152,9 +161,7 @@ "long_name": "California State Transit (local)", "info_url": "https://www.agency-website.com", "phone": "1-800-555-5555", - "eligibility_api_id": "cst", - "eligibility_api_private_key": 2, - "eligibility_api_public_key": 3, + "eligibility_api_config": 1, "staff_group": 2, "customer_service_group": 2, "logo_large": "agencies/cst-lg.png", @@ -211,6 +218,20 @@ "pk": 5, "fields": { "group_id": "30027174-bc59-40bb-935c-3bd9b1c26a63" } }, + { + "model": "core.eligibilityapiverificationrequest", + "pk": 1, + "fields": { + "label": "agency_card", + "api_url": "http://server:8000/verify", + "api_auth_header": "X-Server-API-Key", + "api_auth_key_secret_name": "agency-card-flow-api-auth-key", + "api_public_key": 1, + "api_jwe_cek_enc": "A256CBC-HS512", + "api_jwe_encryption_alg": "RSA-OAEP", + "api_jws_signing_alg": "RS256" + } + }, { "model": "core.enrollmentflow", "pk": 1, @@ -247,14 +268,8 @@ "fields": { "system_name": "agency_card", "label": "Agency Cardholder", + "api_request": 1, "display_order": 4, - "eligibility_api_url": "http://server:8000/verify", - "eligibility_api_auth_header": "X-Server-API-Key", - "eligibility_api_auth_key_secret_name": "agency-card-flow-api-auth-key", - "eligibility_api_public_key": 1, - "eligibility_api_jwe_cek_enc": "A256CBC-HS512", - "eligibility_api_jwe_encryption_alg": "RSA-OAEP", - "eligibility_api_jws_signing_alg": "RS256", "supported_enrollment_methods": ["digital"], "transit_agency": 1 } diff --git a/benefits/core/models/__init__.py b/benefits/core/models/__init__.py index 497e2356b7..ac02fa4533 100644 --- a/benefits/core/models/__init__.py +++ b/benefits/core/models/__init__.py @@ -4,9 +4,10 @@ agency_logo_small, CardSchemes, TransitProcessorConfig, + EligibilityApiConfig, TransitAgency, ) -from .enrollment import EnrollmentMethods, EnrollmentFlow, EnrollmentGroup, EnrollmentEvent +from .enrollment import EnrollmentMethods, EligibilityApiVerificationRequest, EnrollmentFlow, EnrollmentGroup, EnrollmentEvent __all__ = [ "agency_logo_large", @@ -15,11 +16,13 @@ "CardSchemes", "Environment", "EnrollmentMethods", + "EligibilityApiVerificationRequest", "EnrollmentFlow", "EnrollmentGroup", "EnrollmentEvent", "PemData", "SecretNameField", + "EligibilityApiConfig", "TransitAgency", "TransitProcessorConfig", ] diff --git a/benefits/core/models/enrollment.py b/benefits/core/models/enrollment.py index 7fe869300b..4b671ac6c1 100644 --- a/benefits/core/models/enrollment.py +++ b/benefits/core/models/enrollment.py @@ -28,6 +28,55 @@ class EnrollmentMethods: ) +class EligibilityApiVerificationRequest(models.Model): + """Represents configuration for eligibility verification via Eligibility API calls.""" + + id = models.AutoField(primary_key=True) + label = models.SlugField( + help_text="A human readable label, used as the display text in Admin.", + ) + api_url = models.URLField(help_text="Fully qualified URL for an Eligibility API server.") + api_auth_header = models.CharField( + help_text="The auth header to send in Eligibility API requests.", + max_length=50, + ) + api_auth_key_secret_name = SecretNameField( + help_text="The name of a secret containing the value of the auth header to send in Eligibility API requests.", + ) + api_public_key = models.ForeignKey( + PemData, + related_name="+", + on_delete=models.PROTECT, + help_text="The public key used to encrypt Eligibility API requests and to verify signed Eligibility API responses.", + ) + api_jwe_cek_enc = models.CharField( + help_text="The JWE-compatible Content Encryption Key (CEK) key-length and mode to use in Eligibility API requests.", + max_length=50, + ) + api_jwe_encryption_alg = models.CharField( + help_text="The JWE-compatible encryption algorithm to use in Eligibility API requests.", + max_length=50, + ) + api_jws_signing_alg = models.CharField( + help_text="The JWS-compatible signing algorithm to use in Eligibility API requests.", + max_length=50, + ) + + def __str__(self): + return self.label + + @property + def api_auth_key(self): + """The Eligibility API auth key as a string.""" + secret_field = self._meta.get_field("api_auth_key_secret_name") + return secret_field.secret_value(self) + + @property + def api_public_key_data(self): + """The Eligibility API public key as a string.""" + return self.api_public_key.data + + class EnrollmentFlow(models.Model): """Represents a user journey through the Benefits app for a single eligibility type.""" @@ -65,41 +114,12 @@ class EnrollmentFlow(models.Model): blank=True, help_text="The claims request details for this flow.", ) - eligibility_api_url = models.TextField( - blank=True, default="", help_text="Fully qualified URL for an Eligibility API server used by this flow." - ) - eligibility_api_auth_header = models.TextField( - blank=True, - default="", - help_text="The auth header to send in Eligibility API requests for this flow.", - ) - eligibility_api_auth_key_secret_name = SecretNameField( - blank=True, - default="", - help_text="The name of a secret containing the value of the auth header to send in Eligibility API requests for this flow.", # noqa: 501 - ) - eligibility_api_public_key = models.ForeignKey( - PemData, - related_name="+", + api_request = models.ForeignKey( + EligibilityApiVerificationRequest, on_delete=models.PROTECT, null=True, blank=True, - help_text="The public key used to encrypt Eligibility API requests and to verify signed Eligibility API responses for this flow.", # noqa: E501 - ) - eligibility_api_jwe_cek_enc = models.TextField( - blank=True, - default="", - help_text="The JWE-compatible Content Encryption Key (CEK) key-length and mode to use in Eligibility API requests for this flow.", # noqa: E501 - ) - eligibility_api_jwe_encryption_alg = models.TextField( - blank=True, - default="", - help_text="The JWE-compatible encryption algorithm to use in Eligibility API requests for this flow.", - ) - eligibility_api_jws_signing_alg = models.TextField( - blank=True, - default="", - help_text="The JWS-compatible signing algorithm to use in Eligibility API requests for this flow.", + help_text="The Eligibility API request details for this flow.", ) selection_label_template_override = models.TextField( blank=True, @@ -150,16 +170,18 @@ def agency_card_name(self): @property def eligibility_api_auth_key(self): - if self.eligibility_api_auth_key_secret_name is not None: - secret_field = self._meta.get_field("eligibility_api_auth_key_secret_name") - return secret_field.secret_value(self) + if self.uses_api_verification: + return self.api_request.api_auth_key else: return None @property def eligibility_api_public_key_data(self): """This flow's Eligibility API public key as a string.""" - return self.eligibility_api_public_key.data + if self.uses_api_verification: + return self.api_request.api_public_key_data + else: + return None @property def selection_label_template(self): @@ -188,11 +210,14 @@ def uses_claims_verification(self): @property def uses_api_verification(self): """True if this flow verifies via the Eligibility API. False otherwise.""" - return bool(self.eligibility_api_url) + return self.api_request is not None @property def claims_scheme(self): - return self.claims_request.scheme or self.oauth_config.scheme + if self.uses_claims_verification: + return self.claims_request.scheme or self.oauth_config.scheme + else: + return None @property def eligibility_verifier(self): @@ -202,8 +227,10 @@ def eligibility_verifier(self): """ if self.uses_claims_verification: return self.oauth_config.client_name + elif self.uses_api_verification: + return self.api_request.api_url else: - return self.eligibility_api_url + return "undefined" @property def enrollment_index_context(self): diff --git a/benefits/core/models/transit.py b/benefits/core/models/transit.py index 461c9dd344..7a802ada1e 100644 --- a/benefits/core/models/transit.py +++ b/benefits/core/models/transit.py @@ -69,6 +69,30 @@ def __str__(self): return f"({environment_label}) {agency_slug}" +class EligibilityApiConfig(models.Model): + """Per-agency configuration for Eligibility Server integrations via the Eligibility API.""" + + id = models.AutoField(primary_key=True) + api_id = models.SlugField( + help_text="The identifier for this agency used in Eligibility API calls.", + ) + api_private_key = models.ForeignKey( + PemData, + related_name="+", + on_delete=models.PROTECT, + help_text="Private key used to sign Eligibility API tokens created on behalf of this Agency.", + ) + api_public_key = models.ForeignKey( + PemData, + related_name="+", + on_delete=models.PROTECT, + help_text="Public key corresponding to the agency's private key, used by Eligibility Verification servers to encrypt responses.", # noqa: E501 + ) + + def __str__(self): + return self.api_id + + class TransitAgency(models.Model): """An agency offering transit service.""" @@ -102,28 +126,13 @@ class Meta: default=[CardSchemes.VISA, CardSchemes.MASTERCARD], help_text="The contactless card schemes this agency supports.", ) - eligibility_api_id = models.TextField( - help_text="The identifier for this agency used in Eligibility API calls.", - blank=True, - default="", - ) - eligibility_api_private_key = models.ForeignKey( - PemData, - related_name="+", + eligibility_api_config = models.ForeignKey( + EligibilityApiConfig, on_delete=models.PROTECT, - help_text="Private key used to sign Eligibility API tokens created on behalf of this Agency.", - null=True, - blank=True, - default=None, - ) - eligibility_api_public_key = models.ForeignKey( - PemData, - related_name="+", - on_delete=models.PROTECT, - help_text="Public key corresponding to the agency's private key, used by Eligibility Verification servers to encrypt responses.", # noqa: E501 null=True, blank=True, default=None, + help_text="The Eligibility API configuration for this transit agency.", ) staff_group = models.OneToOneField( Group, @@ -181,12 +190,16 @@ def eligibility_index_url(self): @property def eligibility_api_private_key_data(self): """This Agency's private key as a string.""" - return self.eligibility_api_private_key.data + if self.eligibility_api_config: + return self.eligibility_api_config.api_private_key.data + return None @property def eligibility_api_public_key_data(self): """This Agency's public key as a string.""" - return self.eligibility_api_public_key.data + if self.eligibility_api_config: + return self.eligibility_api_config.api_public_key.data + return None @property def littlepay_config(self): diff --git a/benefits/core/views.py b/benefits/core/views.py index d1b43297dd..295650a591 100644 --- a/benefits/core/views.py +++ b/benefits/core/views.py @@ -62,7 +62,7 @@ def agency_card(request, agency: models.TransitAgency): session.reset(request) session.update(request, agency=agency, origin=agency.index_url) - eligibility_api_flow = agency.enrollment_flows.exclude(eligibility_api_url="").order_by("id").last() + eligibility_api_flow = agency.enrollment_flows.exclude(api_request=None).order_by("id").last() if eligibility_api_flow: session.update(request, flow=eligibility_api_flow) diff --git a/benefits/eligibility/verify.py b/benefits/eligibility/verify.py index 6840f8f5b0..5f10227414 100644 --- a/benefits/eligibility/verify.py +++ b/benefits/eligibility/verify.py @@ -9,15 +9,15 @@ def eligibility_from_api(flow: models.EnrollmentFlow, form, agency: models.Trans sub, name = form.cleaned_data.get("sub"), form.cleaned_data.get("name") client = Client( - verify_url=flow.eligibility_api_url, - headers={flow.eligibility_api_auth_header: flow.eligibility_api_auth_key}, + verify_url=flow.api_request.api_url, + headers={flow.api_request.api_auth_header: flow.api_request.api_auth_key}, issuer=settings.ALLOWED_HOSTS[0], - agency=agency.eligibility_api_id, - jws_signing_alg=flow.eligibility_api_jws_signing_alg, + agency=agency.eligibility_api_config.api_id, + jws_signing_alg=flow.api_request.api_jws_signing_alg, client_private_key=agency.eligibility_api_private_key_data, - jwe_encryption_alg=flow.eligibility_api_jwe_encryption_alg, - jwe_cek_enc=flow.eligibility_api_jwe_cek_enc, - server_public_key=flow.eligibility_api_public_key_data, + jwe_encryption_alg=flow.api_request.api_jwe_encryption_alg, + jwe_cek_enc=flow.api_request.api_jwe_cek_enc, + server_public_key=flow.api_request.api_public_key_data, timeout=settings.REQUESTS_TIMEOUT, ) diff --git a/benefits/oauth/middleware.py b/benefits/oauth/middleware.py index 3277486a64..1fb3bd905a 100644 --- a/benefits/oauth/middleware.py +++ b/benefits/oauth/middleware.py @@ -27,7 +27,7 @@ def process_request(self, request): if flow.uses_claims_verification: # all good, the chosen flow is configured correctly return None - elif not (flow.eligibility_api_url): + elif not (flow.uses_api_verification): # the chosen flow doesn't have Eligibility API config OR claims provider config # this is likely a misconfiguration on the backend, not a user error message = f"Flow with no API or claims config: {flow.system_name} (id={flow.id})" diff --git a/tests/pytest/conftest.py b/tests/pytest/conftest.py index ebba3a419a..d81e3076ba 100644 --- a/tests/pytest/conftest.py +++ b/tests/pytest/conftest.py @@ -11,11 +11,13 @@ from benefits.core import session from benefits.core.models import ( + EligibilityApiVerificationRequest, EnrollmentFlow, PemData, TransitAgency, Environment, ) +from benefits.core.models.transit import EligibilityApiConfig from benefits.enrollment_littlepay.models import LittlepayConfig, LittlepayGroup from benefits.enrollment_switchio.models import SwitchioConfig, SwitchioGroup @@ -102,6 +104,22 @@ def model_ClaimsVerificationRequest(): return claims_verification_request +@pytest.fixture +def model_EligibilityApiVerificationRequest(model_PemData): + api_request = EligibilityApiVerificationRequest.objects.create( + label="agency_card", + api_auth_header="X-API-AUTH", + api_auth_key_secret_name="secret-key", + api_jwe_cek_enc="cek-enc", + api_jwe_encryption_alg="alg", + api_jws_signing_alg="alg", + api_public_key=model_PemData, + api_url="https://example.com/verify", + ) + + return api_request + + @pytest.fixture def model_EnrollmentFlow(model_TransitAgency): flow = EnrollmentFlow.objects.create( @@ -131,15 +149,9 @@ def model_SwitchioGroup(model_EnrollmentFlow): @pytest.fixture -def model_EnrollmentFlow_with_eligibility_api(model_EnrollmentFlow, model_PemData): +def model_EnrollmentFlow_with_eligibility_api(model_EnrollmentFlow, model_EligibilityApiVerificationRequest): model_EnrollmentFlow.system_name = "agency_card" - model_EnrollmentFlow.eligibility_api_auth_header = "X-API-AUTH" - model_EnrollmentFlow.eligibility_api_auth_key_secret_name = "secret-key" - model_EnrollmentFlow.eligibility_api_jwe_cek_enc = "cek-enc" - model_EnrollmentFlow.eligibility_api_jwe_encryption_alg = "alg" - model_EnrollmentFlow.eligibility_api_jws_signing_alg = "alg" - model_EnrollmentFlow.eligibility_api_public_key = model_PemData - model_EnrollmentFlow.eligibility_api_url = "https://example.com/verify" + model_EnrollmentFlow.api_request = model_EligibilityApiVerificationRequest model_EnrollmentFlow.save() return model_EnrollmentFlow @@ -222,7 +234,14 @@ def model_SwitchioConfig(model_PemData, model_TransitAgency): @pytest.fixture -def model_TransitAgency(model_PemData): +def model_EligibilityApiConfig(model_PemData): + config = EligibilityApiConfig.objects.create(api_id="test123", api_private_key=model_PemData, api_public_key=model_PemData) + + return config + + +@pytest.fixture +def model_TransitAgency(model_EligibilityApiConfig): agency = TransitAgency.objects.create( slug="cst", short_name="TEST", @@ -230,9 +249,7 @@ def model_TransitAgency(model_PemData): info_url="https://example.com/test-agency", phone="800-555-5555", active=True, - eligibility_api_id="test123", - eligibility_api_private_key=model_PemData, - eligibility_api_public_key=model_PemData, + eligibility_api_config=model_EligibilityApiConfig, logo_large="agencies/cst-lg.png", logo_small="agencies/cst-sm.png", ) diff --git a/tests/pytest/core/admin/test_enrollment.py b/tests/pytest/core/admin/test_enrollment.py index c6768c4471..51474915e3 100644 --- a/tests/pytest/core/admin/test_enrollment.py +++ b/tests/pytest/core/admin/test_enrollment.py @@ -90,38 +90,6 @@ class TestEnrollmentFlowAdmin: ( "staff", [ - "eligibility_api_auth_header", - "eligibility_api_auth_key_secret_name", - "eligibility_api_public_key", - "eligibility_api_jwe_cek_enc", - "eligibility_api_jwe_encryption_alg", - "eligibility_api_jws_signing_alg", - ], - ), - ("super", None), - ], - ) - def test_get_exclude(self, admin_user_request, flow_admin_model, user_type, expected): - if expected: - model_fields = [f.name for f in flow_admin_model.model._meta.get_fields()] - assert all(field in model_fields for field in expected) - - request = admin_user_request(user_type) - - excluded = flow_admin_model.get_exclude(request) - - if expected: - assert set(excluded) == set(expected) - else: - assert excluded is None - - @pytest.mark.parametrize( - "user_type,expected", - [ - ( - "staff", - [ - "eligibility_api_url", "selection_label_template_override", ], ), @@ -209,11 +177,8 @@ def test_EnrollmentFlowForm_staff_member_with_transit_agency( ) assert not form.is_valid() - def test_EnrollmentFlowForm_clean_eligibility_api_verification( - self, - admin_user_request, - flow_admin_model, - model_TransitAgency, + def test_EnrollmentFlowForm_clean_no_request_config( + self, admin_user_request, flow_admin_model, model_TransitAgency, model_EligibilityApiVerificationRequest ): model_TransitAgency.slug = "cst" # use value that will map to existing templates model_TransitAgency.save() @@ -221,39 +186,31 @@ def test_EnrollmentFlowForm_clean_eligibility_api_verification( request = admin_user_request("super") # fill out the form without a transit agency - request.POST = dict( + post_data = dict( system_name="senior", # use value that will map to existing templates supported_enrollment_methods=[models.EnrollmentMethods.DIGITAL, models.EnrollmentMethods.IN_PERSON], - eligibility_api_url="http://server:8000/verify", - eligibility_api_auth_header="", - eligibility_api_auth_key_secret_name="", - eligibility_api_jwe_cek_enc="", - eligibility_api_jwe_encryption_alg="", - eligibility_api_jws_signing_alg="", - eligibility_api_public_key=None, ) + request.POST = post_data form_class = flow_admin_model.get_form(request) - form = form_class(request.POST) # clean is OK assert not form.errors assert form.is_valid() - # reassign agency - request.POST.update(dict(transit_agency=model_TransitAgency.id)) + # assign agency + post_data.update(dict(transit_agency=model_TransitAgency.id)) + request.POST = post_data form = form_class(request.POST) assert not form.is_valid() error_dict = form.errors - assert "eligibility_api_auth_header" in error_dict - assert "eligibility_api_auth_key_secret_name" in error_dict - assert "eligibility_api_jwe_cek_enc" in error_dict - assert "eligibility_api_jwe_encryption_alg" in error_dict - assert "eligibility_api_jws_signing_alg" in error_dict - assert "eligibility_api_public_key" in error_dict + assert ( + "Must configure either claims verification or Eligibility API verification before adding to a transit agency." + in error_dict["__all__"] + ) def test_EnrollmentFlowForm_clean_supports_expiration_superuser( self, diff --git a/tests/pytest/core/admin/test_transit.py b/tests/pytest/core/admin/test_transit.py index 79548ee473..05949f4bff 100644 --- a/tests/pytest/core/admin/test_transit.py +++ b/tests/pytest/core/admin/test_transit.py @@ -3,16 +3,35 @@ from django.contrib import admin from benefits.core import models -from benefits.core.admin.transit import TransitAgencyAdmin +from benefits.core.admin.transit import EligibilityApiConfigAdmin, TransitAgencyAdmin -@pytest.fixture -def agency_admin_model(): - return TransitAgencyAdmin(models.TransitAgency, admin.site) +@pytest.mark.django_db +class TestEligibilityApiConfigAdmin: + @pytest.fixture(autouse=True) + def init(self): + self.model = EligibilityApiConfigAdmin(models.EligibilityApiConfig, admin.site) + + @pytest.mark.parametrize("user_type,expected", [("staff", False), ("super", True)]) + def test_has_add_permissions(self, admin_user_request, user_type, expected): + request = admin_user_request(user_type) + + add_permissions = self.model.has_add_permission(request) + assert add_permissions == expected + + @pytest.mark.parametrize("user_type,expected", [("staff", False), ("super", True)]) + def test_has_view_permissions(self, admin_user_request, user_type, expected): + request = admin_user_request(user_type) + + add_permissions = self.model.has_view_permission(request) + assert add_permissions == expected @pytest.mark.django_db class TestTransitAgencyAdmin: + @pytest.fixture(autouse=True) + def init(self): + self.model = TransitAgencyAdmin(models.TransitAgency, admin.site) @pytest.mark.parametrize( "user_type,expected", @@ -20,49 +39,26 @@ class TestTransitAgencyAdmin: ( "staff", [ - "eligibility_api_private_key", - "eligibility_api_public_key", "sso_domain", ], ), ("super", ()), ], ) - def test_get_exclude(self, admin_user_request, agency_admin_model, user_type, expected): + def test_get_exclude(self, admin_user_request, user_type, expected): if expected: - model_fields = [f.name for f in agency_admin_model.model._meta.get_fields()] + model_fields = [f.name for f in self.model.model._meta.get_fields()] assert all(field in model_fields for field in expected) request = admin_user_request(user_type) - excluded = agency_admin_model.get_exclude(request) + excluded = self.model.get_exclude(request) if expected: assert set(excluded) == set(expected) else: assert excluded is None - @pytest.mark.parametrize( - "user_type,expected", - [ - ( - "staff", - ["eligibility_api_id"], - ), - ("super", ()), - ], - ) - def test_get_readonly_fields(self, admin_user_request, agency_admin_model, user_type, expected): - if expected: - model_fields = [f.name for f in agency_admin_model.model._meta.get_fields()] - assert all(field in model_fields for field in expected) - - request = admin_user_request(user_type) - - readonly = agency_admin_model.get_readonly_fields(request) - - assert set(readonly) == set(expected) - @pytest.mark.parametrize( "runtime_env,user_type,expected", [ @@ -72,9 +68,9 @@ def test_get_readonly_fields(self, admin_user_request, agency_admin_model, user_ (settings.RUNTIME_ENVS.DEV, "super", True), ], ) - def test_has_add_permission(self, admin_user_request, agency_admin_model, settings, runtime_env, user_type, expected): + def test_has_add_permission(self, admin_user_request, settings, runtime_env, user_type, expected): settings.RUNTIME_ENVIRONMENT = lambda: runtime_env request = admin_user_request(user_type) - assert agency_admin_model.has_add_permission(request) == expected + assert self.model.has_add_permission(request) == expected diff --git a/tests/pytest/core/models/test_enrollment.py b/tests/pytest/core/models/test_enrollment.py index ae21cea985..d0a042e5a3 100644 --- a/tests/pytest/core/models/test_enrollment.py +++ b/tests/pytest/core/models/test_enrollment.py @@ -5,7 +5,23 @@ import pytest -from benefits.core.models import EnrollmentFlow, EnrollmentEvent, EnrollmentMethods +from benefits.core.models import EligibilityApiVerificationRequest, EnrollmentFlow, EnrollmentEvent, EnrollmentMethods + + +@pytest.mark.django_db +class TestEligibilityApiVerificationRequest: + @pytest.fixture(autouse=True) + def init(self, model_EligibilityApiVerificationRequest: EligibilityApiVerificationRequest): + self.model = model_EligibilityApiVerificationRequest + + def test_api_auth_key(self, mock_field_secret_value): + mock_field = mock_field_secret_value(self.model, "api_auth_key_secret_name") + + assert self.model.api_auth_key == mock_field.secret_value.return_value + mock_field.secret_value.assert_called_once_with(self.model) + + def test_api_public_key_data(self, model_PemData): + assert self.model.api_public_key_data == model_PemData.data @pytest.mark.django_db @@ -77,11 +93,19 @@ def test_EnrollmentFlow_no_scope_and_claim_no_sign_out(model_EnrollmentFlow): @pytest.mark.django_db -def test_EnrollmentFlow_eligibility_api_auth_key(mock_field_secret_value, model_EnrollmentFlow_with_eligibility_api): - mock_field = mock_field_secret_value(model_EnrollmentFlow_with_eligibility_api, "eligibility_api_auth_key_secret_name") +def test_EnrollmentFlow_eligibility_api_auth_key(model_EnrollmentFlow_with_eligibility_api): + assert ( + model_EnrollmentFlow_with_eligibility_api.eligibility_api_auth_key + == model_EnrollmentFlow_with_eligibility_api.api_request.api_auth_key + ) + - assert model_EnrollmentFlow_with_eligibility_api.eligibility_api_auth_key == mock_field.secret_value.return_value - mock_field.secret_value.assert_called_once_with(model_EnrollmentFlow_with_eligibility_api) +@pytest.mark.django_db +def test_EnrollmentFlow_eligibility_api_public_key_data(model_EnrollmentFlow_with_eligibility_api): + assert ( + model_EnrollmentFlow_with_eligibility_api.eligibility_api_public_key_data + == model_EnrollmentFlow_with_eligibility_api.api_request.api_public_key_data + ) @pytest.mark.django_db diff --git a/tests/pytest/oauth/test_middleware.py b/tests/pytest/oauth/test_middleware.py index e9d152b161..380f52b5d7 100644 --- a/tests/pytest/oauth/test_middleware.py +++ b/tests/pytest/oauth/test_middleware.py @@ -44,7 +44,6 @@ def test_flow_using_claims_verification_required__no_identitygatewayconfig(app_r @pytest.mark.django_db -@pytest.mark.parametrize("api_url", [None, ""]) def test_flow_using_claims_verification_required__misconfigured_flow( app_request, mocked_view, @@ -52,10 +51,9 @@ def test_flow_using_claims_verification_required__misconfigured_flow( mocked_session_flow_does_not_use_claims_verification, mocked_analytics_module, mocked_sentry_sdk_module, - api_url, ): # fake a misconfigured flow - mocked_session_flow_does_not_use_claims_verification.return_value.eligibility_api_url = api_url + mocked_session_flow_does_not_use_claims_verification.return_value.api_request = None response = decorated_view(app_request)