Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions openwisp_controller/config/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@

logger = logging.getLogger(__name__)
prefix = "config/"
uuid_regex = (
r"[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}"
)
Config = load_model("config", "Config")
Device = load_model("config", "Device")
DeviceGroup = load_model("config", "DeviceGroup")
Expand Down Expand Up @@ -166,9 +169,19 @@ def change_view(self, request, object_id, form_url="", extra_context=None):
def get_urls(self):
options = getattr(self.model, "_meta")
url_prefix = "{0}_{1}".format(options.app_label, options.model_name)
return [
default_urls = super().get_urls()
safe_urls = []
for url in default_urls:
if url.name and not (
url.name.endswith("_change")
or url.name.endswith("_history")
or url.name.endswith("_delete")
):
safe_urls.append(url)
strict_urls = [
# custom app URLs
re_path(
r"^download/(?P<pk>[^/]+)/$",
rf"^download/(?P<pk>{uuid_regex})/$",
self.admin_site.admin_view(self.download_view),
name="{0}_download".format(url_prefix),
),
Expand All @@ -178,11 +191,28 @@ def get_urls(self):
name="{0}_preview".format(url_prefix),
),
re_path(
r"^(?P<pk>[^/]+)/context\.json$",
rf"^(?P<pk>{uuid_regex})/context\.json$",
self.admin_site.admin_view(self.context_view),
name="{0}_context".format(url_prefix),
),
] + super().get_urls()
# strict overrides for the default admin views
re_path(
rf"^(?P<object_id>({uuid_regex}|__fk__))/history/$",
self.admin_site.admin_view(self.history_view),
name=f"{url_prefix}_history",
),
re_path(
rf"^(?P<object_id>({uuid_regex}|__fk__))/delete/$",
self.admin_site.admin_view(self.delete_view),
name=f"{url_prefix}_delete",
),
re_path(
rf"^(?P<object_id>({uuid_regex}|__fk__))/change/$",
self.admin_site.admin_view(self.change_view),
name=f"{url_prefix}_change",
),
]
return strict_urls + safe_urls

def _get_config_model(self):
model = self.model
Expand Down
6 changes: 5 additions & 1 deletion openwisp_controller/config/tests/pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

Device = load_model("config", "Device")

uuid_regex = (
r"[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}"
)


@pytest.mark.asyncio
@pytest.mark.django_db(transaction=True)
Expand All @@ -25,7 +29,7 @@ class TestDeviceConsumer(CreateDeviceMixin):
URLRouter(
[
re_path(
r"^ws/controller/device/(?P<pk>[^/]+)/$",
rf"^ws/controller/device/(?P<pk>{uuid_regex})/$",
BaseDeviceConsumer.as_asgi(),
)
]
Expand Down
46 changes: 46 additions & 0 deletions openwisp_controller/config/tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,52 @@ def test_change_device_malformed_uuid(self):
response = self.client.get(path)
self.assertEqual(response.status_code, 404)

def test_malformed_admin_urls_return_404(self):
device = self._create_device()
template = self._create_template()
vpn = self._create_vpn()
test_cases = [
("device", device.pk, "config_device"),
("template", template.pk, "config_template"),
("vpn", vpn.pk, "config_vpn"),
]
junk_path = "some/junk/path/here/"
original_bug_junk = "history/1564/undefinedadmin/img/icon-deletelink.svg"

for model_name, valid_pk, model_admin_name in test_cases:
with self.subTest(model=model_name):
change_url_name = f"admin:{model_admin_name}_change"
valid_change_url = reverse(change_url_name, args=[valid_pk])
malformed_change_url = f"{valid_change_url}{junk_path}"
response_change = self.client.get(malformed_change_url, follow=False)

self.assertEqual(
response_change.status_code,
404,
(f'Malformed "change" URL for {model_name} did not return 404. '),
)
history_url_name = f"admin:{model_admin_name}_history"
valid_history_url = reverse(history_url_name, args=[valid_pk])
malformed_history_url = f"{valid_history_url}{junk_path}"
response_history = self.client.get(malformed_history_url, follow=False)

self.assertEqual(
response_history.status_code,
404,
(f'Malformed "history" URL for {model_name} did not return 404. '),
)
original_bug_url = f"{valid_history_url}{original_bug_junk}"
response_original_bug = self.client.get(original_bug_url, follow=False)

self.assertEqual(
response_original_bug.status_code,
404,
(
f"Original bug URL for {model_name} did not return 404. "
"This may be a regression of bug #682."
),
)

def test_uuid_field_in_change(self):
t = Template.objects.first()
d = self._create_device()
Expand Down
30 changes: 12 additions & 18 deletions openwisp_controller/config/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,8 @@ def test_device_checksum_requested_signal_is_emitted(self):
def test_device_checksum_bad_uuid(self):
d = self._create_device_config()
pk = "{}-wrong".format(d.pk)
response = self.client.get(
reverse("controller:device_checksum", args=[pk]), {"key": d.key}
)
bad_path = f"/controller/device/checksum/{pk}/"
response = self.client.get(bad_path, {"key": d.key})
self.assertEqual(response.status_code, 404)

def test_device_config_download_requested_signal_is_emitted(self):
Expand Down Expand Up @@ -323,9 +322,8 @@ def test_deactivated_device_download_config(self):
def test_device_download_config_bad_uuid(self):
d = self._create_device_config()
pk = "{}-wrong".format(d.pk)
response = self.client.get(
reverse("controller:device_download_config", args=[pk]), {"key": d.key}
)
bad_path = f"/controller/device/download-config/{pk}/"
response = self.client.get(bad_path, {"key": d.key})
self.assertEqual(response.status_code, 404)

def test_vpn_checksum_requested_signal_is_emitted(self):
Expand Down Expand Up @@ -383,9 +381,8 @@ def test_vpn_checksum(self):
def test_vpn_checksum_bad_uuid(self):
v = self._create_vpn()
pk = "{}-wrong".format(v.pk)
response = self.client.get(
reverse("controller:vpn_checksum", args=[pk]), {"key": v.key}
)
bad_path = f"/controller/vpn/checksum/{pk}/"
response = self.client.get(bad_path, {"key": v.key})
self.assertEqual(response.status_code, 404)

@capture_any_output()
Expand Down Expand Up @@ -500,9 +497,8 @@ def test_vpn_download_config(self):
def test_vpn_download_config_bad_uuid(self):
v = self._create_vpn()
pk = "{}-wrong".format(v.pk)
response = self.client.get(
reverse("controller:vpn_download_config", args=[pk]), {"key": v.key}
)
bad_path = f"/controller/vpn/download-config/{pk}/"
response = self.client.get(bad_path, {"key": v.key})
self.assertEqual(response.status_code, 404)

@capture_any_output()
Expand Down Expand Up @@ -895,9 +891,8 @@ def test_deactivated_device_report_status(self):
def test_device_report_status_bad_uuid(self):
d = self._create_device_config()
pk = "{}-wrong".format(d.pk)
response = self.client.post(
reverse("controller:device_report_status", args=[pk]), {"key": d.key}
)
bad_path = f"/controller/device/report-status/{pk}/"
response = self.client.post(bad_path, {"key": d.key})
self.assertEqual(response.status_code, 404)

@capture_any_output()
Expand Down Expand Up @@ -1012,9 +1007,8 @@ def test_device_update_info_bad_uuid(self):
"os": "OpenWrt 18.06-SNAPSHOT r7312-e60be11330",
"system": "Atheros AR9344 rev 3",
}
response = self.client.post(
reverse("controller:device_update_info", args=[pk]), params
)
bad_path = f"/controller/device/update-info/{pk}/"
response = self.client.post(bad_path, params)
self.assertEqual(response.status_code, 404)

def test_device_update_info_400(self):
Expand Down
23 changes: 13 additions & 10 deletions openwisp_controller/config/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from openwisp_notifications.utils import _get_object_link

logger = logging.getLogger(__name__)
uuid_regex = (
r"[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}"
)


def get_object_or_404(model, **kwargs):
Expand Down Expand Up @@ -117,22 +120,22 @@ def get_controller_urls(views_module):
"""
urls = [
re_path(
"controller/device/checksum/(?P<pk>[^/]+)/$",
rf"controller/device/checksum/(?P<pk>{uuid_regex})/$",
views_module.device_checksum,
name="device_checksum",
),
re_path(
"controller/device/download-config/(?P<pk>[^/]+)/$",
rf"controller/device/download-config/(?P<pk>{uuid_regex})/$",
views_module.device_download_config,
name="device_download_config",
),
re_path(
"controller/device/update-info/(?P<pk>[^/]+)/$",
rf"controller/device/update-info/(?P<pk>{uuid_regex})/$",
views_module.device_update_info,
name="device_update_info",
),
re_path(
"controller/device/report-status/(?P<pk>[^/]+)/$",
rf"controller/device/report-status/(?P<pk>{uuid_regex})/$",
views_module.device_report_status,
name="device_report_status",
),
Expand All @@ -142,33 +145,33 @@ def get_controller_urls(views_module):
name="device_register",
),
re_path(
"controller/vpn/checksum/(?P<pk>[^/]+)/$",
rf"controller/vpn/checksum/(?P<pk>{uuid_regex})/$",
views_module.vpn_checksum,
name="vpn_checksum",
),
re_path(
"controller/vpn/download-config/(?P<pk>[^/]+)/$",
rf"controller/vpn/download-config/(?P<pk>{uuid_regex})/$",
views_module.vpn_download_config,
name="vpn_download_config",
),
# legacy URLs
re_path(
"controller/checksum/(?P<pk>[^/]+)/$",
rf"controller/checksum/(?P<pk>{uuid_regex})/$",
views_module.device_checksum,
name="checksum_legacy",
),
re_path(
"controller/download-config/(?P<pk>[^/]+)/$",
rf"controller/download-config/(?P<pk>{uuid_regex})/$",
views_module.device_download_config,
name="download_config_legacy",
),
re_path(
"controller/update-info/(?P<pk>[^/]+)/$",
rf"controller/update-info/(?P<pk>{uuid_regex})/$",
views_module.device_update_info,
name="update_info_legacy",
),
re_path(
"controller/report-status/(?P<pk>[^/]+)/$",
rf"controller/report-status/(?P<pk>{uuid_regex})/$",
views_module.device_report_status,
name="report_status_legacy",
),
Expand Down
6 changes: 5 additions & 1 deletion openwisp_controller/connection/channels/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

from . import consumers as ow_consumer

uuid_regex = (
r"[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}"
)


def get_routes(consumer=ow_consumer):
return [
re_path(
r"^ws/controller/device/(?P<pk>[^/]+)/command$",
rf"^ws/controller/device/(?P<pk>{uuid_regex})/command$",
consumer.CommandConsumer.as_asgi(),
)
]
Loading