@@ -37,14 +37,17 @@ AggregatingStep::AggregatingStep(
37
37
size_t merge_threads_,
38
38
size_t temporary_data_merge_threads_,
39
39
bool emit_version_,
40
- bool emit_changelog_)
41
- : ITransformingStep(input_stream_, AggregatingTransformParams::getHeader(params_, final_, emit_version_, emit_changelog_), getTraits(), false )
40
+ bool emit_changelog_,
41
+ bool fill_missing_window_)
42
+ : ITransformingStep(
43
+ input_stream_, AggregatingTransformParams::getHeader(params_, final_, emit_version_, emit_changelog_), getTraits(), false )
42
44
, params(std::move(params_))
43
45
, final (std::move(final_))
44
46
, merge_threads(merge_threads_)
45
47
, temporary_data_merge_threads(temporary_data_merge_threads_)
46
48
, emit_version(emit_version_)
47
49
, emit_changelog(emit_changelog_)
50
+ , fill_missing_window(fill_missing_window_)
48
51
{
49
52
}
50
53
@@ -69,7 +72,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
69
72
* 1. Parallel aggregation is done, and the results should be merged in parallel.
70
73
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
71
74
*/
72
- auto transform_params = std::make_shared<AggregatingTransformParams>(std::move (params), final , emit_version, emit_changelog);
75
+ auto transform_params
76
+ = std::make_shared<AggregatingTransformParams>(std::move (params), final , emit_version, emit_changelog, fill_missing_window);
73
77
74
78
// / If there are several sources, then we perform parallel aggregation
75
79
if (pipeline.getNumStreams () > 1 )
0 commit comments