diff --git a/.coveragerc b/.coveragerc index b41a21408..58dad8ac3 100644 --- a/.coveragerc +++ b/.coveragerc @@ -17,6 +17,11 @@ omit = pydis_site/settings.py pydis_site/utils/resources.py pydis_site/apps/home/views.py + # XXX: Will be covered later when FormsUser is used properly. + pydis_site/apps/forms/authentication.py + pydis_site/apps/forms/discord.py + # XXX: Will be tested later + pydis_site/apps/forms/permissions.py [report] fail_under = 100 diff --git a/pydis_site/apps/forms/__init__.py b/pydis_site/apps/forms/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pydis_site/apps/forms/admin.py b/pydis_site/apps/forms/admin.py new file mode 100644 index 000000000..b97a94f64 --- /dev/null +++ b/pydis_site/apps/forms/admin.py @@ -0,0 +1,2 @@ + +# Register your models here. diff --git a/pydis_site/apps/forms/apps.py b/pydis_site/apps/forms/apps.py new file mode 100644 index 000000000..e0d642a3a --- /dev/null +++ b/pydis_site/apps/forms/apps.py @@ -0,0 +1,7 @@ +from django.apps import AppConfig + + +class FormsConfig(AppConfig): + """Django AppConfig for the forms app.""" + + name = "pydis_site.apps.forms" diff --git a/pydis_site/apps/forms/authentication.py b/pydis_site/apps/forms/authentication.py new file mode 100644 index 000000000..c613d160e --- /dev/null +++ b/pydis_site/apps/forms/authentication.py @@ -0,0 +1,167 @@ +"""Custom authentication for the forms backend.""" + +import typing + +import jwt +from django.conf import settings +from django.http import HttpRequest +from rest_framework.authentication import BaseAuthentication +from rest_framework.exceptions import AuthenticationFailed + +from . import discord +from . import models + + +def encode_jwt(info: dict, *, signing_secret_key: str = settings.SECRET_KEY) -> str: + """Encode JWT information with either the configured signing key or a passed one.""" + return jwt.encode(info, signing_secret_key, algorithm="HS256") + + +class FormsUser: + """Stores authentication information for a forms user.""" + + # This allows us to safely use the same checks that we could use on a Django user. + is_authenticated: bool = True + + def __init__( + self, + token: str, + payload: dict[str, typing.Any], + member: models.DiscordMember | None, + ) -> None: + """Set up a forms user.""" + self.token = token + self.payload = payload + self.admin = False + self.member = member + + @property + def display_name(self) -> str: + """Return username and discriminator as display name.""" + return f"{self.payload['username']}#{self.payload['discriminator']}" + + @property + def discord_mention(self) -> str: + """Return a mention for this user on Discord.""" + return f"<@{self.payload['id']}>" + + @property + def user_id(self) -> str: + """Return this user's ID as a string.""" + return str(self.payload["id"]) + + @property + def decoded_token(self) -> dict[str, any]: + """Decode the information stored in this user's JWT token.""" + return jwt.decode(self.token, settings.SECRET_KEY, algorithms=["HS256"]) + + def get_roles(self) -> tuple[str, ...]: + """Get a tuple of the user's discord roles by name.""" + if not self.member: + return [] + + server_roles = discord.get_roles() + roles = [role.name for role in server_roles if role.id in self.member.roles] + + if "admin" in roles: + # Protect against collision with the forms admin role + roles.remove("admin") + roles.append("discord admin") + + return tuple(roles) + + def is_admin(self) -> bool: + """Return whether this user is an administrator.""" + self.admin = models.Admin.objects.filter(id=self.payload["id"]).exists() + return self.admin + + def refresh_data(self) -> None: + """Fetches user data from discord, and updates the instance.""" + self.member = discord.get_member(self.payload["id"]) + + if self.member: + self.payload = self.member.user.dict() + else: + self.payload = discord.fetch_user_details(self.decoded_token.get("token")) + + updated_info = self.decoded_token + updated_info["user_details"] = self.payload + + self.token = encode_jwt(updated_info) + + +class AuthenticationResult(typing.NamedTuple): + """Return scopes that the user has authenticated with.""" + + scopes: tuple[str, ...] + + +# See https://www.django-rest-framework.org/api-guide/authentication/#custom-authentication +class JWTAuthentication(BaseAuthentication): + """Custom DRF authentication backend for JWT.""" + + @staticmethod + def get_token_from_cookie(cookie: str) -> str: + """Parse JWT token from cookie.""" + try: + prefix, token = cookie.split() + except ValueError: + msg = "Unable to split prefix and token from authorization cookie." + raise AuthenticationFailed(msg) + + if prefix.upper() != "JWT": + msg = f"Invalid authorization cookie prefix '{prefix}'." + raise AuthenticationFailed(msg) + + return token + + def authenticate( + self, + request: HttpRequest, + ) -> tuple[FormsUser, None] | None: + """Handles JWT authentication process.""" + cookie = request.COOKIES.get("token") + if not cookie: + return None + + token = self.get_token_from_cookie(cookie) + + try: + # New key. + payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) + except jwt.InvalidTokenError: + try: + # Old key. Should be removed at a certain point. + payload = jwt.decode(token, settings.FORMS_SECRET_KEY, algorithms=["HS256"]) + except jwt.InvalidTokenError as e: + raise AuthenticationFailed(str(e)) + + scopes = ["authenticated"] + + if not payload.get("token"): + msg = "Token is missing from JWT." + raise AuthenticationFailed(msg) + if not payload.get("refresh"): + msg = "Refresh token is missing from JWT." + raise AuthenticationFailed(msg) + + try: + user_details = payload.get("user_details") + if not user_details or not user_details.get("id"): + msg = "Improper user details." + raise AuthenticationFailed(msg) + except Exception: + msg = "Could not parse user details." + raise AuthenticationFailed(msg) + + user = FormsUser( + token, + user_details, + discord.get_member(user_details["id"]), + ) + if user.is_admin(): + scopes.append("admin") + + scopes.extend(user.get_roles()) + + return user, AuthenticationResult(scopes=tuple(scopes)) diff --git a/pydis_site/apps/forms/discord.py b/pydis_site/apps/forms/discord.py new file mode 100644 index 000000000..3e7e80fb2 --- /dev/null +++ b/pydis_site/apps/forms/discord.py @@ -0,0 +1,143 @@ +"""API functions for Discord access.""" + +import httpx +from django.conf import settings + +from . import models +from . import util + + +__all__ = ("get_member", "get_roles") + + +def fetch_and_update_roles() -> tuple[models.DiscordRole, ...]: + """Get information about roles from Discord.""" + with httpx.Client() as client: + r = client.get( + f"{settings.DISCORD_API_BASE_URL}/guilds/{settings.DISCORD_GUILD_ID}/roles", + headers={"Authorization": f"Bot {settings.DISCORD_BOT_TOKEN}"}, + ) + + r.raise_for_status() + return tuple(models.DiscordRole(**role) for role in r.json()) + + +def fetch_member_details(member_id: int) -> models.DiscordMember | None: + """Get a member by ID from the configured guild using the discord API.""" + with httpx.Client() as client: + r = client.get( + f"{settings.DISCORD_API_BASE_URL}/guilds/{settings.DISCORD_GUILD_ID}/members/{member_id}", + headers={"Authorization": f"Bot {settings.DISCORD_BOT_TOKEN}"}, + ) + + if r.status_code == 404: + return None + + r.raise_for_status() + return models.DiscordMember(**r.json()) + + +def fetch_user_details(bearer_token: str) -> dict: + """Fetch information about the Discord user associated with the given ``bearer_token``.""" + with httpx.Client() as client: + r = client.get( + f"{settings.DISCORD_API_BASE_URL}/users/@me", + headers={ + "Authorization": f"Bearer {bearer_token}", + }, + ) + + r.raise_for_status() + + return r.json() + + +def fetch_bearer_token(code: str, redirect: str, *, refresh: bool) -> dict: + """ + Fetch an OAuth2 bearer token. + + ## Arguments + + - ``code``: The code or refresh token for the operation. Usually provided by Discord. + - ``redirect``: Where to redirect the client after successful login. + + ## Keyword arguments + + - ``refresh``: Whether to fetch a refresh token. + """ + with httpx.Client() as client: + data = { + "client_id": settings.DISCORD_OAUTH2_CLIENT_ID, + "client_secret": settings.DISCORD_OAUTH2_CLIENT_SECRET, + "redirect_uri": f"{redirect}/callback", + } + + if refresh: + data["grant_type"] = "refresh_token" + data["refresh_token"] = code + else: + data["grant_type"] = "authorization_code" + data["code"] = code + + r = client.post( + f"{settings.DISCORD_API_BASE_URL}/oauth2/token", + headers={ + "Content-Type": "application/x-www-form-urlencoded", + }, + data=data, + ) + + r.raise_for_status() + + return r.json() + + +def get_roles(*, force_refresh: bool = False, stale_after: int = 60 * 60 * 24) -> tuple[models.DiscordRole, ...]: + """ + Get a tuple of all roles from the cache, or discord API if not available. + + ## Keyword arguments + + - `force_refresh` (`bool`): Skip the cache and always update the roles from + Discord. + - `stale_after` (`int`): Seconds after which to consider the stored roles + as stale and to refresh them. + """ + if not force_refresh: + roles = models.DiscordRole.objects.all() + oldest = min(role.last_update for role in roles) + if not util.is_stale(oldest, 60 * 60 * 24): # 1 day + return tuple(roles) + + return fetch_and_update_roles() + + +def get_member( + user_id: int, + *, + force_refresh: bool = False, +) -> models.DiscordMember | None: + """ + Get a member from the cache, or from the discord API. + + ## Keyword arguments + + - `force_refresh` (`bool`): Skip the cache and always update the roles from + Discord. + - `stale_after` (`int`): Seconds after which to consider the stored roles + as stale and to refresh them. + + ## Return value + + Returns `None` if the member object does not exist. + """ + if not force_refresh: + member = models.DiscordMember.objects.get(id=user_id) + if not util.is_stale(member.last_update, 60 * 60): + return member + + member = fetch_member_details(user_id) + if member: + member.save() + + return member diff --git a/pydis_site/apps/forms/migrations/0001_initial_updated.py b/pydis_site/apps/forms/migrations/0001_initial_updated.py new file mode 100644 index 000000000..4ce285bbd --- /dev/null +++ b/pydis_site/apps/forms/migrations/0001_initial_updated.py @@ -0,0 +1,240 @@ +# Generated by Django 5.1.7 on 2025-03-27 18:02 + +import django.contrib.postgres.fields +import django.core.validators +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Admin", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ], + ), + migrations.CreateModel( + name="DiscordRole", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("name", models.CharField(help_text="The role name, taken from Discord.", max_length=100)), + ( + "colour", + models.IntegerField( + help_text="The integer value of the colour of this role from Discord.", + validators=[ + django.core.validators.MinValueValidator( + limit_value=0, message="Colour hex cannot be negative." + ) + ], + ), + ), + ("hoist", models.BooleanField(help_text="Whether this role is hoisted.")), + ("icon", models.CharField(help_text="Icon hash of the role.", max_length=250, null=True)), + ("unicode_emoji", models.CharField(help_text="Unicode emoji of the role.", max_length=250, null=True)), + ( + "position", + models.IntegerField( + help_text="The position of the role in the role hierarchy of the Discord Guild." + ), + ), + ( + "permissions", + models.BigIntegerField( + help_text="The integer value of the permission bitset of this role from Discord.", + validators=[ + django.core.validators.MinValueValidator( + limit_value=0, message="Role permissions cannot be negative." + ) + ], + ), + ), + ("managed", models.BooleanField(help_text="Whether this role is managed by an integration.")), + ("mentionable", models.BooleanField(help_text="Whether this role is mentionable.")), + ("role_tags", models.JSONField(help_text="Further metadata about this role.", null=True)), + ( + "last_update", + models.DateTimeField( + auto_now=True, help_text="When this role was most recently refreshed from Discord." + ), + ), + ], + ), + migrations.CreateModel( + name="DiscordUser", + fields=[ + ( + "id", + models.BigIntegerField( + help_text="The ID of this user.", + primary_key=True, + serialize=False, + validators=[ + django.core.validators.MinValueValidator( + limit_value=0, message="User IDs can not be negative." + ) + ], + verbose_name="ID", + ), + ), + ( + "username", + models.CharField(help_text="The name of this user.", max_length=32, verbose_name="Username"), + ), + ( + "discriminator", + models.PositiveSmallIntegerField( + help_text="The discriminator of this user, taken from Discord.", + validators=[ + django.core.validators.MaxValueValidator( + limit_value=9999, message="Discriminators may not exceed `9999`." + ) + ], + ), + ), + ( + "avatar", + models.CharField( + help_text="The avatar hash of this user.", max_length=100, null=True, verbose_name="Avatar hash" + ), + ), + ("bot", models.BooleanField(help_text="Whether this user is a bot.", null=True, verbose_name="Is bot")), + ( + "system", + models.BooleanField( + help_text="Whether this user is a system user.", null=True, verbose_name="Is system user" + ), + ), + ( + "locale", + models.CharField( + help_text="The identifier of the locale that this user is using.", + null=True, + verbose_name="Locale identifier", + ), + ), + ( + "verified", + models.BooleanField( + help_text="Whether this user's email address is verified.", + null=True, + verbose_name="Verified email address", + ), + ), + ( + "email", + models.CharField( + help_text="The e-mail address of this user.", null=True, verbose_name="E-mail address" + ), + ), + ( + "flags", + models.IntegerField(help_text="User account flags as a bitfield.", null=True, verbose_name="Flags"), + ), + ( + "premium_type", + models.IntegerField( + help_text="The type of nitro subscription on a user's account.", + null=True, + verbose_name="Nitro type", + ), + ), + ( + "public_flags", + models.IntegerField( + help_text="The public flags on a user's account.", null=True, verbose_name="Flags" + ), + ), + ], + ), + migrations.CreateModel( + name="DiscordMember", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "nick", + models.CharField(help_text="The nickname that the member is using.", max_length=100, null=True), + ), + ( + "avatar", + models.CharField( + help_text="The avatar hash of this member for this server.", + max_length=100, + null=True, + verbose_name="Avatar hash", + ), + ), + ( + "roles", + django.contrib.postgres.fields.ArrayField( + base_field=models.BigIntegerField( + help_text="The snowflake ID of a role this member is part of.", + validators=[ + django.core.validators.MinValueValidator( + limit_value=0, message="Role IDs cannot be negative." + ) + ], + ), + help_text="Roles this member is part of.", + size=None, + verbose_name="Roles", + ), + ), + ( + "joined_at", + models.DateTimeField(help_text="When this member has joined the guild.", verbose_name="Join date"), + ), + ( + "premium_since", + models.DateTimeField( + help_text="When this member started boosting the guild.", + null=True, + verbose_name="Boosting since", + ), + ), + ("deaf", models.BooleanField(help_text="Whether this user is deaf.", verbose_name="Deaf")), + ("mute", models.BooleanField(help_text="Whether this user is mute.", verbose_name="Mute")), + ( + "pending", + models.BooleanField( + help_text="Whether this user has not yet passed membership screening.", + null=True, + verbose_name="Pending screening", + ), + ), + ( + "permissions", + models.BigIntegerField( + help_text="Total permissions of the member in the channel.", + null=True, + verbose_name="Pending screening", + ), + ), + ( + "communication_disabled_until", + models.DateTimeField( + help_text="Until when the user is server-muted.", null=True, verbose_name="Timeout until" + ), + ), + ( + "last_update", + models.DateTimeField( + auto_now=True, help_text="When this member was most recently refreshed from Discord." + ), + ), + ( + "user", + models.OneToOneField( + help_text="The user associated with this member.", + on_delete=django.db.models.deletion.CASCADE, + to="forms.discorduser", + ), + ), + ], + ), + ] diff --git a/pydis_site/apps/forms/migrations/__init__.py b/pydis_site/apps/forms/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pydis_site/apps/forms/models.py b/pydis_site/apps/forms/models.py new file mode 100644 index 000000000..6b781efe6 --- /dev/null +++ b/pydis_site/apps/forms/models.py @@ -0,0 +1,197 @@ + +from django.core.validators import MinValueValidator, MaxValueValidator +from django.contrib.postgres.fields import ArrayField +from django.db import models + + +class Admin(models.Model): + """Represents an administrator of the forms backend.""" + + id: models.BigIntegerField( + primary_key=True, + validators=(MinValueValidator(limit_value=0, message="Admin IDs can not be negative."),), + help_text="The user ID of this administrator.", + verbose_name="ID", + ) + + +# XXX: This duplicates the role object from the API app. The role object in the +# API app carries less data. We should unify these. +class DiscordRole(models.Model): + """Represents a role as returned by the Discord API.""" + + id: models.BigIntegerField( + primary_key=True, + validators=(MinValueValidator(limit_value=0, message="Role IDs can not be negative."),), + help_text="The ID of this role.", + verbose_name="ID", + ) + name = models.CharField(max_length=100, help_text="The role name, taken from Discord.") + colour = models.IntegerField( + validators=(MinValueValidator(limit_value=0, message="Colour hex cannot be negative."),), + help_text="The integer value of the colour of this role from Discord.", + ) + hoist = models.BooleanField(help_text="Whether this role is hoisted.") + icon = models.CharField( + max_length=250, + help_text="Icon hash of the role.", + null=True, + ) + unicode_emoji = models.CharField( + max_length=250, + help_text="Unicode emoji of the role.", + null=True, + ) + position = models.IntegerField(help_text="The position of the role in the role hierarchy of the Discord Guild.") + permissions = models.BigIntegerField( + validators=(MinValueValidator(limit_value=0, message="Role permissions cannot be negative."),), + help_text="The integer value of the permission bitset of this role from Discord.", + ) + managed = models.BooleanField(help_text="Whether this role is managed by an integration.") + mentionable = models.BooleanField(help_text="Whether this role is mentionable.") + role_tags = models.JSONField( + help_text="Further metadata about this role.", + null=True, + ) + last_update = models.DateTimeField( + help_text="When this role was most recently refreshed from Discord.", + auto_now=True, + ) + + +# XXX: We should try to get rid of this. +class DiscordUser(models.Model): + """Represents a user as returned by the Discord API.""" + + id = models.BigIntegerField( + primary_key=True, + validators=(MinValueValidator(limit_value=0, message="User IDs can not be negative."),), + help_text="The ID of this user.", + verbose_name="ID", + ) + username = models.CharField( + help_text="The name of this user.", + verbose_name="Username", + max_length=32, + ) + discriminator = models.PositiveSmallIntegerField( + validators=(MaxValueValidator(limit_value=9999, message="Discriminators may not exceed `9999`."),), + help_text="The discriminator of this user, taken from Discord.", + ) + avatar = models.CharField( + help_text="The avatar hash of this user.", + verbose_name="Avatar hash", + max_length=100, + null=True, + ) + bot = models.BooleanField( + help_text="Whether this user is a bot.", + verbose_name="Is bot", + null=True, + ) + system = models.BooleanField( + help_text="Whether this user is a system user.", + verbose_name="Is system user", + null=True, + ) + locale = models.CharField( + help_text="The identifier of the locale that this user is using.", + verbose_name="Locale identifier", + null=True, + ) + verified = models.BooleanField( + help_text="Whether this user's email address is verified.", + verbose_name="Verified email address", + null=True, + ) + email = models.CharField( + help_text="The e-mail address of this user.", + verbose_name="E-mail address", + null=True, + ) + flags = models.IntegerField( + help_text="User account flags as a bitfield.", + verbose_name="Flags", + null=True, + ) + premium_type = models.IntegerField( + help_text="The type of nitro subscription on a user's account.", + verbose_name="Nitro type", + null=True, + ) + public_flags = models.IntegerField( + help_text="The public flags on a user's account.", + verbose_name="Flags", + null=True, + ) + + +# XXX: This duplicates the member object from the API app. The member object in +# the API app carries less data. We should unify these, although admittedly +# this one has the extra use of being able to filter members that use the forms +# backend, and extra data. +class DiscordMember(models.Model): + """Represents a guild member as returned by the Discord API.""" + + user = models.OneToOneField( + DiscordUser, + help_text="The user associated with this member.", + on_delete=models.CASCADE, + ) + nick = models.CharField( + max_length=100, + help_text="The nickname that the member is using.", + null=True, + ) + avatar = models.CharField( + help_text="The avatar hash of this member for this server.", + verbose_name="Avatar hash", + max_length=100, + null=True, + ) + roles = ArrayField( + models.BigIntegerField( + validators=(MinValueValidator(limit_value=0, message="Role IDs cannot be negative."),), + help_text="The snowflake ID of a role this member is part of.", + ), + help_text="Roles this member is part of.", + verbose_name="Roles", + ) + joined_at = models.DateTimeField( + help_text="When this member has joined the guild.", + verbose_name="Join date", + ) + premium_since = models.DateTimeField( + help_text="When this member started boosting the guild.", + verbose_name="Boosting since", + null=True, + ) + # XXX: These should probably be removed. + deaf = models.BooleanField( + help_text="Whether this user is deaf.", + verbose_name="Deaf", + ) + mute = models.BooleanField( + help_text="Whether this user is mute.", + verbose_name="Mute", + ) + pending = models.BooleanField( + help_text="Whether this user has not yet passed membership screening.", + verbose_name="Pending screening", + null=True, + ) + # XXX: This should probably be removed, seems only relevant to interactions. + permissions = models.BigIntegerField( + help_text="Total permissions of the member in the channel.", + verbose_name="Pending screening", + null=True, + ) + communication_disabled_until = models.DateTimeField( + help_text="Until when the user is server-muted.", + verbose_name="Timeout until", + null=True, + ) + last_update = models.DateTimeField( + help_text="When this member was most recently refreshed from Discord.", + auto_now=True, + ) diff --git a/pydis_site/apps/forms/permissions.py b/pydis_site/apps/forms/permissions.py new file mode 100644 index 000000000..01b9563ae --- /dev/null +++ b/pydis_site/apps/forms/permissions.py @@ -0,0 +1,18 @@ +from collections.abc import Iterable + +from django.request import HttpRequest +from django.views import View +from rest_framework.permissions import BasePermission + + +class HasJWTScopes(BasePermission): + """Ensure that requesting users have the JWT scopes specified in the constructor.""" + + def __init__(self, scopes: Iterable[str]) -> None: + """Configure the required scopes to access a resource.""" + self.scopes = frozenset(scopes) + + def has_permission(self, request: HttpRequest, view: View) -> bool: + """Only allow authenticated users with the configured set of scopes to access this resource.""" + # XXX: this should check for superset, not strict equality. + return request.user.is_authenticated and request.auth and frozenset(request.auth.scopes) == self.scopes diff --git a/pydis_site/apps/forms/tests/__init__.py b/pydis_site/apps/forms/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pydis_site/apps/forms/tests/base.py b/pydis_site/apps/forms/tests/base.py new file mode 100644 index 000000000..d4f54184c --- /dev/null +++ b/pydis_site/apps/forms/tests/base.py @@ -0,0 +1,93 @@ +import secrets +import unittest.mock + +from django.test import TestCase +from django.utils import timezone + +from pydis_site.apps.forms import authentication +from pydis_site.apps.forms import models + + +def fake_user() -> models.DiscordUser: + """Return a fake user for testing.""" + return models.DiscordUser( + id=1234, + username="Joe 'CIA' Banks", + discriminator=1234, + avatar=None, + bot=True, + system=True, + locale="tr", + verified=False, + email="shredder@jeremiahboby.me", + flags=0, + ) + + +def fake_member(user: models.DiscordUser, role_ids: tuple[int, ...]) -> models.DiscordMember: + """Return a fake member for testing.""" + return models.DiscordMember( + user=user, + roles=role_ids, + joined_at=timezone.now(), + deaf=True, + mute=False, + ) + + +class AuthenticatedTestCase(TestCase): + """Allows testing the forms API as an authenticated user.""" + + authenticate_as_admin = False + """Whether to authenticate the test member as an administrator.""" + + roles: tuple[str, ...] = () + """Which Discord role names to return for the given user.""" + + @classmethod + def setUpClass(cls) -> None: + """Set up the user as configured for authentication.""" + cls.user = fake_user() + cls.user.save() + cls.addClassCleanup(cls.user.delete) + + roles_with_ids = tuple(models.DiscordRole(id=idx, name=name) for idx, name in enumerate(cls.roles)) + role_ids = tuple(role.id for role in roles_with_ids) + cls.member = fake_member(cls.user, role_ids) + cls.member.save() + cls.addClassCleanup(cls.member.delete) + + get_roles_patcher = unittest.mock.patch("pydis_site.apps.forms.discord.get_roles") + cls.patched_get_roles = get_roles_patcher.start() + cls.patched_get_roles.return_value = roles_with_ids + cls.addClassCleanup(get_roles_patcher.stop) + + if cls.authenticate_as_admin: + admin = models.Admin(id=cls.member.id) + admin.save() + cls.addClassCleanup(admin.delete) + + cls.jwt_cookie = cls.create_jwt_cookie(cls.member) + super().setUpClass() + + def setUp(self) -> None: + """Log the user in to the test client.""" + self.jwt_login(self.member) + super().setUp() + + @staticmethod + def create_jwt_cookie(member: models.DiscordMember) -> str: + """Create a cookie as expected by the forms authentication.""" + data = { + "token": secrets.token_urlsafe(6), + "refresh": secrets.token_urlsafe(6), + "user_details": { + "id": member.id, + "name": member.user.username, + }, + } + return "JWT " + authentication.encode_jwt(data) + + def jwt_login(self, member: models.DiscordMember) -> None: + """Log the user in to the test client.""" + self.client.cookies["token"] = self.jwt_cookie diff --git a/pydis_site/apps/forms/tests/test_api.py b/pydis_site/apps/forms/tests/test_api.py new file mode 100644 index 000000000..88cdca1c7 --- /dev/null +++ b/pydis_site/apps/forms/tests/test_api.py @@ -0,0 +1,145 @@ + +from django.conf import settings +from django.test import TestCase, override_settings +from django.urls import reverse + +from pydis_site.apps.forms import authentication +from pydis_site.apps.forms.tests.base import AuthenticatedTestCase + + +class TestIndex(TestCase): + def test_index_returns_200(self) -> None: + """The index page should return a HTTP 200 response.""" + + url = reverse("forms:index") + resp = self.client.get(url) + self.assertEqual(resp.status_code, 200) + + +class TestAuthentication(TestCase): + def tearDown(self) -> None: + # Removes all cookies from the test client. + self.client.logout() + + def run_test_with_cookie(self, cookie: str, error: str) -> None: + url = reverse("forms:index") + self.client.cookies["token"] = cookie + resp = self.client.get(url) + content = resp.json() + self.assertEqual(resp.status_code, 403) + self.assertEqual(content, {"detail": error}) + + def test_authentication_invalid_cookie_format(self) -> None: + self.run_test_with_cookie( + cookie="invalid prefix and token format", + error="Unable to split prefix and token from authorization cookie.", + ) + + def test_authentication_invalid_cookie_prefix(self) -> None: + self.run_test_with_cookie(cookie="mnesia: token", error="Invalid authorization cookie prefix 'mnesia:'.") + + @override_settings(SECRET_KEY="some-garbage", FORMS_SECRET_KEY="forms-key") + def test_authentication_via_forms_secret_key(self) -> None: + cookie = "JWT " + authentication.encode_jwt({}, signing_secret_key=settings.FORMS_SECRET_KEY) + self.run_test_with_cookie(cookie=cookie, error="Token is missing from JWT.") + + @override_settings(SECRET_KEY="some-garbage", FORMS_SECRET_KEY="forms-key") + def test_authentication_via_secret_key(self) -> None: + cookie = "JWT " + authentication.encode_jwt({}, signing_secret_key=settings.SECRET_KEY) + self.run_test_with_cookie(cookie=cookie, error="Token is missing from JWT.") + + def test_authentication_via_unknown_key(self) -> None: + cookie = "JWT " + authentication.encode_jwt({}, signing_secret_key="JOEBANKS") + self.run_test_with_cookie(cookie=cookie, error="Signature verification failed") + + def test_missing_refresh_token(self) -> None: + content = {"token": "token"} + cookie = "JWT " + authentication.encode_jwt(content, signing_secret_key=settings.SECRET_KEY) + self.run_test_with_cookie(cookie=cookie, error="Refresh token is missing from JWT.") + + def test_missing_user_details(self) -> None: + content = {"token": "token", "refresh": "refresh", "user_details": {"id": False}} + cookie = "JWT " + authentication.encode_jwt(content, signing_secret_key=settings.SECRET_KEY) + self.run_test_with_cookie(cookie=cookie, error="Could not parse user details.") + + def test_bad_user_details(self) -> None: + content = {"token": "token", "refresh": "refresh", "user_details": ["erlang", "otp"]} + cookie = "JWT " + authentication.encode_jwt(content, signing_secret_key=settings.SECRET_KEY) + self.run_test_with_cookie(cookie=cookie, error="Could not parse user details.") + + +class NonAdminAuthenticationTest(AuthenticatedTestCase): + def test_non_admin_user(self) -> None: + url = reverse("forms:index") + + resp = self.client.get(url) + content = resp.json() + self.assertEqual(resp.status_code, 200) + self.assertEqual( + content["user"], + { + "authenticated": True, + "user": {"id": self.member.id, "name": self.member.user.username}, + "scopes": ["authenticated"], + }, + ) + + +class AdminAuthenticationTest(AuthenticatedTestCase): + authenticate_as_admin = True + + def test_admin_user(self) -> None: + url = reverse("forms:index") + + resp = self.client.get(url) + content = resp.json() + self.assertEqual(resp.status_code, 200) + self.assertEqual( + content["user"], + { + "authenticated": True, + "user": {"id": self.member.id, "name": self.member.user.username}, + "scopes": ["authenticated", "admin"], + }, + ) + + +class DiscordRolesTest(AuthenticatedTestCase): + authenticate_as_admin = False + roles = ("admin", "High Sharder") + + def test_admin_user(self) -> None: + url = reverse("forms:index") + + resp = self.client.get(url) + content = resp.json() + self.assertEqual(resp.status_code, 200) + self.assertEqual( + content["user"], + { + "authenticated": True, + "user": {"id": self.member.id, "name": self.member.user.username}, + "scopes": ["authenticated", "High Sharder", "discord admin"], + }, + ) + + +# the ultimate power trip... +class DiscordAdminAndFormsAdminTest(AuthenticatedTestCase): + authenticate_as_admin = True + roles = ("admin",) + + def test_admin_user(self) -> None: + url = reverse("forms:index") + + resp = self.client.get(url) + content = resp.json() + self.assertEqual(resp.status_code, 200) + self.assertEqual( + content["user"], + { + "authenticated": True, + "user": {"id": self.member.id, "name": self.member.user.username}, + "scopes": ["authenticated", "admin", "discord admin"], + }, + ) diff --git a/pydis_site/apps/forms/urls.py b/pydis_site/apps/forms/urls.py new file mode 100644 index 000000000..b54629605 --- /dev/null +++ b/pydis_site/apps/forms/urls.py @@ -0,0 +1,7 @@ +from django.urls import path + +from .views import IndexView + + +app_name = "forms" +urlpatterns = (path("", IndexView.as_view(), name="index"),) diff --git a/pydis_site/apps/forms/util.py b/pydis_site/apps/forms/util.py new file mode 100644 index 000000000..e4831e9b8 --- /dev/null +++ b/pydis_site/apps/forms/util.py @@ -0,0 +1,8 @@ +import datetime + +from django.utils import timezone + + +def is_stale(last_update: datetime.datetime, expire_seconds: int) -> bool: + """Check if the given timestamp is stale, if it is considered expired after `expire_seconds` seconds.""" + return (timezone.now() - last_update).total_seconds() > expire_seconds diff --git a/pydis_site/apps/forms/views.py b/pydis_site/apps/forms/views.py new file mode 100644 index 000000000..4c9da8cc2 --- /dev/null +++ b/pydis_site/apps/forms/views.py @@ -0,0 +1,58 @@ +import platform + +from django.conf import settings +from django.http import HttpRequest +from rest_framework.views import APIView +from rest_framework.response import Response + +from .authentication import JWTAuthentication + + +class IndexView(APIView): + """ + Return a generic hello world message with some information to the client. + + Can be used as a healthcheck for Kubernetes or a frontend connection check. + + ## Response format + + The response is a JSON map with the following fields: + + - `message` (`str`): A hello message. + - `client` (`str`): IP address of the connecting client. This might be an + internal load balancer IP. + - `sha` (`str`): Git hash of the current release. + - `node` (`str`): Hostname of the node that processed the request. + - `user`? (`dict`): Carries information about the requesting client. Only + present when the client is authenticated. The following keys are + included: + - `authenticated` (`bool`): Always `True`. + - `user` (`dict`): All user information stored in the requesting JWT + token. + - `scopes` (`list[str]`): A list of JWT scopes the user is authenticated + with. + """ + + authentication_classes = (JWTAuthentication,) + permission_classes = () + + def get(self, request: HttpRequest, format: str | None = None) -> Response: + """Return a hello from Python Discord forms!""" + response_data = { + "message": "Hello, world!", + "client": request.META["REMOTE_ADDR"], + "user": { + "authenticated": False, + }, + "sha": settings.GIT_SHA, + "node": platform.uname().node, + } + + if request.user.is_authenticated: + response_data["user"] = { + "authenticated": True, + "user": request.user.payload, + "scopes": request.auth.scopes, + } + + return Response(response_data) diff --git a/pydis_site/settings.py b/pydis_site/settings.py index e5ae7cbdb..9c8c1917f 100644 --- a/pydis_site/settings.py +++ b/pydis_site/settings.py @@ -33,6 +33,10 @@ GITHUB_TOKEN=(str, None), GITHUB_APP_ID=(str, None), GITHUB_APP_KEY=(str, None), + DISCORD_GUILD_ID=(int, 267624335836053506), + DISCORD_BOT_TOKEN=(str, None), + DISCORD_OAUTH2_CLIENT_ID=(str, None), + DISCORD_OAUTH2_CLIENT_SECRET=(str, None), ) GIT_SHA = env("GIT_SHA") @@ -43,6 +47,15 @@ GITHUB_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ" """The datetime string format GitHub uses.""" +DISCORD_API_BASE_URL = "https://discord.com/api/v8" +"""Used by forms for Discord API calls.""" + +DISCORD_GUILD_ID = env("DISCORD_GUILD_ID") +DISCORD_BOT_TOKEN = env("DISCORD_BOT_TOKEN") + +DISCORD_OAUTH2_CLIENT_ID = env("DISCORD_OAUTH2_CLIENT_ID") +DISCORD_OAUTH2_CLIENT_SECRET = env("DISCORD_OAUTH2_CLIENT_SECRET") + STATIC_BUILD: bool = env("STATIC_BUILD") if GITHUB_APP_KEY and (key_file := Path(GITHUB_APP_KEY)).is_file(): @@ -72,6 +85,7 @@ if DEBUG: ALLOWED_HOSTS = env.list('ALLOWED_HOSTS', default=['*']) SECRET_KEY = "yellow polkadot bikini" # noqa: S105 + FORMS_SECRET_KEY = SECRET_KEY # Prevent verbose warnings emitted when passing a non-timezone aware # datetime object to the database, whilst we have time zone support @@ -85,6 +99,7 @@ elif 'CI' in os.environ: ALLOWED_HOSTS = ['*'] SECRET_KEY = secrets.token_urlsafe(32) + FORMS_SECRET_KEY = SECRET_KEY # See above. We run with `CI=true`, but debug unset in GitHub Actions, # so we also want to filter it there. @@ -105,6 +120,8 @@ ], ) SECRET_KEY = env('SECRET_KEY') + # TODO: Should be deprecated once all JWTs were rotated. + FORMS_SECRET_KEY = env('FORMS_SECRET_KEY') # Application definition NON_STATIC_APPS = [ @@ -119,6 +136,7 @@ 'pydis_site.apps.resources', 'pydis_site.apps.content', 'pydis_site.apps.events', + 'pydis_site.apps.forms', 'pydis_site.apps.redirect', 'django.contrib.admin', diff --git a/pydis_site/urls.py b/pydis_site/urls.py index 799e86000..85f2adeda 100644 --- a/pydis_site/urls.py +++ b/pydis_site/urls.py @@ -8,6 +8,7 @@ # External API ingress (over the net) path('api/', include('pydis_site.apps.api.urls', namespace='api')), + path('forms/', include('pydis_site.apps.forms.urls', namespace='forms')), # Internal API ingress (cluster local) path('pydis-api/', include('pydis_site.apps.api.urls', namespace='internal_api')), diff --git a/pyproject.toml b/pyproject.toml index a9c04d427..85382827e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,6 +73,7 @@ select = ["ANN", "B", "C4", "D", "DJ", "DTZ", "E", "F", "ISC", "INT", "N", "PGH" "pydis_site/apps/**/migrations/*.py" = ["ALL"] "manage.py" = ["T201"] "pydis_site/apps/api/tests/base.py" = ["S106"] +"pydis_site/apps/forms/tests/test_api.py" = ["S106"] "pydis_site/apps/**/tests/test_*.py" = ["ANN", "D"] "static-builds/netlify_build.py" = ["T201"] "pydis_site/apps/api/tests/test_off_topic_channel_names.py" = ["RUF001"]