diff --git a/openwisp_controller/config/admin.py b/openwisp_controller/config/admin.py index b78d048f6..4aa6a5782 100644 --- a/openwisp_controller/config/admin.py +++ b/openwisp_controller/config/admin.py @@ -49,6 +49,9 @@ logger = logging.getLogger(__name__) prefix = "config/" +uuid_regex = ( + r"[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}" +) Config = load_model("config", "Config") Device = load_model("config", "Device") DeviceGroup = load_model("config", "DeviceGroup") @@ -166,9 +169,19 @@ def change_view(self, request, object_id, form_url="", extra_context=None): def get_urls(self): options = getattr(self.model, "_meta") url_prefix = "{0}_{1}".format(options.app_label, options.model_name) - return [ + default_urls = super().get_urls() + safe_urls = [] + for url in default_urls: + if url.name and not ( + url.name.endswith("_change") + or url.name.endswith("_history") + or url.name.endswith("_delete") + ): + safe_urls.append(url) + strict_urls = [ + # custom app URLs re_path( - r"^download/(?P[^/]+)/$", + rf"^download/(?P{uuid_regex})/$", self.admin_site.admin_view(self.download_view), name="{0}_download".format(url_prefix), ), @@ -178,11 +191,28 @@ def get_urls(self): name="{0}_preview".format(url_prefix), ), re_path( - r"^(?P[^/]+)/context\.json$", + rf"^(?P{uuid_regex})/context\.json$", self.admin_site.admin_view(self.context_view), name="{0}_context".format(url_prefix), ), - ] + super().get_urls() + # strict overrides for the default admin views + re_path( + rf"^(?P({uuid_regex}|__fk__))/history/$", + self.admin_site.admin_view(self.history_view), + name=f"{url_prefix}_history", + ), + re_path( + rf"^(?P({uuid_regex}|__fk__))/delete/$", + self.admin_site.admin_view(self.delete_view), + name=f"{url_prefix}_delete", + ), + re_path( + rf"^(?P({uuid_regex}|__fk__))/change/$", + self.admin_site.admin_view(self.change_view), + name=f"{url_prefix}_change", + ), + ] + return strict_urls + safe_urls def _get_config_model(self): model = self.model diff --git a/openwisp_controller/config/tests/pytest.py b/openwisp_controller/config/tests/pytest.py index 36f09205e..3166a2389 100644 --- a/openwisp_controller/config/tests/pytest.py +++ b/openwisp_controller/config/tests/pytest.py @@ -13,6 +13,10 @@ Device = load_model("config", "Device") +uuid_regex = ( + r"[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}" +) + @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) @@ -25,7 +29,7 @@ class TestDeviceConsumer(CreateDeviceMixin): URLRouter( [ re_path( - r"^ws/controller/device/(?P[^/]+)/$", + rf"^ws/controller/device/(?P{uuid_regex})/$", BaseDeviceConsumer.as_asgi(), ) ] diff --git a/openwisp_controller/config/tests/test_admin.py b/openwisp_controller/config/tests/test_admin.py index c33b402c4..be984444c 100644 --- a/openwisp_controller/config/tests/test_admin.py +++ b/openwisp_controller/config/tests/test_admin.py @@ -1405,6 +1405,52 @@ def test_change_device_malformed_uuid(self): response = self.client.get(path) self.assertEqual(response.status_code, 404) + def test_malformed_admin_urls_return_404(self): + device = self._create_device() + template = self._create_template() + vpn = self._create_vpn() + test_cases = [ + ("device", device.pk, "config_device"), + ("template", template.pk, "config_template"), + ("vpn", vpn.pk, "config_vpn"), + ] + junk_path = "some/junk/path/here/" + original_bug_junk = "history/1564/undefinedadmin/img/icon-deletelink.svg" + + for model_name, valid_pk, model_admin_name in test_cases: + with self.subTest(model=model_name): + change_url_name = f"admin:{model_admin_name}_change" + valid_change_url = reverse(change_url_name, args=[valid_pk]) + malformed_change_url = f"{valid_change_url}{junk_path}" + response_change = self.client.get(malformed_change_url, follow=False) + + self.assertEqual( + response_change.status_code, + 404, + (f'Malformed "change" URL for {model_name} did not return 404. '), + ) + history_url_name = f"admin:{model_admin_name}_history" + valid_history_url = reverse(history_url_name, args=[valid_pk]) + malformed_history_url = f"{valid_history_url}{junk_path}" + response_history = self.client.get(malformed_history_url, follow=False) + + self.assertEqual( + response_history.status_code, + 404, + (f'Malformed "history" URL for {model_name} did not return 404. '), + ) + original_bug_url = f"{valid_history_url}{original_bug_junk}" + response_original_bug = self.client.get(original_bug_url, follow=False) + + self.assertEqual( + response_original_bug.status_code, + 404, + ( + f"Original bug URL for {model_name} did not return 404. " + "This may be a regression of bug #682." + ), + ) + def test_uuid_field_in_change(self): t = Template.objects.first() d = self._create_device() diff --git a/openwisp_controller/config/tests/test_controller.py b/openwisp_controller/config/tests/test_controller.py index ad08f4194..0fa2ac6ca 100644 --- a/openwisp_controller/config/tests/test_controller.py +++ b/openwisp_controller/config/tests/test_controller.py @@ -257,9 +257,8 @@ def test_device_checksum_requested_signal_is_emitted(self): def test_device_checksum_bad_uuid(self): d = self._create_device_config() pk = "{}-wrong".format(d.pk) - response = self.client.get( - reverse("controller:device_checksum", args=[pk]), {"key": d.key} - ) + bad_path = f"/controller/device/checksum/{pk}/" + response = self.client.get(bad_path, {"key": d.key}) self.assertEqual(response.status_code, 404) def test_device_config_download_requested_signal_is_emitted(self): @@ -323,9 +322,8 @@ def test_deactivated_device_download_config(self): def test_device_download_config_bad_uuid(self): d = self._create_device_config() pk = "{}-wrong".format(d.pk) - response = self.client.get( - reverse("controller:device_download_config", args=[pk]), {"key": d.key} - ) + bad_path = f"/controller/device/download-config/{pk}/" + response = self.client.get(bad_path, {"key": d.key}) self.assertEqual(response.status_code, 404) def test_vpn_checksum_requested_signal_is_emitted(self): @@ -383,9 +381,8 @@ def test_vpn_checksum(self): def test_vpn_checksum_bad_uuid(self): v = self._create_vpn() pk = "{}-wrong".format(v.pk) - response = self.client.get( - reverse("controller:vpn_checksum", args=[pk]), {"key": v.key} - ) + bad_path = f"/controller/vpn/checksum/{pk}/" + response = self.client.get(bad_path, {"key": v.key}) self.assertEqual(response.status_code, 404) @capture_any_output() @@ -500,9 +497,8 @@ def test_vpn_download_config(self): def test_vpn_download_config_bad_uuid(self): v = self._create_vpn() pk = "{}-wrong".format(v.pk) - response = self.client.get( - reverse("controller:vpn_download_config", args=[pk]), {"key": v.key} - ) + bad_path = f"/controller/vpn/download-config/{pk}/" + response = self.client.get(bad_path, {"key": v.key}) self.assertEqual(response.status_code, 404) @capture_any_output() @@ -895,9 +891,8 @@ def test_deactivated_device_report_status(self): def test_device_report_status_bad_uuid(self): d = self._create_device_config() pk = "{}-wrong".format(d.pk) - response = self.client.post( - reverse("controller:device_report_status", args=[pk]), {"key": d.key} - ) + bad_path = f"/controller/device/report-status/{pk}/" + response = self.client.post(bad_path, {"key": d.key}) self.assertEqual(response.status_code, 404) @capture_any_output() @@ -1012,9 +1007,8 @@ def test_device_update_info_bad_uuid(self): "os": "OpenWrt 18.06-SNAPSHOT r7312-e60be11330", "system": "Atheros AR9344 rev 3", } - response = self.client.post( - reverse("controller:device_update_info", args=[pk]), params - ) + bad_path = f"/controller/device/update-info/{pk}/" + response = self.client.post(bad_path, params) self.assertEqual(response.status_code, 404) def test_device_update_info_400(self): diff --git a/openwisp_controller/config/utils.py b/openwisp_controller/config/utils.py index c0048fa0f..a7af0279d 100644 --- a/openwisp_controller/config/utils.py +++ b/openwisp_controller/config/utils.py @@ -8,6 +8,9 @@ from openwisp_notifications.utils import _get_object_link logger = logging.getLogger(__name__) +uuid_regex = ( + r"[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}" +) def get_object_or_404(model, **kwargs): @@ -117,22 +120,22 @@ def get_controller_urls(views_module): """ urls = [ re_path( - "controller/device/checksum/(?P[^/]+)/$", + rf"controller/device/checksum/(?P{uuid_regex})/$", views_module.device_checksum, name="device_checksum", ), re_path( - "controller/device/download-config/(?P[^/]+)/$", + rf"controller/device/download-config/(?P{uuid_regex})/$", views_module.device_download_config, name="device_download_config", ), re_path( - "controller/device/update-info/(?P[^/]+)/$", + rf"controller/device/update-info/(?P{uuid_regex})/$", views_module.device_update_info, name="device_update_info", ), re_path( - "controller/device/report-status/(?P[^/]+)/$", + rf"controller/device/report-status/(?P{uuid_regex})/$", views_module.device_report_status, name="device_report_status", ), @@ -142,33 +145,33 @@ def get_controller_urls(views_module): name="device_register", ), re_path( - "controller/vpn/checksum/(?P[^/]+)/$", + rf"controller/vpn/checksum/(?P{uuid_regex})/$", views_module.vpn_checksum, name="vpn_checksum", ), re_path( - "controller/vpn/download-config/(?P[^/]+)/$", + rf"controller/vpn/download-config/(?P{uuid_regex})/$", views_module.vpn_download_config, name="vpn_download_config", ), # legacy URLs re_path( - "controller/checksum/(?P[^/]+)/$", + rf"controller/checksum/(?P{uuid_regex})/$", views_module.device_checksum, name="checksum_legacy", ), re_path( - "controller/download-config/(?P[^/]+)/$", + rf"controller/download-config/(?P{uuid_regex})/$", views_module.device_download_config, name="download_config_legacy", ), re_path( - "controller/update-info/(?P[^/]+)/$", + rf"controller/update-info/(?P{uuid_regex})/$", views_module.device_update_info, name="update_info_legacy", ), re_path( - "controller/report-status/(?P[^/]+)/$", + rf"controller/report-status/(?P{uuid_regex})/$", views_module.device_report_status, name="report_status_legacy", ), diff --git a/openwisp_controller/connection/channels/routing.py b/openwisp_controller/connection/channels/routing.py index 1a23357db..f6b692a5a 100644 --- a/openwisp_controller/connection/channels/routing.py +++ b/openwisp_controller/connection/channels/routing.py @@ -2,11 +2,15 @@ from . import consumers as ow_consumer +uuid_regex = ( + r"[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}" +) + def get_routes(consumer=ow_consumer): return [ re_path( - r"^ws/controller/device/(?P[^/]+)/command$", + rf"^ws/controller/device/(?P{uuid_regex})/command$", consumer.CommandConsumer.as_asgi(), ) ]