Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 21 additions & 36 deletions benefits/core/admin/enrollment.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ def has_view_permission(self, request: HttpRequest, obj=None):
return False


@admin.register(models.EligibilityApiVerificationRequest)
class EligibilityApiVerificationRequestAdmin(admin.ModelAdmin):
list_display = ("label", "api_url")

def has_view_permission(self, request: HttpRequest, obj=None):
if request.user and request.user.is_superuser:
return True
else:
return False

def has_add_permission(self, request: HttpRequest, obj=None):
if request.user and request.user.is_superuser:
return True
else:
return False


class EnrollmentFlowForm(forms.ModelForm):
def has_field(self, field_name):
return self.fields.get(field_name) is not None
Expand Down Expand Up @@ -81,24 +98,10 @@ def clean(self):

if transit_agency:
# these fields might not be on the form, so use helper method to correctly get the value
eligibility_api_url = self.get(cleaned_data, "eligibility_api_url")

if eligibility_api_url:
message = "Required for Eligibility API verification."
needed = dict(
eligibility_api_auth_header=self.get(cleaned_data, "eligibility_api_auth_header"),
eligibility_api_auth_key_secret_name=self.get(cleaned_data, "eligibility_api_auth_key_secret_name"),
eligibility_api_jwe_cek_enc=self.get(cleaned_data, "eligibility_api_jwe_cek_enc"),
eligibility_api_jwe_encryption_alg=self.get(cleaned_data, "eligibility_api_jwe_encryption_alg"),
eligibility_api_jws_signing_alg=self.get(cleaned_data, "eligibility_api_jws_signing_alg"),
eligibility_api_public_key=self.get(cleaned_data, "eligibility_api_public_key"),
)
for k, v in needed.items():
if self.has_field(k) and not v:
field_errors.update({k: ValidationError(f"{message}.")})
elif not v:
non_field_errors.append(ValidationError(f"{message}: {k}"))
elif not cleaned_data.get("claims_request"):
eligibility_api_request = self.get(cleaned_data, "api_request")
claims_request = self.get(cleaned_data, "claims_request")

if not (claims_request or eligibility_api_request):
message = (
"Must configure either claims verification or Eligibility API verification before"
+ " adding to a transit agency."
Expand All @@ -116,30 +119,12 @@ class SortableEnrollmentFlowAdmin(SortableAdminMixin, admin.ModelAdmin):
list_display = ("label", "transit_agency", "supported_enrollment_methods")
form = EnrollmentFlowForm

def get_exclude(self, request, obj=None):
fields = []

if not request.user.is_superuser:
fields.extend(
[
"eligibility_api_auth_header",
"eligibility_api_auth_key_secret_name",
"eligibility_api_public_key",
"eligibility_api_jwe_cek_enc",
"eligibility_api_jwe_encryption_alg",
"eligibility_api_jws_signing_alg",
]
)

return fields or super().get_exclude(request, obj)

def get_readonly_fields(self, request, obj=None):
fields = []

if not request.user.is_superuser:
fields.extend(
[
"eligibility_api_url",
"selection_label_template_override",
]
)
Expand Down
29 changes: 15 additions & 14 deletions benefits/core/admin/transit.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@
from .users import is_staff_member_or_superuser


@admin.register(models.EligibilityApiConfig)
class EligibilityApiConfigAdmin(admin.ModelAdmin):
def has_add_permission(self, request):
if request.user and request.user.is_superuser:
return True
else:
return False

def has_view_permission(self, request, *args, **kwargs):
if request.user and request.user.is_superuser:
return True
else:
return False


@admin.register(models.TransitAgency)
class TransitAgencyAdmin(admin.ModelAdmin):
def get_exclude(self, request, obj=None):
Expand All @@ -14,26 +29,12 @@ def get_exclude(self, request, obj=None):
if not request.user.is_superuser:
fields.extend(
[
"eligibility_api_private_key",
"eligibility_api_public_key",
"sso_domain",
]
)

return fields or super().get_exclude(request, obj)

def get_readonly_fields(self, request, obj=None):
fields = []

if not request.user.is_superuser:
fields.extend(
[
"eligibility_api_id",
]
)

return fields or super().get_readonly_fields(request, obj)

def has_add_permission(self, request):
if settings.RUNTIME_ENVIRONMENT() != settings.RUNTIME_ENVS.PROD:
return True
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# Generated by Django 5.2.7 on 2025-10-08 17:40

import benefits.core.models.common
import benefits.secrets
import django.db.models.deletion
from django.db import migrations, models


def migrate_agency_data(apps, schema_editor):
TransitAgency = apps.get_model("core", "TransitAgency")
EligibilityApiConfig = apps.get_model("core", "EligibilityApiConfig")

agencies = ["cst", "mst", "sbmtd"]

for agency in TransitAgency.objects.all():
if agency.slug in agencies:
config = EligibilityApiConfig.objects.create(
api_id=agency.eligibility_api_id,
api_private_key=agency.eligibility_api_private_key,
api_public_key=agency.eligibility_api_public_key,
)
config.save()

agency.eligibility_api_config = config
agency.save()


def migrate_flow_data(apps, schema_editor):
EnrollmentFlow = apps.get_model("core", "EnrollmentFlow")
EligibilityApiVerificationRequest = apps.get_model("core", "EligibilityApiVerificationRequest")

api_systems = ["agency_card", "courtesy_card", "mobility_pass"]

for flow in EnrollmentFlow.objects.all():
if flow.system_name in api_systems:
api_request = EligibilityApiVerificationRequest.objects.create(
label=flow.system_name,
api_url=flow.eligibility_api_url,
api_auth_header=flow.eligibility_api_auth_header,
api_auth_key_secret_name=flow.eligibility_api_auth_key_secret_name,
api_jwe_cek_enc=flow.eligibility_api_jwe_cek_enc,
api_jwe_encryption_alg=flow.eligibility_api_jwe_encryption_alg,
api_jws_signing_alg=flow.eligibility_api_jws_signing_alg,
api_public_key=flow.eligibility_api_public_key,
)
api_request.save()

flow.api_request = api_request
flow.save()


class Migration(migrations.Migration):

dependencies = [
("core", "0064_transitagency_supported_card_schemes"),
]

operations = [
migrations.CreateModel(
name="EligibilityApiVerificationRequest",
fields=[
("id", models.AutoField(primary_key=True, serialize=False)),
(
"label",
models.SlugField(help_text="A human readable label, used as the display text in Admin."),
),
(
"api_url",
models.URLField(help_text="Fully qualified URL for an Eligibility API server."),
),
(
"api_auth_header",
models.CharField(help_text="The auth header to send in Eligibility API requests.", max_length=50),
),
(
"api_auth_key_secret_name",
benefits.core.models.common.SecretNameField(
help_text="The name of a secret containing the value of the auth header to send in Eligibility API requests.", # noqa: E501
max_length=127,
validators=[benefits.secrets.SecretNameValidator()],
),
),
(
"api_jwe_cek_enc",
models.CharField(
help_text="The JWE-compatible Content Encryption Key (CEK) key-length and mode to use in Eligibility API requests.", # noqa: E501
max_length=50,
),
),
(
"api_jwe_encryption_alg",
models.CharField(
help_text="The JWE-compatible encryption algorithm to use in Eligibility API requests.",
max_length=50,
),
),
(
"api_jws_signing_alg",
models.CharField(
help_text="The JWS-compatible signing algorithm to use in Eligibility API requests.",
max_length=50,
),
),
(
"api_public_key",
models.ForeignKey(
help_text="The public key used to encrypt Eligibility API requests and to verify signed Eligibility API responses.", # noqa: E501
on_delete=django.db.models.deletion.PROTECT,
related_name="+",
to="core.pemdata",
),
),
],
),
migrations.AddField(
model_name="enrollmentflow",
name="api_request",
field=models.ForeignKey(
blank=True,
help_text="The Eligibility API request details for this flow.",
null=True,
on_delete=django.db.models.deletion.PROTECT,
to="core.eligibilityapiverificationrequest",
),
),
migrations.RunPython(migrate_flow_data),
migrations.RemoveField(
model_name="enrollmentflow",
name="eligibility_api_auth_header",
),
migrations.RemoveField(
model_name="enrollmentflow",
name="eligibility_api_auth_key_secret_name",
),
migrations.RemoveField(
model_name="enrollmentflow",
name="eligibility_api_jwe_cek_enc",
),
migrations.RemoveField(
model_name="enrollmentflow",
name="eligibility_api_jwe_encryption_alg",
),
migrations.RemoveField(
model_name="enrollmentflow",
name="eligibility_api_jws_signing_alg",
),
migrations.RemoveField(
model_name="enrollmentflow",
name="eligibility_api_public_key",
),
migrations.RemoveField(
model_name="enrollmentflow",
name="eligibility_api_url",
),
migrations.CreateModel(
name="EligibilityApiConfig",
fields=[
("id", models.AutoField(primary_key=True, serialize=False)),
("api_id", models.SlugField(help_text="The identifier for this agency used in Eligibility API calls.")),
(
"api_private_key",
models.ForeignKey(
help_text="Private key used to sign Eligibility API tokens created on behalf of this Agency.",
on_delete=django.db.models.deletion.PROTECT,
related_name="+",
to="core.pemdata",
),
),
(
"api_public_key",
models.ForeignKey(
help_text="Public key corresponding to the agency's private key, used by Eligibility Verification servers to encrypt responses.", # noqa: E501
on_delete=django.db.models.deletion.PROTECT,
related_name="+",
to="core.pemdata",
),
),
],
),
migrations.AddField(
model_name="transitagency",
name="eligibility_api_config",
field=models.ForeignKey(
blank=True,
default=None,
help_text="The Eligibility API configuration for this transit agency.",
null=True,
on_delete=django.db.models.deletion.PROTECT,
to="core.eligibilityapiconfig",
),
),
migrations.RunPython(migrate_agency_data),
migrations.RemoveField(
model_name="transitagency",
name="eligibility_api_id",
),
migrations.RemoveField(
model_name="transitagency",
name="eligibility_api_private_key",
),
migrations.RemoveField(
model_name="transitagency",
name="eligibility_api_public_key",
),
]
Loading
Loading