Skip to content

feat(aci): Update API to allow filter monitors by assignee #95501

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from collections.abc import Iterable, Sequence
from functools import partial
from typing import assert_never

from django.db import router, transaction
from django.db.models import Count, Q
Expand All @@ -17,6 +19,7 @@
from sentry.api.event_search import SearchConfig, SearchFilter, SearchKey, default_config
from sentry.api.event_search import parse_search_query as base_parse_search_query
from sentry.api.exceptions import ResourceDoesNotExist
from sentry.api.issue_search import convert_actor_or_none_value
from sentry.api.paginator import OffsetPaginator
from sentry.api.serializers import serialize
from sentry.apidocs.constants import (
Expand All @@ -30,6 +33,9 @@
from sentry.issues import grouptype
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.models.team import Team
from sentry.users.models.user import User
from sentry.users.services.user import RpcUser
from sentry.workflow_engine.endpoints.serializers import DetectorSerializer
from sentry.workflow_engine.endpoints.utils.filters import apply_filter
from sentry.workflow_engine.endpoints.utils.sortby import SortByParam
Expand All @@ -43,12 +49,33 @@
detector_search_config = SearchConfig.create_from(
default_config,
text_operator_keys={"name", "type"},
allowed_keys={"name", "type"},
allowed_keys={"name", "type", "assignee"},
allow_boolean=False,
free_text_key="query",
)
parse_detector_query = partial(base_parse_search_query, config=detector_search_config)


def convert_assignee_values(value: Iterable[str], projects: Sequence[Project], user: User) -> Q:
"""
Convert an assignee search value to a Django Q object for filtering detectors.
"""
actors_or_none: list[RpcUser | Team | None] = convert_actor_or_none_value(
value, projects, user, None
)
assignee_query = Q()
for actor in actors_or_none:
if isinstance(actor, (User, RpcUser)):
assignee_query |= Q(owner_user_id=actor.id)
elif isinstance(actor, Team):
assignee_query |= Q(owner_team_id=actor.id)
elif actor is None:
assignee_query |= Q(owner_team_id__isnull=True, owner_user_id__isnull=True)
else:
assert_never(actor)
return assignee_query


# Maps API field name to database field name, with synthetic aggregate fields keeping
# to our field naming scheme for consistency.
SORT_ATTRS = {
Expand Down Expand Up @@ -118,6 +145,9 @@ def get(self, request: Request, organization: Organization) -> Response:
`````````````````````````````
Return a list of detectors for a given organization.
"""
if not request.user.is_authenticated:
return self.respond(status=status.HTTP_401_UNAUTHORIZED)

projects = self.get_projects(request, organization)
queryset: QuerySet[Detector] = Detector.objects.filter(
project_id__in=projects,
Expand All @@ -138,6 +168,19 @@ def get(self, request: Request, organization: Organization) -> Response:
queryset = apply_filter(queryset, filter, "name")
case SearchFilter(key=SearchKey("type"), operator=("=" | "IN" | "!=")):
queryset = apply_filter(queryset, filter, "type")
case SearchFilter(key=SearchKey("assignee"), operator=("=" | "IN" | "!=")):
# Filter values can be emails, team slugs, "me", "my_teams", "none"
values = (
filter.value.value
if isinstance(filter.value.value, list)
else [filter.value.value]
)
assignee_q = convert_assignee_values(values, projects, request.user)

if filter.operator == "!=":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: is '!=' defined as a literal anywhere? just a little nervous about 'magic' strings.

Copy link
Member Author

@leedongwei leedongwei Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately not. It's a repeated pattern in the codebase seen in other parts of the product too.

queryset = queryset.exclude(assignee_q)
else:
queryset = queryset.filter(assignee_q)
case SearchFilter(key=SearchKey("query"), operator="="):
# 'query' is our free text key; all free text gets returned here
# as '=', and we search any relevant fields for it.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from unittest import mock

from django.db.models import Q
from rest_framework.exceptions import ErrorDetail

from sentry.api.serializers import serialize
from sentry.grouping.grouptype import ErrorGroupType
from sentry.incidents.grouptype import MetricIssue
from sentry.incidents.models.alert_rule import AlertRuleDetectionType
from sentry.models.environment import Environment
from sentry.search.utils import _HACKY_INVALID_USER
from sentry.snuba.dataset import Dataset
from sentry.snuba.models import (
QuerySubscription,
Expand All @@ -19,6 +21,7 @@
from sentry.testutils.silo import region_silo_test
from sentry.uptime.grouptype import UptimeDomainCheckFailure
from sentry.uptime.types import DATA_SOURCE_UPTIME_SUBSCRIPTION
from sentry.workflow_engine.endpoints.organization_detector_index import convert_assignee_values
from sentry.workflow_engine.models import DataCondition, DataConditionGroup, DataSource, Detector
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.models.detector_workflow import DetectorWorkflow
Expand Down Expand Up @@ -275,6 +278,191 @@ def test_general_query(self):
)
assert {d["name"] for d in response3.data} == {detector.name, detector2.name}

def test_query_by_assignee_user_email(self):
user = self.create_user(email="[email protected]")
self.create_member(organization=self.organization, user=user)

assigned_detector = self.create_detector(
project_id=self.project.id,
name="Assigned Detector",
type=MetricIssue.slug,
owner_user_id=user.id,
)
self.create_detector(
project_id=self.project.id,
name="Unassigned Detector",
type=MetricIssue.slug,
)

response = self.get_success_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": f"assignee:{user.email}"},
)
assert {d["name"] for d in response.data} == {assigned_detector.name}

def test_query_by_assignee_user_username(self):
user = self.create_user(username="testuser")
self.create_member(organization=self.organization, user=user)

assigned_detector = self.create_detector(
project_id=self.project.id,
name="Assigned Detector",
type=MetricIssue.slug,
owner_user_id=user.id,
)
self.create_detector(
project_id=self.project.id,
name="Unassigned Detector",
type=MetricIssue.slug,
)

response = self.get_success_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": f"assignee:{user.username}"},
)
assert {d["name"] for d in response.data} == {assigned_detector.name}

def test_query_by_assignee_team(self):
team = self.create_team(organization=self.organization, slug="test-team")
self.project.add_team(team)

assigned_detector = self.create_detector(
project_id=self.project.id,
name="Team Detector",
type=MetricIssue.slug,
owner_team_id=team.id,
)
self.create_detector(
project_id=self.project.id,
name="Unassigned Detector",
type=MetricIssue.slug,
)

response = self.get_success_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": f"assignee:#{team.slug}"},
)
assert {d["name"] for d in response.data} == {assigned_detector.name}

def test_query_by_assignee_me(self):
self.login_as(user=self.user)

assigned_detector = self.create_detector(
project_id=self.project.id,
name="My Detector",
type=MetricIssue.slug,
owner_user_id=self.user.id,
)
self.create_detector(
project_id=self.project.id,
name="Other Detector",
type=MetricIssue.slug,
)

response = self.get_success_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": "assignee:me"},
)
assert {d["name"] for d in response.data} == {assigned_detector.name}

def test_query_by_assignee_none(self):
user = self.create_user()
self.create_member(organization=self.organization, user=user)
team = self.create_team(organization=self.organization)

self.create_detector(
project_id=self.project.id,
name="User Assigned",
type=MetricIssue.slug,
owner_user_id=user.id,
)
self.create_detector(
project_id=self.project.id,
name="Team Assigned",
type=MetricIssue.slug,
owner_team_id=team.id,
)
unassigned_detector = self.create_detector(
project_id=self.project.id,
name="Unassigned Detector",
type=MetricIssue.slug,
)

response = self.get_success_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": "assignee:none"},
)
assert {d["name"] for d in response.data} == {unassigned_detector.name}

def test_query_by_assignee_multiple_values(self):
user = self.create_user(email="[email protected]")
self.create_member(organization=self.organization, user=user)
team = self.create_team(organization=self.organization, slug="test-team")
self.project.add_team(team)

detector1 = self.create_detector(
project_id=self.project.id,
name="Detector 1",
type=MetricIssue.slug,
owner_user_id=user.id,
)
detector2 = self.create_detector(
project_id=self.project.id,
name="Detector 2",
type=MetricIssue.slug,
owner_team_id=team.id,
)
self.create_detector(
project_id=self.project.id,
name="Other Detector",
type=MetricIssue.slug,
)

response = self.get_success_response(
self.organization.slug,
qs_params={
"project": self.project.id,
"query": f"assignee:[{user.email}, #{team.slug}]",
},
)
assert {d["name"] for d in response.data} == {detector1.name, detector2.name}

def test_query_by_assignee_negation(self):
user = self.create_user(email="[email protected]")
self.create_member(organization=self.organization, user=user)

self.create_detector(
project_id=self.project.id,
name="Excluded Detector",
type=MetricIssue.slug,
owner_user_id=user.id,
)
included_detector = self.create_detector(
project_id=self.project.id,
name="Included Detector",
type=MetricIssue.slug,
)

response = self.get_success_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": f"!assignee:{user.email}"},
)
assert {d["name"] for d in response.data} == {included_detector.name}

def test_query_by_assignee_invalid_user(self):
self.create_detector(
project_id=self.project.id,
name="Valid Detector",
type=MetricIssue.slug,
)

# Query with non-existent user should return no results
response = self.get_success_response(
self.organization.slug,
qs_params={"project": self.project.id, "query": "assignee:[email protected]"},
)
assert len(response.data) == 0


@region_silo_test
@apply_feature_flag_on_cls("organizations:incidents")
Expand Down Expand Up @@ -610,3 +798,69 @@ def test_owner_not_in_organization(self):
status_code=400,
)
assert "owner" in response.data


@region_silo_test
class ConvertAssigneeValuesTest(APITestCase):
"""Test the convert_assignee_values function"""

def setUp(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for us to have a test that actually filters by assignee?

super().setUp()
self.user = self.create_user()
self.team = self.create_team(organization=self.organization)
self.other_user = self.create_user()
self.create_member(organization=self.organization, user=self.other_user)
self.projects = [self.project]

def test_convert_assignee_values_user_email(self):
result = convert_assignee_values([self.user.email], self.projects, self.user)
expected = Q(owner_user_id=self.user.id)
self.assertEqual(str(result), str(expected))

def test_convert_assignee_values_user_username(self):
result = convert_assignee_values([self.user.username], self.projects, self.user)
expected = Q(owner_user_id=self.user.id)
self.assertEqual(str(result), str(expected))

def test_convert_assignee_values_team_slug(self):
result = convert_assignee_values([f"#{self.team.slug}"], self.projects, self.user)
expected = Q(owner_team_id=self.team.id)
self.assertEqual(str(result), str(expected))

def test_convert_assignee_values_me(self):
result = convert_assignee_values(["me"], self.projects, self.user)
expected = Q(owner_user_id=self.user.id)
self.assertEqual(str(result), str(expected))

def test_convert_assignee_values_none(self):
result = convert_assignee_values(["none"], self.projects, self.user)
expected = Q(owner_team_id__isnull=True, owner_user_id__isnull=True)
self.assertEqual(str(result), str(expected))

def test_convert_assignee_values_multiple(self):
result = convert_assignee_values(
[str(self.user.email), f"#{self.team.slug}"], self.projects, self.user
)
expected = Q(owner_user_id=self.user.id) | Q(owner_team_id=self.team.id)
self.assertEqual(str(result), str(expected))

def test_convert_assignee_values_mixed(self):
result = convert_assignee_values(
["me", "none", f"#{self.team.slug}"], self.projects, self.user
)
expected = (
Q(owner_user_id=self.user.id)
| Q(owner_team_id__isnull=True, owner_user_id__isnull=True)
| Q(owner_team_id=self.team.id)
)
self.assertEqual(str(result), str(expected))

def test_convert_assignee_values_invalid(self):
result = convert_assignee_values(["999999"], self.projects, self.user)
expected = Q(owner_user_id=_HACKY_INVALID_USER.id)
self.assertEqual(str(result), str(expected))

def test_convert_assignee_values_empty(self):
result = convert_assignee_values([], self.projects, self.user)
expected = Q()
self.assertEqual(str(result), str(expected))
Loading