ref(profiling): implement chunked profile candidate query for flamegraph #95873

Merged (8 commits) on Jul 22, 2025
2 changes: 2 additions & 0 deletions src/sentry/features/temporary.py
@@ -88,6 +88,8 @@ def register_temporary_features(manager: FeatureManager):
manager.add("projects:continuous-profiling-vroomrs-processing", ProjectFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable transaction profiles processing with vroomrs
manager.add("projects:transaction-profiling-vroomrs-processing", ProjectFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable querying profile candidates with exponentially growing datetime range chunks
manager.add("organizations:profiling-flamegraph-use-increased-chunks-query-strategy", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
# Enable daily summary
manager.add("organizations:daily-summary", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, api_expose=False)
# Enables import/export functionality for dashboards
18 changes: 18 additions & 0 deletions src/sentry/options/defaults.py
@@ -2556,6 +2556,24 @@
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# the duration of the first datetime chunk of data queried,
# expressed in hours.
register(
    "profiling.flamegraph.query.initial_chunk_delta.hours",
    type=Int,
    default=12,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# the max duration of any datetime chunk of data queried,
# expressed in hours.
register(
    "profiling.flamegraph.query.max_delta.hours",
    type=Int,
    default=48,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)
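
With these defaults, the chunk sizes double from the initial delta until they hit the cap: for a 7-day query window that works out to chunks of 12h, 24h, 48h, 48h, 36h. A quick illustration of the arithmetic (the doubling-with-cap behavior is an assumption based on the flag description in temporary.py, not code shown in this diff):

# Illustration only, not part of the diff: chunk growth implied by the
# defaults above, assuming the delta doubles each step, capped at max_delta.
from datetime import timedelta

delta = timedelta(hours=12)  # profiling.flamegraph.query.initial_chunk_delta.hours
cap = timedelta(hours=48)    # profiling.flamegraph.query.max_delta.hours
remaining = timedelta(days=7)

sizes = []
while remaining > timedelta(0):
    step = min(delta, remaining)
    sizes.append(step)
    remaining -= step
    delta = min(delta * 2, cap)

# sizes -> [12h, 24h, 48h, 48h, 36h], summing to the full 168h window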

# list of platform names for which we allow using unsampled profiles for the purpose
# of improving profile (function) metrics
register(
245 changes: 245 additions & 0 deletions src/sentry/profiles/flamegraph.py
@@ -102,8 +102,18 @@ def get_profile_candidates(self) -> ProfileCandidates:
        if self.data_source == "functions":
            return self.get_profile_candidates_from_functions()
        elif self.data_source == "transactions":
            if features.has(
                "organizations:profiling-flamegraph-use-increased-chunks-query-strategy",
                self.snuba_params.organization,
            ):
                return self.get_profile_candidates_from_transactions_v2()
            return self.get_profile_candidates_from_transactions()
        elif self.data_source == "profiles":
            if features.has(
                "organizations:profiling-flamegraph-use-increased-chunks-query-strategy",
                self.snuba_params.organization,
            ):
                return self.get_profile_candidates_from_profiles_v2()
            return self.get_profile_candidates_from_profiles()
        elif self.data_source == "spans":
            return self.get_profile_candidates_from_spans()
@@ -222,6 +232,76 @@ def get_profile_candidates_from_transactions(self) -> ProfileCandidates:
            "continuous": continuous_profile_candidates,
        }

    def get_profile_candidates_from_transactions_v2(self) -> ProfileCandidates:
        max_profiles = options.get("profiling.flamegraph.profile-set.size")
        initial_chunk_delta_hours = options.get(
            "profiling.flamegraph.query.initial_chunk_delta.hours"
        )
        max_chunk_delta_hours = options.get("profiling.flamegraph.query.max_delta.hours")

        initial_chunk_delta = timedelta(hours=initial_chunk_delta_hours)
        max_chunk_delta = timedelta(hours=max_chunk_delta_hours)

        transaction_profile_candidates: list[TransactionProfileCandidate] = []
        profiler_metas: list[ProfilerMeta] = []

        assert self.snuba_params.start is not None and self.snuba_params.end is not None
        original_start, original_end = self.snuba_params.start, self.snuba_params.end

        for chunk_start, chunk_end in split_datetime_range_exponential(
            original_start, original_end, initial_chunk_delta, max_chunk_delta
        ):
            self.snuba_params.start = chunk_start
            self.snuba_params.end = chunk_end

            builder = self.get_transactions_based_candidate_query(
                query=self.query, limit=max_profiles
            )

            results = builder.run_query(
                Referrer.API_PROFILING_PROFILE_FLAMEGRAPH_TRANSACTION_CANDIDATES.value,
            )
            results = builder.process_results(results)

            for row in results["data"]:
                if row["profile.id"] is not None:
                    transaction_profile_candidates.append(
                        {
                            "project_id": row["project.id"],
                            "profile_id": row["profile.id"],
                        }
                    )
                elif row["profiler.id"] is not None and row["thread.id"]:
                    profiler_metas.append(
                        ProfilerMeta(
                            project_id=row["project.id"],
                            profiler_id=row["profiler.id"],
                            thread_id=row["thread.id"],
                            start=row["precise.start_ts"],
                            end=row["precise.finish_ts"],
                            transaction_id=row["id"],
                        )
                    )

            if len(transaction_profile_candidates) >= max_profiles:
                break

        max_continuous_profile_candidates = max(
            max_profiles - len(transaction_profile_candidates), 0
        )

        continuous_profile_candidates: list[ContinuousProfileCandidate] = []

        if max_continuous_profile_candidates > 0:
            continuous_profile_candidates, _ = self.get_chunks_for_profilers(
                profiler_metas,
                max_continuous_profile_candidates,
            )

        return {
            "transaction": transaction_profile_candidates,
            "continuous": continuous_profile_candidates,
        }
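
The split_datetime_range_exponential helper is referenced here but its implementation is not part of this excerpt. A minimal sketch consistent with how it is used above and with the option names (each chunk doubles the previous delta until max_delta caps it, and the final chunk is clamped to the range end):

# Sketch only: the real helper may differ in signature and edge-case handling.
from collections.abc import Iterator
from datetime import datetime, timedelta

def split_datetime_range_exponential(
    start: datetime,
    end: datetime,
    initial_delta: timedelta,
    max_delta: timedelta,
) -> Iterator[tuple[datetime, datetime]]:
    delta = initial_delta
    chunk_start = start
    while chunk_start < end:
        # Clamp the final chunk so we never query past the requested range.
        chunk_end = min(chunk_start + delta, end)
        yield chunk_start, chunk_end
        chunk_start = chunk_end
        delta = min(delta * 2, max_delta)

Because recent data is queried first in small chunks and older data in progressively larger ones, the loop can break out early once it has enough candidates, favoring fresh profiles while bounding the number of queries.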

    def get_transactions_based_candidate_query(
        self, query: str | None, limit: int
    ) -> DiscoverQueryBuilder:
@@ -547,6 +627,171 @@ def get_profile_candidates_from_profiles(self) -> ProfileCandidates:
            "continuous": continuous_profile_candidates,
        }

    def get_profile_candidates_from_profiles_v2(self) -> ProfileCandidates:
        if self.snuba_params.organization is None:
            raise ValueError("`organization` is required and cannot be `None`")

        max_profiles = options.get("profiling.flamegraph.profile-set.size")
        initial_chunk_delta_hours = options.get(
            "profiling.flamegraph.query.initial_chunk_delta.hours"
        )
        max_chunk_delta_hours = options.get("profiling.flamegraph.query.max_delta.hours")

        initial_chunk_delta = timedelta(hours=initial_chunk_delta_hours)
        max_chunk_delta = timedelta(hours=max_chunk_delta_hours)

        referrer = Referrer.API_PROFILING_PROFILE_FLAMEGRAPH_PROFILE_CANDIDATES.value
        transaction_profile_candidates: list[TransactionProfileCandidate] = []
        profiler_metas: list[ProfilerMeta] = []

        assert self.snuba_params.start is not None and self.snuba_params.end is not None
        original_start, original_end = self.snuba_params.start, self.snuba_params.end

        for chunk_start, chunk_end in split_datetime_range_exponential(
            original_start, original_end, initial_chunk_delta, max_chunk_delta
        ):
            self.snuba_params.start = chunk_start
            self.snuba_params.end = chunk_end

            builder = self.get_transactions_based_candidate_query(
                query=self.query, limit=max_profiles
            )
            results = builder.run_query(referrer)
            results = builder.process_results(results)

            for row in results["data"]:
                if row["profile.id"] is not None:
                    transaction_profile_candidates.append(
                        {
                            "project_id": row["project.id"],
                            "profile_id": row["profile.id"],
                        }
                    )
                elif row["profiler.id"] is not None and row["thread.id"]:
                    profiler_metas.append(
                        ProfilerMeta(
                            project_id=row["project.id"],
                            profiler_id=row["profiler.id"],
                            thread_id=row["thread.id"],
                            start=row["precise.start_ts"],
                            end=row["precise.finish_ts"],
                            transaction_id=row["id"],
                        )
                    )

            if len(transaction_profile_candidates) + len(profiler_metas) >= max_profiles:
                break

        max_continuous_profile_candidates = max(
            max_profiles - len(transaction_profile_candidates), 0
        )

        continuous_profile_candidates: list[ContinuousProfileCandidate] = []
        continuous_duration = 0.0

        # If there are continuous profiles attached to transactions, we prefer those as
        # the active thread id gives us more user friendly flamegraphs than without.
        if profiler_metas and max_continuous_profile_candidates > 0:
            continuous_profile_candidates, continuous_duration = self.get_chunks_for_profilers(
                profiler_metas, max_continuous_profile_candidates
            )

        seen_chunks = {
            (candidate["profiler_id"], candidate["chunk_id"])
            for candidate in continuous_profile_candidates
        }

        always_use_direct_chunks = features.has(
            "organizations:profiling-flamegraph-always-use-direct-chunks",
            self.snuba_params.organization,
            actor=self.request.user,
        )

        # If we still don't have enough continuous profile candidates + transaction profile candidates,
        # we'll fall back to directly using the continuous profiling data
        if (
            len(continuous_profile_candidates) + len(transaction_profile_candidates) < max_profiles
            and always_use_direct_chunks
        ):
Comment on lines +710 to +715

Member:

1. I don't remember how well we dedupe profile chunks, so this might result in misleading flamegraphs.
2. I'm not fully convinced this condition is enough in higher-throughput situations. Though most of the time, the problem we're trying to resolve by directly looking at chunks is in low-throughput situations, so it may be okay.

Contributor (author):

1. We don't dedupe in vroom, so if we were to send a duplicate in the list it would be used twice for the aggregation. Here we prevent that by keeping track of seen_chunks: if we fall into this branch, we still check that set and only add chunks that were not previously added, avoiding duplicates.
2. Agreed. I'm mainly worried about:
   • lower-throughput situations
   • still keeping the max number of chunks/profiles <= max_profiles

Anyway, as you mentioned in another comment, we can give it a try to see how it works and later iterate to improve it.
            total_duration = continuous_duration if always_use_direct_chunks else 0.0
            max_duration = options.get("profiling.continuous-profiling.flamegraph.max-seconds")

            conditions = []
            conditions.append(Condition(Column("project_id"), Op.IN, self.snuba_params.project_ids))
            conditions.append(
                Condition(Column("start_timestamp"), Op.LT, resolve_datetime64(original_end))
            )
            conditions.append(
                Condition(Column("end_timestamp"), Op.GTE, resolve_datetime64(original_start))
            )
            environments = self.snuba_params.environment_names
            if environments:
                conditions.append(Condition(Column("environment"), Op.IN, environments))

            continuous_profiles_query = Query(
                match=Storage(StorageKey.ProfileChunks.value),
                select=[
                    Column("project_id"),
                    Column("profiler_id"),
                    Column("chunk_id"),
                    Column("start_timestamp"),
                    Column("end_timestamp"),
                ],
                where=conditions,
                orderby=[OrderBy(Column("start_timestamp"), Direction.DESC)],
                limit=Limit(max_profiles),
            )

            all_results = bulk_snuba_queries(
                [
                    Request(
                        dataset=Dataset.Profiles.value,
                        app_id="default",
                        query=continuous_profiles_query,
                        tenant_ids={
                            "referrer": referrer,
                            "organization_id": self.snuba_params.organization.id,
                        },
                    ),
                ],
                referrer,
            )

            continuous_profile_results = all_results[0]

            for row in continuous_profile_results["data"]:

                # Make sure to dedupe profile chunks so we don't reuse chunks
                if (row["profiler_id"], row["chunk_id"]) in seen_chunks:
                    continue

                start_timestamp = datetime.fromisoformat(row["start_timestamp"]).timestamp()
                end_timestamp = datetime.fromisoformat(row["end_timestamp"]).timestamp()

                candidate: ContinuousProfileCandidate = {
                    "project_id": row["project_id"],
                    "profiler_id": row["profiler_id"],
                    "chunk_id": row["chunk_id"],
                    "start": str(int(start_timestamp * 1e9)),
                    "end": str(int(end_timestamp * 1e9)),
                }

                continuous_profile_candidates.append(candidate)

                total_duration += end_timestamp - start_timestamp

                # can set max duration to negative to skip this check
                if (max_duration >= 0 and total_duration >= max_duration) or (
                    len(continuous_profile_candidates) + len(transaction_profile_candidates)
                    >= max_profiles
                ):
                    break

        return {
            "transaction": transaction_profile_candidates,
            "continuous": continuous_profile_candidates,
        }
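
To make the deduplication discussed in the review thread concrete, here is a self-contained illustration (hypothetical data) of the seen_chunks guard: chunks already selected via profiler_metas are skipped when the direct-chunks fallback runs, so no chunk is fed to the aggregation twice.

# Hypothetical data; mirrors the (profiler_id, chunk_id) guard above.
seen_chunks = {("profiler-a", "chunk-1")}
direct_rows = [
    {"profiler_id": "profiler-a", "chunk_id": "chunk-1"},  # already used, skipped
    {"profiler_id": "profiler-a", "chunk_id": "chunk-2"},  # new, kept
]

kept = [
    row for row in direct_rows
    if (row["profiler_id"], row["chunk_id"]) not in seen_chunks
]
assert [row["chunk_id"] for row in kept] == ["chunk-2"]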

    def get_profile_candidates_from_spans(self) -> ProfileCandidates:
        max_profiles = options.get("profiling.flamegraph.profile-set.size")
        results = self.get_spans_based_candidates(query=self.query, limit=max_profiles)