diff --git a/src/sentry/features/temporary.py b/src/sentry/features/temporary.py
index 2a51ebb37c159a..38963f1cd98a7e 100644
--- a/src/sentry/features/temporary.py
+++ b/src/sentry/features/temporary.py
@@ -88,6 +88,8 @@ def register_temporary_features(manager: FeatureManager):
     manager.add("projects:continuous-profiling-vroomrs-processing", ProjectFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
     # Enable transaction profiles processing with vroomrs
     manager.add("projects:transaction-profiling-vroomrs-processing", ProjectFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
+    # Enable querying profile candidates with exponentially growing datetime range chunks
+    manager.add("organizations:profiling-flamegraph-use-increased-chunks-query-strategy", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
     # Enable daily summary
     manager.add("organizations:daily-summary", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, api_expose=False)
     # Enables import/export functionality for dashboards
diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index af1b90f1abfafa..caf38dcffab976 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -2556,6 +2556,24 @@
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
 
+# the duration of the first datetime chunk of data queried
+# expressed in hours.
+register(
+    "profiling.flamegraph.query.initial_chunk_delta.hours",
+    type=Int,
+    default=12,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)
+
+# the max duration of any datetime chunk of data queried
+# expressed in hours.
+register(
+    "profiling.flamegraph.query.max_delta.hours",
+    type=Int,
+    default=48,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)
+
 # list of platform names for which we allow using unsampled profiles for the purpose
 # of improving profile (function) metrics
 register(
diff --git a/src/sentry/profiles/flamegraph.py b/src/sentry/profiles/flamegraph.py
index 745bb723daf98c..523cb1866bfe00 100644
--- a/src/sentry/profiles/flamegraph.py
+++ b/src/sentry/profiles/flamegraph.py
@@ -102,8 +102,18 @@ def get_profile_candidates(self) -> ProfileCandidates:
         if self.data_source == "functions":
             return self.get_profile_candidates_from_functions()
         elif self.data_source == "transactions":
+            if features.has(
+                "organizations:profiling-flamegraph-use-increased-chunks-query-strategy",
+                self.snuba_params.organization,
+            ):
+                return self.get_profile_candidates_from_transactions_v2()
             return self.get_profile_candidates_from_transactions()
         elif self.data_source == "profiles":
+            if features.has(
+                "organizations:profiling-flamegraph-use-increased-chunks-query-strategy",
+                self.snuba_params.organization,
+            ):
+                return self.get_profile_candidates_from_profiles_v2()
             return self.get_profile_candidates_from_profiles()
         elif self.data_source == "spans":
             return self.get_profile_candidates_from_spans()
@@ -222,6 +232,76 @@ def get_profile_candidates_from_transactions(self) -> ProfileCandidates:
             "continuous": continuous_profile_candidates,
         }
 
+    def get_profile_candidates_from_transactions_v2(self) -> ProfileCandidates:
+        max_profiles = options.get("profiling.flamegraph.profile-set.size")
+        initial_chunk_delta_hours = options.get(
+            "profiling.flamegraph.query.initial_chunk_delta.hours"
+        )
+        max_chunk_delta_hours = options.get("profiling.flamegraph.query.max_delta.hours")
+
+        initial_chunk_delta = timedelta(hours=initial_chunk_delta_hours)
+        max_chunk_delta = timedelta(hours=max_chunk_delta_hours)
+
+        transaction_profile_candidates: list[TransactionProfileCandidate] = []
+        profiler_metas: list[ProfilerMeta] = []
+
+        assert self.snuba_params.start is not None and self.snuba_params.end is not None
+        original_start, original_end = self.snuba_params.start, self.snuba_params.end
+
+        for chunk_start, chunk_end in split_datetime_range_exponential(
+            original_start, original_end, initial_chunk_delta, max_chunk_delta
+        ):
+            self.snuba_params.start = chunk_start
+            self.snuba_params.end = chunk_end
+
+            builder = self.get_transactions_based_candidate_query(
+                query=self.query, limit=max_profiles
+            )
+
+            results = builder.run_query(
+                Referrer.API_PROFILING_PROFILE_FLAMEGRAPH_TRANSACTION_CANDIDATES.value,
+            )
+            results = builder.process_results(results)
+
+            for row in results["data"]:
+                if row["profile.id"] is not None:
+                    transaction_profile_candidates.append(
+                        {
+                            "project_id": row["project.id"],
+                            "profile_id": row["profile.id"],
+                        }
+                    )
+                elif row["profiler.id"] is not None and row["thread.id"]:
+                    profiler_metas.append(
+                        ProfilerMeta(
+                            project_id=row["project.id"],
+                            profiler_id=row["profiler.id"],
+                            thread_id=row["thread.id"],
+                            start=row["precise.start_ts"],
+                            end=row["precise.finish_ts"],
+                            transaction_id=row["id"],
+                        )
+                    )
+            if len(transaction_profile_candidates) >= max_profiles:
+                break
+
+        max_continuous_profile_candidates = max(
+            max_profiles - len(transaction_profile_candidates), 0
+        )
+
+        continuous_profile_candidates: list[ContinuousProfileCandidate] = []
+
+        if max_continuous_profile_candidates > 0:
+            continuous_profile_candidates, _ = self.get_chunks_for_profilers(
+                profiler_metas,
+                max_continuous_profile_candidates,
+            )
+
+        return {
+            "transaction": transaction_profile_candidates,
+            "continuous": continuous_profile_candidates,
+        }
+
     def get_transactions_based_candidate_query(
         self, query: str | None, limit: int
     ) -> DiscoverQueryBuilder:
@@ -547,6 +627,171 @@ def get_profile_candidates_from_profiles(self) -> ProfileCandidates:
             "continuous": continuous_profile_candidates,
         }
 
+    def get_profile_candidates_from_profiles_v2(self) -> ProfileCandidates:
+        if self.snuba_params.organization is None:
+            raise ValueError("`organization` is required and cannot be `None`")
+
+        max_profiles = options.get("profiling.flamegraph.profile-set.size")
+        initial_chunk_delta_hours = options.get(
+            "profiling.flamegraph.query.initial_chunk_delta.hours"
+        )
+        max_chunk_delta_hours = options.get("profiling.flamegraph.query.max_delta.hours")
+
+        initial_chunk_delta = timedelta(hours=initial_chunk_delta_hours)
+        max_chunk_delta = timedelta(hours=max_chunk_delta_hours)
+
+        referrer = Referrer.API_PROFILING_PROFILE_FLAMEGRAPH_PROFILE_CANDIDATES.value
+        transaction_profile_candidates: list[TransactionProfileCandidate] = []
+        profiler_metas: list[ProfilerMeta] = []
+
+        assert self.snuba_params.start is not None and self.snuba_params.end is not None
+        original_start, original_end = self.snuba_params.start, self.snuba_params.end
+
+        for chunk_start, chunk_end in split_datetime_range_exponential(
+            original_start, original_end, initial_chunk_delta, max_chunk_delta
+        ):
+            self.snuba_params.start = chunk_start
+            self.snuba_params.end = chunk_end
+
+            builder = self.get_transactions_based_candidate_query(
+                query=self.query, limit=max_profiles
+            )
+            results = builder.run_query(referrer)
+            results = builder.process_results(results)
+
+            for row in results["data"]:
+                if row["profile.id"] is not None:
+                    transaction_profile_candidates.append(
+                        {
+                            "project_id": row["project.id"],
+                            "profile_id": row["profile.id"],
+                        }
+                    )
+                elif row["profiler.id"] is not None and row["thread.id"]:
+                    profiler_metas.append(
+                        ProfilerMeta(
+                            project_id=row["project.id"],
+                            profiler_id=row["profiler.id"],
+                            thread_id=row["thread.id"],
+                            start=row["precise.start_ts"],
+                            end=row["precise.finish_ts"],
+                            transaction_id=row["id"],
+                        )
+                    )
+
+            if len(transaction_profile_candidates) + len(profiler_metas) >= max_profiles:
+                break
+
+        max_continuous_profile_candidates = max(
+            max_profiles - len(transaction_profile_candidates), 0
+        )
+
+        continuous_profile_candidates: list[ContinuousProfileCandidate] = []
+        continuous_duration = 0.0
+
+        # If there are continuous profiles attached to transactions, we prefer those as
+        # the active thread id gives us more user friendly flamegraphs than without.
+        if profiler_metas and max_continuous_profile_candidates > 0:
+            continuous_profile_candidates, continuous_duration = self.get_chunks_for_profilers(
+                profiler_metas, max_continuous_profile_candidates
+            )
+
+        seen_chunks = {
+            (candidate["profiler_id"], candidate["chunk_id"])
+            for candidate in continuous_profile_candidates
+        }
+
+        always_use_direct_chunks = features.has(
+            "organizations:profiling-flamegraph-always-use-direct-chunks",
+            self.snuba_params.organization,
+            actor=self.request.user,
+        )
+
+        # If we still don't have enough continuous profile candidates + transaction profile candidates,
+        # we'll fall back to directly using the continuous profiling data
+        if (
+            len(continuous_profile_candidates) + len(transaction_profile_candidates) < max_profiles
+            and always_use_direct_chunks
+        ):
+            total_duration = continuous_duration if always_use_direct_chunks else 0.0
+            max_duration = options.get("profiling.continuous-profiling.flamegraph.max-seconds")
+
+            conditions = []
+            conditions.append(Condition(Column("project_id"), Op.IN, self.snuba_params.project_ids))
+            conditions.append(
+                Condition(Column("start_timestamp"), Op.LT, resolve_datetime64(original_end))
+            )
+            conditions.append(
+                Condition(Column("end_timestamp"), Op.GTE, resolve_datetime64(original_start))
+            )
+            environments = self.snuba_params.environment_names
+            if environments:
+                conditions.append(Condition(Column("environment"), Op.IN, environments))
+
+            continuous_profiles_query = Query(
+                match=Storage(StorageKey.ProfileChunks.value),
+                select=[
+                    Column("project_id"),
+                    Column("profiler_id"),
+                    Column("chunk_id"),
+                    Column("start_timestamp"),
+                    Column("end_timestamp"),
+                ],
+                where=conditions,
+                orderby=[OrderBy(Column("start_timestamp"), Direction.DESC)],
+                limit=Limit(max_profiles),
+            )
+
+            all_results = bulk_snuba_queries(
+                [
+                    Request(
+                        dataset=Dataset.Profiles.value,
+                        app_id="default",
+                        query=continuous_profiles_query,
+                        tenant_ids={
+                            "referrer": referrer,
+                            "organization_id": self.snuba_params.organization.id,
+                        },
+                    ),
+                ],
+                referrer,
+            )
+
+            continuous_profile_results = all_results[0]
+
+            for row in continuous_profile_results["data"]:
+
+                # Make sure to dedupe profile chunks so we don't reuse chunks
+                if (row["profiler_id"], row["chunk_id"]) in seen_chunks:
+                    continue
+
+                start_timestamp = datetime.fromisoformat(row["start_timestamp"]).timestamp()
+                end_timestamp = datetime.fromisoformat(row["end_timestamp"]).timestamp()
+
+                candidate: ContinuousProfileCandidate = {
+                    "project_id": row["project_id"],
+                    "profiler_id": row["profiler_id"],
+                    "chunk_id": row["chunk_id"],
+                    "start": str(int(start_timestamp * 1e9)),
+                    "end": str(int(end_timestamp * 1e9)),
+                }
+
+                continuous_profile_candidates.append(candidate)
+
+                total_duration += end_timestamp - start_timestamp
+
+                # can set max duration to negative to skip this check
+                if (max_duration >= 0 and total_duration >= max_duration) or (
+                    len(continuous_profile_candidates) + len(transaction_profile_candidates)
+                    >= max_profiles
+                ):
+                    break
+
+        return {
+            "transaction": transaction_profile_candidates,
+            "continuous": continuous_profile_candidates,
+        }
+
     def get_profile_candidates_from_spans(self) -> ProfileCandidates:
         max_profiles = options.get("profiling.flamegraph.profile-set.size")
         results = self.get_spans_based_candidates(query=self.query, limit=max_profiles)
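Note: `split_datetime_range_exponential` is called in both v2 methods but its implementation is not part of this diff. Below is a minimal sketch of the behavior the call sites and option names imply; the iteration order (newest chunk first, which matches the `DESC` ordering and the early `break` once enough candidates are found) and the doubling growth factor are assumptions, not confirmed by this change:

```python
from collections.abc import Iterator
from datetime import datetime, timedelta


def split_datetime_range_exponential(
    start: datetime,
    end: datetime,
    initial_delta: timedelta,
    max_delta: timedelta,
) -> Iterator[tuple[datetime, datetime]]:
    """Yield (chunk_start, chunk_end) pairs covering [start, end].

    Hypothetical sketch: the first chunk spans `initial_delta`, each
    subsequent chunk doubles in size until capped at `max_delta`, and the
    oldest chunk is clamped to `start`. Chunks are yielded newest first so
    callers can stop querying as soon as they have enough candidates.
    """
    delta = initial_delta
    chunk_end = end
    while chunk_end > start:
        chunk_start = max(chunk_end - delta, start)
        yield chunk_start, chunk_end
        chunk_end = chunk_start
        delta = min(delta * 2, max_delta)
```

Under these assumptions, with the registered defaults (`initial_chunk_delta.hours = 12`, `max_delta.hours = 48`), a 7-day window is queried in chunks of 12h, 24h, 48h, 48h, and a final clamped 36h; the candidate loops break as soon as `max_profiles` (from `profiling.flamegraph.profile-set.size`) is reached, so most requests never scan the full date range.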