@@ -102,8 +102,18 @@ def get_profile_candidates(self) -> ProfileCandidates:
102
102
if self .data_source == "functions" :
103
103
return self .get_profile_candidates_from_functions ()
104
104
elif self .data_source == "transactions" :
105
+ if features .has (
106
+ "organizations:profiling-flamegraph-use-increased-chunks-query-strategy" ,
107
+ self .snuba_params .organization ,
108
+ ):
109
+ return self .get_profile_candidates_from_transactions_v2 ()
105
110
return self .get_profile_candidates_from_transactions ()
106
111
elif self .data_source == "profiles" :
112
+ if features .has (
113
+ "organizations:profiling-flamegraph-use-increased-chunks-query-strategy" ,
114
+ self .snuba_params .organization ,
115
+ ):
116
+ return self .get_profile_candidates_from_profiles_v2 ()
107
117
return self .get_profile_candidates_from_profiles ()
108
118
elif self .data_source == "spans" :
109
119
return self .get_profile_candidates_from_spans ()
@@ -222,6 +232,76 @@ def get_profile_candidates_from_transactions(self) -> ProfileCandidates:
222
232
"continuous" : continuous_profile_candidates ,
223
233
}
224
234
235
+ def get_profile_candidates_from_transactions_v2 (self ) -> ProfileCandidates :
236
+ max_profiles = options .get ("profiling.flamegraph.profile-set.size" )
237
+ initial_chunk_delta_hours = options .get (
238
+ "profiling.flamegraph.query.initial_chunk_delta.hours"
239
+ )
240
+ max_chunk_delta_hours = options .get ("profiling.flamegraph.query.max_delta.hours" )
241
+
242
+ initial_chunk_delta = timedelta (hours = initial_chunk_delta_hours )
243
+ max_chunk_delta = timedelta (hours = max_chunk_delta_hours )
244
+
245
+ transaction_profile_candidates : list [TransactionProfileCandidate ] = []
246
+ profiler_metas : list [ProfilerMeta ] = []
247
+
248
+ assert self .snuba_params .start is not None and self .snuba_params .end is not None
249
+ original_start , original_end = self .snuba_params .start , self .snuba_params .end
250
+
251
+ for chunk_start , chunk_end in split_datetime_range_exponential (
252
+ original_start , original_end , initial_chunk_delta , max_chunk_delta
253
+ ):
254
+ self .snuba_params .start = chunk_start
255
+ self .snuba_params .end = chunk_end
256
+
257
+ builder = self .get_transactions_based_candidate_query (
258
+ query = self .query , limit = max_profiles
259
+ )
260
+
261
+ results = builder .run_query (
262
+ Referrer .API_PROFILING_PROFILE_FLAMEGRAPH_TRANSACTION_CANDIDATES .value ,
263
+ )
264
+ results = builder .process_results (results )
265
+
266
+ for row in results ["data" ]:
267
+ if row ["profile.id" ] is not None :
268
+ transaction_profile_candidates .append (
269
+ {
270
+ "project_id" : row ["project.id" ],
271
+ "profile_id" : row ["profile.id" ],
272
+ }
273
+ )
274
+ elif row ["profiler.id" ] is not None and row ["thread.id" ]:
275
+ profiler_metas .append (
276
+ ProfilerMeta (
277
+ project_id = row ["project.id" ],
278
+ profiler_id = row ["profiler.id" ],
279
+ thread_id = row ["thread.id" ],
280
+ start = row ["precise.start_ts" ],
281
+ end = row ["precise.finish_ts" ],
282
+ transaction_id = row ["id" ],
283
+ )
284
+ )
285
+ if len (transaction_profile_candidates ) >= max_profiles :
286
+ break
287
+
288
+ max_continuous_profile_candidates = max (
289
+ max_profiles - len (transaction_profile_candidates ), 0
290
+ )
291
+
292
+ continuous_profile_candidates : list [ContinuousProfileCandidate ] = []
293
+
294
+ if max_continuous_profile_candidates > 0 :
295
+ continuous_profile_candidates , _ = self .get_chunks_for_profilers (
296
+ profiler_metas ,
297
+ max_continuous_profile_candidates ,
298
+ )
299
+
300
+ return {
301
+ "transaction" : transaction_profile_candidates ,
302
+ "continuous" : continuous_profile_candidates ,
303
+ }
304
+
225
305
def get_transactions_based_candidate_query (
226
306
self , query : str | None , limit : int
227
307
) -> DiscoverQueryBuilder :
@@ -547,6 +627,171 @@ def get_profile_candidates_from_profiles(self) -> ProfileCandidates:
547
627
"continuous" : continuous_profile_candidates ,
548
628
}
549
629
630
+ def get_profile_candidates_from_profiles_v2 (self ) -> ProfileCandidates :
631
+ if self .snuba_params .organization is None :
632
+ raise ValueError ("`organization` is required and cannot be `None`" )
633
+
634
+ max_profiles = options .get ("profiling.flamegraph.profile-set.size" )
635
+ initial_chunk_delta_hours = options .get (
636
+ "profiling.flamegraph.query.initial_chunk_delta.hours"
637
+ )
638
+ max_chunk_delta_hours = options .get ("profiling.flamegraph.query.max_delta.hours" )
639
+
640
+ initial_chunk_delta = timedelta (hours = initial_chunk_delta_hours )
641
+ max_chunk_delta = timedelta (hours = max_chunk_delta_hours )
642
+
643
+ referrer = Referrer .API_PROFILING_PROFILE_FLAMEGRAPH_PROFILE_CANDIDATES .value
644
+ transaction_profile_candidates : list [TransactionProfileCandidate ] = []
645
+ profiler_metas : list [ProfilerMeta ] = []
646
+
647
+ assert self .snuba_params .start is not None and self .snuba_params .end is not None
648
+ original_start , original_end = self .snuba_params .start , self .snuba_params .end
649
+
650
+ for chunk_start , chunk_end in split_datetime_range_exponential (
651
+ original_start , original_end , initial_chunk_delta , max_chunk_delta
652
+ ):
653
+ self .snuba_params .start = chunk_start
654
+ self .snuba_params .end = chunk_end
655
+
656
+ builder = self .get_transactions_based_candidate_query (
657
+ query = self .query , limit = max_profiles
658
+ )
659
+ results = builder .run_query (referrer )
660
+ results = builder .process_results (results )
661
+
662
+ for row in results ["data" ]:
663
+ if row ["profile.id" ] is not None :
664
+ transaction_profile_candidates .append (
665
+ {
666
+ "project_id" : row ["project.id" ],
667
+ "profile_id" : row ["profile.id" ],
668
+ }
669
+ )
670
+ elif row ["profiler.id" ] is not None and row ["thread.id" ]:
671
+ profiler_metas .append (
672
+ ProfilerMeta (
673
+ project_id = row ["project.id" ],
674
+ profiler_id = row ["profiler.id" ],
675
+ thread_id = row ["thread.id" ],
676
+ start = row ["precise.start_ts" ],
677
+ end = row ["precise.finish_ts" ],
678
+ transaction_id = row ["id" ],
679
+ )
680
+ )
681
+
682
+ if len (transaction_profile_candidates ) + len (profiler_metas ) >= max_profiles :
683
+ break
684
+
685
+ max_continuous_profile_candidates = max (
686
+ max_profiles - len (transaction_profile_candidates ), 0
687
+ )
688
+
689
+ continuous_profile_candidates : list [ContinuousProfileCandidate ] = []
690
+ continuous_duration = 0.0
691
+
692
+ # If there are continuous profiles attached to transactions, we prefer those as
693
+ # the active thread id gives us more user friendly flamegraphs than without.
694
+ if profiler_metas and max_continuous_profile_candidates > 0 :
695
+ continuous_profile_candidates , continuous_duration = self .get_chunks_for_profilers (
696
+ profiler_metas , max_continuous_profile_candidates
697
+ )
698
+
699
+ seen_chunks = {
700
+ (candidate ["profiler_id" ], candidate ["chunk_id" ])
701
+ for candidate in continuous_profile_candidates
702
+ }
703
+
704
+ always_use_direct_chunks = features .has (
705
+ "organizations:profiling-flamegraph-always-use-direct-chunks" ,
706
+ self .snuba_params .organization ,
707
+ actor = self .request .user ,
708
+ )
709
+
710
+ # If we still don't have enough continuous profile candidates + transaction profile candidates,
711
+ # we'll fall back to directly using the continuous profiling data
712
+ if (
713
+ len (continuous_profile_candidates ) + len (transaction_profile_candidates ) < max_profiles
714
+ and always_use_direct_chunks
715
+ ):
716
+ total_duration = continuous_duration if always_use_direct_chunks else 0.0
717
+ max_duration = options .get ("profiling.continuous-profiling.flamegraph.max-seconds" )
718
+
719
+ conditions = []
720
+ conditions .append (Condition (Column ("project_id" ), Op .IN , self .snuba_params .project_ids ))
721
+ conditions .append (
722
+ Condition (Column ("start_timestamp" ), Op .LT , resolve_datetime64 (original_end ))
723
+ )
724
+ conditions .append (
725
+ Condition (Column ("end_timestamp" ), Op .GTE , resolve_datetime64 (original_start ))
726
+ )
727
+ environments = self .snuba_params .environment_names
728
+ if environments :
729
+ conditions .append (Condition (Column ("environment" ), Op .IN , environments ))
730
+
731
+ continuous_profiles_query = Query (
732
+ match = Storage (StorageKey .ProfileChunks .value ),
733
+ select = [
734
+ Column ("project_id" ),
735
+ Column ("profiler_id" ),
736
+ Column ("chunk_id" ),
737
+ Column ("start_timestamp" ),
738
+ Column ("end_timestamp" ),
739
+ ],
740
+ where = conditions ,
741
+ orderby = [OrderBy (Column ("start_timestamp" ), Direction .DESC )],
742
+ limit = Limit (max_profiles ),
743
+ )
744
+
745
+ all_results = bulk_snuba_queries (
746
+ [
747
+ Request (
748
+ dataset = Dataset .Profiles .value ,
749
+ app_id = "default" ,
750
+ query = continuous_profiles_query ,
751
+ tenant_ids = {
752
+ "referrer" : referrer ,
753
+ "organization_id" : self .snuba_params .organization .id ,
754
+ },
755
+ ),
756
+ ],
757
+ referrer ,
758
+ )
759
+
760
+ continuous_profile_results = all_results [0 ]
761
+
762
+ for row in continuous_profile_results ["data" ]:
763
+
764
+ # Make sure to dedupe profile chunks so we don't reuse chunks
765
+ if (row ["profiler_id" ], row ["chunk_id" ]) in seen_chunks :
766
+ continue
767
+
768
+ start_timestamp = datetime .fromisoformat (row ["start_timestamp" ]).timestamp ()
769
+ end_timestamp = datetime .fromisoformat (row ["end_timestamp" ]).timestamp ()
770
+
771
+ candidate : ContinuousProfileCandidate = {
772
+ "project_id" : row ["project_id" ],
773
+ "profiler_id" : row ["profiler_id" ],
774
+ "chunk_id" : row ["chunk_id" ],
775
+ "start" : str (int (start_timestamp * 1e9 )),
776
+ "end" : str (int (end_timestamp * 1e9 )),
777
+ }
778
+
779
+ continuous_profile_candidates .append (candidate )
780
+
781
+ total_duration += end_timestamp - start_timestamp
782
+
783
+ # can set max duration to negative to skip this check
784
+ if (max_duration >= 0 and total_duration >= max_duration ) or (
785
+ len (continuous_profile_candidates ) + len (transaction_profile_candidates )
786
+ >= max_profiles
787
+ ):
788
+ break
789
+
790
+ return {
791
+ "transaction" : transaction_profile_candidates ,
792
+ "continuous" : continuous_profile_candidates ,
793
+ }
794
+
550
795
def get_profile_candidates_from_spans (self ) -> ProfileCandidates :
551
796
max_profiles = options .get ("profiling.flamegraph.profile-set.size" )
552
797
results = self .get_spans_based_candidates (query = self .query , limit = max_profiles )
0 commit comments