diff --git a/src/backend/tests/unit/services/authorization/_policy_double.py b/src/backend/tests/unit/services/authorization/_policy_double.py new file mode 100644 index 000000000000..b66e6d097916 --- /dev/null +++ b/src/backend/tests/unit/services/authorization/_policy_double.py @@ -0,0 +1,327 @@ +"""In-test authorization enforcer for OSS RBAC integration tests. + +The OSS :class:`LangflowAuthorizationService` is a pass-through: ``enforce()`` +always returns ``True`` and ``SUPPORTS_CROSS_USER_FETCH`` is ``False``. That makes +it impossible to assert allow/deny semantics against the real routes — every +request is allowed and cross-user fetch never widens. This module supplies a +minimal, dependency-free stand-in that derives allow/deny from the seeded +``authz_role`` / ``authz_role_assignment`` / ``authz_share`` rows. It is enough +to validate that the OSS guard wiring, domain resolution, share-aware fetch, and +``deny_to_404`` masking behave correctly under a *real* allow/deny signal — +without pulling in the EE Casbin package. + +Install it for the duration of a test with :func:`install_policy_authz`, which +swaps the service registered on the service manager so every +``get_authorization_service()`` call site (guards, fetch, listing, helpers) sees +it, and flips ``AUTHZ_ENABLED=True`` / ``AUTHZ_SUPERUSER_BYPASS=False``. +""" + +from __future__ import annotations + +import contextlib +from typing import TYPE_CHECKING, Any +from uuid import UUID + +from langflow.services.database.models.auth import AuthzRole, AuthzRoleAssignment, AuthzShare +from lfx.services.authorization.base import BaseAuthorizationService +from sqlmodel import col, select + +if TYPE_CHECKING: + from collections.abc import Iterator, Sequence + + from lfx.services.settings.service import SettingsService + from sqlmodel.ext.asyncio.session import AsyncSession + +# Actions granted by each AuthzShare.permission_level. ``write`` and ``execute`` +# are independent (each grants ``read`` plus itself); ``admin`` grants the lot. +# Mirrors SharePermissionLevel without baking a questionable write None: + super().__init__() + self.settings_service = settings_service + self.set_ready() + + async def is_enabled(self) -> bool: + return bool(self.settings_service.auth_settings.AUTHZ_ENABLED) + + async def enforce( + self, + *, + user_id: UUID, + domain: str, + obj: str, + act: str, + context: dict[str, Any] | None = None, + ) -> bool: + auth_settings = self.settings_service.auth_settings + if context and context.get("is_superuser") and getattr(auth_settings, "AUTHZ_SUPERUSER_BYPASS", False): + return True + + resource_type, _, resource_id_str = obj.partition(":") + # Each enforce opens its own session — a real plugin reads policy from its + # own connection, independent of the request's transaction. + from langflow.services.deps import session_scope + + async with session_scope() as session: + if await self._role_allows(session, user_id, domain, resource_type, act): + return True + if resource_id_str and resource_id_str != "*": + with contextlib.suppress(ValueError): + resource_id = UUID(str(resource_id_str)) + if await self._share_allows(session, resource_id, user_id, resource_type, act): + return True + return False + + async def batch_enforce( + self, + *, + user_id: UUID, + domain: str, + requests: Sequence[tuple[str, str]], + context: dict[str, Any] | None = None, + ) -> list[bool]: + return [ + await self.enforce(user_id=user_id, domain=domain, obj=obj, act=act, context=context) + for obj, act in requests + ] + + async def _role_allows( + self, + session: AsyncSession, + user_id: UUID, + domain: str, + resource_type: str, + act: str, + ) -> bool: + assignments = ( + await session.exec(select(AuthzRoleAssignment).where(AuthzRoleAssignment.user_id == user_id)) + ).all() + role_ids = [a.role_id for a in assignments if _assignment_covers(a, domain)] + if not role_ids: + return False + roles = (await session.exec(select(AuthzRole).where(col(AuthzRole.id).in_(role_ids)))).all() + needed = f"{resource_type}:{act}" + wildcard = f"{resource_type}:*" + return any( + needed in (role.permissions or []) + or wildcard in (role.permissions or []) + or "*:*" in (role.permissions or []) + for role in roles + ) + + async def _share_allows( + self, + session: AsyncSession, + resource_id: UUID, + user_id: UUID, + resource_type: str, + act: str, + ) -> bool: + shares = ( + await session.exec( + select(AuthzShare).where( + AuthzShare.resource_type == resource_type, + AuthzShare.resource_id == resource_id, + ) + ) + ).all() + for share in shares: + if not _share_targets_user(share, user_id): + continue + if act in _SHARE_LEVEL_ACTIONS.get(share.permission_level, frozenset()): + return True + return False + + +def _assignment_covers(assignment: AuthzRoleAssignment, request_domain: str) -> bool: + """A global assignment covers every domain; a scoped one must match exactly. + + The scoped form is intentionally exact (``{domain_type}:{domain_id}`` == + request domain) so a regression that changes the domain the route resolves + flips the decision and trips the test. A scoped assignment missing its + ``domain_id`` is malformed and never widened to global coverage. + """ + if assignment.domain_type == "global": + return True + if assignment.domain_id is None: + return False + return f"{assignment.domain_type}:{assignment.domain_id}" == request_domain + + +def _share_targets_user(share: AuthzShare, user_id: UUID) -> bool: + if share.scope == "public": + return True + if share.scope == "user": + return str(share.target_id) == str(user_id) + # team scope would need a membership lookup; unused by the current tests. + return False + + +# --------------------------------------------------------------------------- # +# Seeding helpers — write policy rows the enforcer reads. +# --------------------------------------------------------------------------- # + + +async def seed_system_roles(session: AsyncSession) -> dict[str, UUID]: + """Get-or-create viewer/developer/admin roles; return ``{name: role_id}``. + + Preexisting rows (migration seed or another fixture) are overwritten so the + policy matrix asserted by these tests is exactly ``SYSTEM_ROLE_PERMISSIONS``. + """ + ids: dict[str, UUID] = {} + for name, permissions in SYSTEM_ROLE_PERMISSIONS.items(): + existing = (await session.exec(select(AuthzRole).where(AuthzRole.name == name))).first() + if existing is None: + existing = AuthzRole( + name=name, description=f"{name} (test seed)", is_system=True, permissions=list(permissions) + ) + session.add(existing) + await session.flush() + else: + existing.permissions = list(permissions) + existing.is_system = True + ids[name] = existing.id + await session.commit() + return ids + + +async def assign_role( + session: AsyncSession, + *, + user_id: UUID, + role_id: UUID, + domain_type: str = "global", + domain_id: UUID | None = None, +) -> None: + """Bind ``user_id`` to ``role_id`` within an optional domain. + + Non-global assignments must name their domain — a scoped grant without a + ``domain_id`` would silently behave like (or be rejected as) something else + and hide domain-resolution regressions. + """ + if domain_type != "global" and domain_id is None: + msg = f"domain_id is required for non-global assignments (domain_type={domain_type!r})" + raise ValueError(msg) + session.add(AuthzRoleAssignment(user_id=user_id, role_id=role_id, domain_type=domain_type, domain_id=domain_id)) + await session.commit() + + +async def create_user_share( + session: AsyncSession, + *, + resource_type: str, + resource_id: UUID, + target_user_id: UUID, + permission_level: str, + created_by: UUID, +) -> AuthzShare: + """Create a ``scope='user'`` AuthzShare granting ``target_user_id`` access.""" + share = AuthzShare( + resource_type=resource_type, + resource_id=resource_id, + scope="user", + target_id=target_user_id, + permission_level=permission_level, + created_by=created_by, + ) + session.add(share) + await session.commit() + return share + + +@contextlib.contextmanager +def install_policy_authz(settings_service: SettingsService) -> Iterator[PolicyTestAuthorizationService]: + """Install the policy test-double + enable enforcement for the block; restore on exit. + + Swaps the cached authorization service on the service manager (string-enum + keys make ``ServiceType.AUTHORIZATION_SERVICE`` interchangeable across the + lfx/langflow enums) so every ``get_authorization_service()`` resolves to the + double, and flips the auth settings the guards read. + """ + from langflow.services.schema import ServiceType + from lfx.services.manager import get_service_manager + + auth_settings = settings_service.auth_settings + saved_enabled = auth_settings.AUTHZ_ENABLED + saved_bypass = auth_settings.AUTHZ_SUPERUSER_BYPASS + + service_manager = get_service_manager() + previous_service = service_manager.services.get(ServiceType.AUTHORIZATION_SERVICE) + + auth_settings.AUTHZ_ENABLED = True + auth_settings.AUTHZ_SUPERUSER_BYPASS = False + double = PolicyTestAuthorizationService(settings_service) + service_manager.services[ServiceType.AUTHORIZATION_SERVICE] = double + try: + yield double + finally: + if previous_service is not None: + service_manager.services[ServiceType.AUTHORIZATION_SERVICE] = previous_service + else: + service_manager.services.pop(ServiceType.AUTHORIZATION_SERVICE, None) + auth_settings.AUTHZ_ENABLED = saved_enabled + auth_settings.AUTHZ_SUPERUSER_BYPASS = saved_bypass diff --git a/src/backend/tests/unit/services/authorization/test_rbac_enforcement_integration.py b/src/backend/tests/unit/services/authorization/test_rbac_enforcement_integration.py new file mode 100644 index 000000000000..a8fdf9c1a379 --- /dev/null +++ b/src/backend/tests/unit/services/authorization/test_rbac_enforcement_integration.py @@ -0,0 +1,257 @@ +"""End-to-end RBAC enforcement tests driven by an in-test allow/deny enforcer. + +The OSS authorization service is a pass-through (``enforce()`` always allows and +``supports_cross_user_fetch()`` is False), so allow/deny semantics cannot be +asserted against it directly. These tests install :class:`PolicyTestAuthorizationService` +(see ``_policy_double``) with ``AUTHZ_ENABLED=True`` / ``AUTHZ_SUPERUSER_BYPASS=False`` +and exercise the *real* flow routes over HTTP, validating that: + +* the per-route guards (``ensure_flow_permission`` via the ``Authorized*Flow`` + dependencies) actually gate read/write/delete/create/execute by role, +* cross-user denials are masked as 404 (not 403) on fetch routes, +* the share-aware fetch + ``authz_share`` rows grant cross-user access, and +* domain resolution (``_resolve_authz_domain``) scopes a domain-bound grant. + +Removing a guard or regressing domain resolution flips one of these assertions. +Everything runs against the OSS package only — no EE Casbin enforcer required. +""" + +from __future__ import annotations + +from uuid import UUID, uuid4 + +from langflow.services.database.models.flow.model import Flow +from langflow.services.database.models.user.model import User +from langflow.services.deps import get_auth_service, get_settings_service, session_scope + +from ._policy_double import ( + assign_role, + create_user_share, + install_policy_authz, + seed_system_roles, +) + +_PASSWORD = "testpassword" # noqa: S105 — test-only credential # pragma: allowlist secret + + +async def _make_user(username: str) -> UUID: + """Insert an active, non-superuser user and return its id.""" + async with session_scope() as session: + user = User(username=username, password=get_auth_service().get_password_hash(_PASSWORD), is_active=True) + session.add(user) + await session.flush() + user_id = user.id + await session.commit() + return user_id + + +async def _login(client, username: str) -> dict[str, str]: + """Log in and return an Authorization header for ``username``.""" + response = await client.post("api/v1/login", data={"username": username, "password": _PASSWORD}) + assert response.status_code == 200, f"login failed for {username}: {response.text}" + return {"Authorization": f"Bearer {response.json()['access_token']}"} + + +async def _make_flow(owner_id: UUID, name: str, *, workspace_id: UUID | None = None) -> UUID: + """Insert a minimal flow owned by ``owner_id`` and return its id.""" + async with session_scope() as session: + flow = Flow(name=name, user_id=owner_id, workspace_id=workspace_id, data={"nodes": [], "edges": []}) + session.add(flow) + await session.flush() + flow_id = flow.id + await session.commit() + return flow_id + + +async def _seed_roles() -> dict[str, UUID]: + async with session_scope() as session: + return await seed_system_roles(session) + + +async def _role_user( + client, + role_name: str, + role_ids: dict[str, UUID], + *, + domain_type: str = "global", + domain_id: UUID | None = None, +) -> tuple[UUID, dict[str, str]]: + """Create a user, assign ``role_name`` (optionally domain-scoped), return (id, headers).""" + username = f"{role_name}_{uuid4().hex}" + user_id = await _make_user(username) + async with session_scope() as session: + await assign_role( + session, + user_id=user_id, + role_id=role_ids[role_name], + domain_type=domain_type, + domain_id=domain_id, + ) + headers = await _login(client, username) + return user_id, headers + + +# --------------------------------------------------------------------------- # +# Role matrix (Phase 1.11): viewer / developer / admin on flow routes. +# Flows are owned by a separate user so the guards' owner-override does not mask +# the role decision — these assertions exercise the *role*, not ownership. +# --------------------------------------------------------------------------- # + + +async def test_viewer_can_read_and_execute_but_not_write_delete_or_create(client): + role_ids = await _seed_roles() + owner_id = await _make_user(f"owner_{uuid4().hex}") + flow_id = await _make_flow(owner_id, f"flow_{uuid4().hex}") + _viewer_id, headers = await _role_user(client, "viewer", role_ids) + + with install_policy_authz(get_settings_service()): + # read -> allowed + assert (await client.get(f"api/v1/flows/{flow_id}", headers=headers)).status_code == 200 + # execute (build) -> allowed (viewer has flow:execute) + build = await client.post(f"api/v1/build/{flow_id}/flow", headers=headers, json={}) + assert build.status_code == 200, build.text + # write -> denied, masked as 404 (NOT 403) on a fetch route + patch = await client.patch(f"api/v1/flows/{flow_id}", headers=headers, json={"name": f"x_{uuid4().hex}"}) + assert patch.status_code == 404 + # delete -> denied, masked as 404 + assert (await client.delete(f"api/v1/flows/{flow_id}", headers=headers)).status_code == 404 + # create -> denied; 403 is correct here (no existing resource UUID to protect) + create = await client.post( + "api/v1/flows/", headers=headers, json={"name": f"new_{uuid4().hex}", "data": {"nodes": [], "edges": []}} + ) + assert create.status_code == 403 + + +async def test_developer_can_write_and_create_but_not_delete(client): + role_ids = await _seed_roles() + owner_id = await _make_user(f"owner_{uuid4().hex}") + flow_id = await _make_flow(owner_id, f"flow_{uuid4().hex}") + _dev_id, headers = await _role_user(client, "developer", role_ids) + + with install_policy_authz(get_settings_service()): + assert (await client.get(f"api/v1/flows/{flow_id}", headers=headers)).status_code == 200 + # write someone else's flow -> allowed via the developer role (not ownership) + patch = await client.patch(f"api/v1/flows/{flow_id}", headers=headers, json={"name": f"renamed_{uuid4().hex}"}) + assert patch.status_code == 200, patch.text + # create -> allowed + create = await client.post( + "api/v1/flows/", headers=headers, json={"name": f"dev_{uuid4().hex}", "data": {"nodes": [], "edges": []}} + ) + assert create.status_code == 201, create.text + # delete -> denied (developer lacks flow:delete) -> 404 + assert (await client.delete(f"api/v1/flows/{flow_id}", headers=headers)).status_code == 404 + + +async def test_admin_has_full_flow_access(client): + role_ids = await _seed_roles() + owner_id = await _make_user(f"owner_{uuid4().hex}") + flow_id = await _make_flow(owner_id, f"flow_{uuid4().hex}") + _admin_id, headers = await _role_user(client, "admin", role_ids) + + with install_policy_authz(get_settings_service()): + assert (await client.get(f"api/v1/flows/{flow_id}", headers=headers)).status_code == 200 + patch = await client.patch(f"api/v1/flows/{flow_id}", headers=headers, json={"name": f"a_{uuid4().hex}"}) + assert patch.status_code == 200, patch.text + create = await client.post( + "api/v1/flows/", headers=headers, json={"name": f"adm_{uuid4().hex}", "data": {"nodes": [], "edges": []}} + ) + assert create.status_code == 201, create.text + # delete -> allowed (admin has flow:delete) + assert (await client.delete(f"api/v1/flows/{flow_id}", headers=headers)).status_code == 200 + # the flow is gone -> now 404 for everyone (sanity) + assert (await client.get(f"api/v1/flows/{flow_id}", headers=headers)).status_code == 404 + + +# --------------------------------------------------------------------------- # +# Share lifecycle (Phase 3.13): Alice shares a flow with Bob. +# --------------------------------------------------------------------------- # + + +async def test_share_grants_cross_user_access_and_absence_is_404(client): + settings = get_settings_service() + alice_id = await _make_user(f"alice_{uuid4().hex}") + bob_username = f"bob_{uuid4().hex}" + bob_id = await _make_user(bob_username) + flow_id = await _make_flow(alice_id, f"aliceflow_{uuid4().hex}") + bob_headers = await _login(client, bob_username) + + # Without a share, Bob cannot reach Alice's flow at all — and the denial is a + # 404 (UUID-privacy mask), not a 403, on every fetch route. + with install_policy_authz(settings): + assert (await client.get(f"api/v1/flows/{flow_id}", headers=bob_headers)).status_code == 404 + assert ( + await client.patch(f"api/v1/flows/{flow_id}", headers=bob_headers, json={"name": "x"}) + ).status_code == 404 + assert (await client.post(f"api/v1/build/{flow_id}/flow", headers=bob_headers, json={})).status_code == 404 + + # Alice grants Bob an admin-level share (read + write + execute). + async with session_scope() as session: + await create_user_share( + session, + resource_type="flow", + resource_id=flow_id, + target_user_id=bob_id, + permission_level="admin", + created_by=alice_id, + ) + + with install_policy_authz(settings): + assert (await client.get(f"api/v1/flows/{flow_id}", headers=bob_headers)).status_code == 200 + patch = await client.patch(f"api/v1/flows/{flow_id}", headers=bob_headers, json={"name": f"bob_{uuid4().hex}"}) + assert patch.status_code == 200, patch.text + build = await client.post(f"api/v1/build/{flow_id}/flow", headers=bob_headers, json={}) + assert build.status_code == 200, build.text + + +async def test_read_only_share_allows_get_but_denies_write_and_execute(client): + """A read-level share grants GET but neither PATCH nor build — permission_level is enforced, not mere presence.""" + settings = get_settings_service() + alice_id = await _make_user(f"alice_{uuid4().hex}") + bob_username = f"bob_{uuid4().hex}" + bob_id = await _make_user(bob_username) + flow_id = await _make_flow(alice_id, f"aliceflow_{uuid4().hex}") + bob_headers = await _login(client, bob_username) + + async with session_scope() as session: + await create_user_share( + session, + resource_type="flow", + resource_id=flow_id, + target_user_id=bob_id, + permission_level="read", + created_by=alice_id, + ) + + with install_policy_authz(settings): + assert (await client.get(f"api/v1/flows/{flow_id}", headers=bob_headers)).status_code == 200 + # write is not granted by a read-level share -> deny -> 404 + patch = await client.patch(f"api/v1/flows/{flow_id}", headers=bob_headers, json={"name": "nope"}) + assert patch.status_code == 404 + # execute is modeled independently from write — a read-level share must + # not grant build either -> deny -> 404 + build = await client.post(f"api/v1/build/{flow_id}/flow", headers=bob_headers, json={}) + assert build.status_code == 404 + + +# --------------------------------------------------------------------------- # +# Domain resolution: a workspace-scoped grant must only apply in its workspace. +# --------------------------------------------------------------------------- # + + +async def test_domain_scoped_role_applies_only_in_matching_domain(client): + role_ids = await _seed_roles() + owner_id = await _make_user(f"owner_{uuid4().hex}") + workspace_a = uuid4() + workspace_b = uuid4() + flow_a = await _make_flow(owner_id, f"a_{uuid4().hex}", workspace_id=workspace_a) + flow_b = await _make_flow(owner_id, f"b_{uuid4().hex}", workspace_id=workspace_b) + # viewer scoped to workspace A only. + _viewer_id, headers = await _role_user(client, "viewer", role_ids, domain_type="workspace", domain_id=workspace_a) + + with install_policy_authz(get_settings_service()): + # flow A resolves to domain workspace:{A} -> grant covers -> read allowed. + assert (await client.get(f"api/v1/flows/{flow_a}", headers=headers)).status_code == 200 + # flow B resolves to workspace:{B} -> grant does NOT cover -> denied -> 404. + # (If domain resolution regressed to '*', the workspace-A grant would stop + # matching flow A and the assertion above would fail instead.) + assert (await client.get(f"api/v1/flows/{flow_b}", headers=headers)).status_code == 404