Skip to content

Commit c15e23b

Browse files
committed
enable fan out for the same source
1 parent 0e074fd commit c15e23b

21 files changed

+383
-252
lines changed

src/Storages/Streaming/SourceColumnsDescription.cpp

Lines changed: 50 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,31 @@ void SourceColumnsDescription::PhysicalColumnPositions::clear()
3030
subcolumns.clear();
3131
}
3232

33-
SourceColumnsDescription::SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot)
33+
SourceColumnsDescription::SourceColumnsDescription(
34+
const Names & required_column_names, StorageSnapshotPtr storage_snapshot, bool enable_partial_read)
3435
: SourceColumnsDescription(
35-
storage_snapshot->getColumnsByNames(GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withVirtuals().withExtendedObjects(), required_column_names),
36+
storage_snapshot->getColumnsByNames(
37+
GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withVirtuals().withExtendedObjects(), required_column_names),
3638
storage_snapshot->getMetadataForQuery()->getSampleBlock(),
37-
storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects()))
39+
storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects()),
40+
enable_partial_read)
3841
{
3942
}
4043

41-
SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & columns_to_read, const Block & schema, const NamesAndTypesList & all_extended_columns)
44+
SourceColumnsDescription::SourceColumnsDescription(
45+
const NamesAndTypesList & columns_to_read,
46+
const Block & schema,
47+
const NamesAndTypesList & all_extended_columns,
48+
bool enable_partial_read)
4249
{
4350
/// FIXME, when we have multi-version of schema, the header and the schema may be mismatched
4451
auto column_size = columns_to_read.size();
4552

53+
/// Just read required partial physical columns
54+
if (enable_partial_read)
55+
physical_column_positions_to_read.positions.reserve(column_size);
56+
4657
positions.reserve(column_size);
47-
physical_column_positions_to_read.positions.reserve(column_size);
4858
subcolumns_to_read.reserve(column_size);
4959

5060
std::vector<uint16_t> read_all_subcolumns_positions;
@@ -112,45 +122,48 @@ SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & col
112122
auto pos_in_schema = schema.getPositionByName(name_in_storage);
113123
const auto & column_in_storage = schema.getByName(name_in_storage);
114124

115-
/// Calculate main column pos
116-
size_t physical_pos_in_schema_to_read = 0;
117-
/// We don't need to read duplicate physical columns from schema
118-
auto physical_pos_iter = std::find(
119-
physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), pos_in_schema);
120-
if (physical_pos_iter == physical_column_positions_to_read.positions.end())
125+
size_t physical_pos_in_schema_to_read = pos_in_schema;
126+
/// Specially, re-calculate pos in partially read schema
127+
if (enable_partial_read)
121128
{
122-
physical_pos_in_schema_to_read = physical_column_positions_to_read.positions.size();
123-
physical_column_positions_to_read.positions.emplace_back(pos_in_schema);
124-
125-
/// json, array(json), tuple(..., json, ...)
126-
if (column_in_storage.type->hasDynamicSubcolumns())
129+
/// We don't need to read duplicate physical columns from schema
130+
auto physical_pos_iter = std::find(
131+
physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), pos_in_schema);
132+
if (physical_pos_iter == physical_column_positions_to_read.positions.end())
127133
{
128-
/// We like to read parent json column once if multiple subcolumns of the same json are required
129-
/// like `select json.a, json.b from stream`
130-
auto find_iter = std::find_if(
131-
physical_object_columns_to_read.begin(),
132-
physical_object_columns_to_read.end(),
133-
[&column](const auto & col_name_type) { return col_name_type.name == column.name; });
134-
135-
if (find_iter == physical_object_columns_to_read.end())
134+
physical_pos_in_schema_to_read = physical_column_positions_to_read.positions.size();
135+
physical_column_positions_to_read.positions.emplace_back(pos_in_schema);
136+
137+
/// json, array(json), tuple(..., json, ...)
138+
if (column_in_storage.type->hasDynamicSubcolumns())
136139
{
137-
if (column.isSubcolumn())
140+
/// We like to read parent json column once if multiple subcolumns of the same json are required
141+
/// like `select json.a, json.b from stream`
142+
auto find_iter = std::find_if(
143+
physical_object_columns_to_read.begin(),
144+
physical_object_columns_to_read.end(),
145+
[&column](const auto & col_name_type) { return col_name_type.name == column.name; });
146+
147+
if (find_iter == physical_object_columns_to_read.end())
138148
{
139-
/// When reading a subcolumn of a json like `select json.a from stream`, we will need read the parent `json` column
140-
auto name_and_type = all_extended_columns.tryGetByName(name_in_storage);
141-
assert(name_and_type);
142-
physical_object_columns_to_read.emplace_back(std::move(*name_and_type));
143-
}
144-
else
145-
{
146-
/// This column is parent json column, like `select json from stream`, use the name and type directly
147-
physical_object_columns_to_read.emplace_back(column);
149+
if (column.isSubcolumn())
150+
{
151+
/// When reading a subcolumn of a json like `select json.a from stream`, we will need read the parent `json` column
152+
auto name_and_type = all_extended_columns.tryGetByName(name_in_storage);
153+
assert(name_and_type);
154+
physical_object_columns_to_read.emplace_back(std::move(*name_and_type));
155+
}
156+
else
157+
{
158+
/// This column is parent json column, like `select json from stream`, use the name and type directly
159+
physical_object_columns_to_read.emplace_back(column);
160+
}
148161
}
149162
}
150163
}
164+
else
165+
physical_pos_in_schema_to_read = physical_pos_iter - physical_column_positions_to_read.positions.begin();
151166
}
152-
else
153-
physical_pos_in_schema_to_read = physical_pos_iter - physical_column_positions_to_read.positions.begin();
154167

155168
/// For subcolumn, which dependents on the main column
156169
if (column.isSubcolumn())
@@ -181,7 +194,7 @@ SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & col
181194
physical_column_positions_to_read.subcolumns.erase(pos);
182195

183196
/// Clients like to read virtual columns only, add `_tp_time`, then we know how many rows
184-
if (physical_column_positions_to_read.positions.empty())
197+
if (enable_partial_read && physical_column_positions_to_read.positions.empty())
185198
physical_column_positions_to_read.positions.emplace_back(schema.getPositionByName(ProtonConsts::RESERVED_EVENT_TIME));
186199
}
187200
}

src/Storages/Streaming/SourceColumnsDescription.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
2121
struct SourceColumnsDescription
2222
{
2323
SourceColumnsDescription() = default;
24-
SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot);
25-
SourceColumnsDescription(const NamesAndTypesList & columns_to_read, const Block & schema, const NamesAndTypesList & all_extended_columns);
24+
SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot, bool enable_partial_read = true);
25+
SourceColumnsDescription(
26+
const NamesAndTypesList & columns_to_read,
27+
const Block & schema,
28+
const NamesAndTypesList & all_extended_columns,
29+
bool enable_partial_read = true);
2630

2731
enum class ReadColumnType : uint8_t
2832
{

src/Storages/Streaming/StorageStream.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -601,22 +601,18 @@ void StorageStream::readStreaming(
601601

602602
assert(query_info.seek_to_info);
603603
const auto & settings_ref = context_->getSettingsRef();
604-
/// 1) Checkpointed queries shall not be multiplexed
605-
/// 2) Queries which seek to a specific timestamp shall not be multiplexed
606-
auto share_resource_group = (settings_ref.query_resource_group.value == "shared")
607-
&& (query_info.seek_to_info->getSeekTo().empty() || query_info.seek_to_info->getSeekTo() == "latest")
608-
&& (settings_ref.exec_mode == ExecuteMode::NORMAL);
609-
610-
if (share_resource_group)
604+
if (settings_ref.query_resource_group.value == "shared")
611605
{
606+
auto offsets = stream_shards.back()->getOffsets(query_info.seek_to_info);
612607
for (auto stream_shard : shards_to_read)
613608
{
609+
const auto & offset = offsets[stream_shard->shard];
614610
if (!column_names.empty())
615611
pipes.emplace_back(
616-
stream_shard->source_multiplexers->createChannel(stream_shard->shard, column_names, storage_snapshot, context_));
612+
source_multiplexers->createChannel(std::move(stream_shard), column_names, storage_snapshot, context_, offset));
617613
else
618-
pipes.emplace_back(stream_shard->source_multiplexers->createChannel(
619-
stream_shard->shard, {ProtonConsts::RESERVED_EVENT_TIME}, storage_snapshot, context_));
614+
pipes.emplace_back(source_multiplexers->createChannel(
615+
std::move(stream_shard), {ProtonConsts::RESERVED_EVENT_TIME}, storage_snapshot, context_, offset));
620616
}
621617

622618
LOG_INFO(log, "Starting reading {} streams in shared resource group", pipes.size());
@@ -945,6 +941,8 @@ void StorageStream::startup()
945941
assert(native_log->enabled());
946942
}
947943

944+
source_multiplexers.reset(new StreamingStoreSourceMultiplexers(getContext(), log));
945+
948946
log_initialized.test_and_set();
949947

950948
LOG_INFO(log, "Started");

src/Storages/Streaming/StorageStream.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include <Storages/MergeTree/MergeTreeData.h>
1212
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
13+
#include <Storages/Streaming/StreamingStoreSourceMultiplexer.h>
1314

1415
namespace nlog
1516
{
@@ -298,8 +299,7 @@ class StorageStream final : public shared_ptr_helper<StorageStream>, public Merg
298299
UInt64 base_block_id,
299300
UInt64 sub_block_id);
300301

301-
void
302-
appendToNativeLog(nlog::RecordPtr & record, IngestMode /*ingest_mode*/, klog::AppendCallback callback, klog::CallbackData data);
302+
void appendToNativeLog(nlog::RecordPtr & record, IngestMode /*ingest_mode*/, klog::AppendCallback callback, klog::CallbackData data);
303303

304304
void appendToKafka(
305305
nlog::RecordPtr & record,
@@ -354,5 +354,8 @@ class StorageStream final : public shared_ptr_helper<StorageStream>, public Merg
354354

355355
std::atomic_flag inited;
356356
std::atomic_flag stopped;
357+
358+
/// Multiplex latest records of each shard.
359+
std::unique_ptr<StreamingStoreSourceMultiplexers> source_multiplexers;
357360
};
358361
}

src/Storages/Streaming/StreamShard.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,6 @@ StreamShard::~StreamShard()
124124

125125
void StreamShard::startup()
126126
{
127-
source_multiplexers.reset(new StreamingStoreSourceMultiplexers(shared_from_this(), storage_stream->getContext(), log));
128-
129127
initLog();
130128

131129
/// for virtual tables or in-memory storage type, there is no storage object

src/Storages/Streaming/StreamShard.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ class StreamShard final : public std::enable_shared_from_this<StreamShard>
148148

149149
std::unique_ptr<StreamCallbackData> callback_data;
150150

151-
std::unique_ptr<StreamingStoreSourceMultiplexers> source_multiplexers;
152-
153151
// For random shard index generation
154152
mutable std::mutex rng_mutex;
155153
pcg64 rng;

src/Storages/Streaming/StreamingBlockReaderNativeLog.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,8 @@ nlog::RecordPtrs StreamingBlockReaderNativeLog::processCached(nlog::RecordPtrs r
148148
{
149149
/// In general, an object has a large number of subcolumns,
150150
/// so when a few subcolumns required for the object, we only copy partials to improve performance
151-
if (isObject(col_with_type->type) && !schema_ctx.column_positions.positions.empty())
151+
if (isObject(col_with_type->type) && !schema_ctx.column_positions.subcolumns.empty())
152152
{
153-
assert(column_names.size() == schema_ctx.column_positions.positions.size());
154153
auto iter = schema_ctx.column_positions.subcolumns.find(i);
155154
if (iter != schema_ctx.column_positions.subcolumns.end())
156155
{

src/Storages/Streaming/StreamingStoreSource.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
#include "StreamingStoreSource.h"
2-
#include "StreamShard.h"
3-
#include "StreamingBlockReaderKafka.h"
4-
#include "StreamingBlockReaderNativeLog.h"
1+
#include <Storages/Streaming/StreamShard.h>
2+
#include <Storages/Streaming/StreamingBlockReaderKafka.h>
3+
#include <Storages/Streaming/StreamingBlockReaderNativeLog.h>
4+
#include <Storages/Streaming/StreamingStoreSource.h>
55

66
#include <Interpreters/inplaceBlockConversions.h>
77
#include <KafkaLog/KafkaWALPool.h>
@@ -16,7 +16,8 @@ StreamingStoreSource::StreamingStoreSource(
1616
ContextPtr context_,
1717
Int64 sn,
1818
Poco::Logger * log_)
19-
: StreamingStoreSourceBase(header, storage_snapshot_, std::move(context_), log_, ProcessorID::StreamingStoreSourceID)
19+
: StreamingStoreSourceBase(
20+
header, storage_snapshot_, /*enable_partial_read*/ true, std::move(context_), log_, ProcessorID::StreamingStoreSourceID)
2021
{
2122
const auto & settings = query_context->getSettingsRef();
2223
if (settings.record_consume_batch_count.value != 0)
@@ -98,7 +99,8 @@ void StreamingStoreSource::readAndProcess()
9899
/// NOTE: The `FilterTransform` will try optimizing filter ConstColumn to always_false or always_true,
99100
/// for exmaple: `_tp_sn < 1`, if filter first data _tp_sn is 0, it will be optimized always_true.
100101
/// So we can not create a constant column, since the virtual column data isn't constants value in fact.
101-
auto virtual_column = columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts)->convertToFullColumnIfConst();
102+
auto virtual_column
103+
= columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts)->convertToFullColumnIfConst();
102104
columns.push_back(std::move(virtual_column));
103105
break;
104106
}

src/Storages/Streaming/StreamingStoreSource.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include "StreamingStoreSourceBase.h"
3+
#include <Storages/Streaming/StreamingStoreSourceBase.h>
44

55
namespace DB
66
{

src/Storages/Streaming/StreamingStoreSourceBase.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#include "StreamingStoreSourceBase.h"
1+
#include <Storages/Streaming/StreamingStoreSourceBase.h>
22

33
#include <Checkpoint/CheckpointContext.h>
44
#include <Checkpoint/CheckpointCoordinator.h>
@@ -16,14 +16,19 @@ extern const int RECOVER_CHECKPOINT_FAILED;
1616
}
1717

1818
StreamingStoreSourceBase::StreamingStoreSourceBase(
19-
const Block & header, const StorageSnapshotPtr & storage_snapshot_, ContextPtr query_context_, Poco::Logger * log_, ProcessorID pid_)
19+
const Block & header,
20+
const StorageSnapshotPtr & storage_snapshot_,
21+
bool enable_partial_read,
22+
ContextPtr query_context_,
23+
Poco::Logger * log_,
24+
ProcessorID pid_)
2025
: ISource(header, true, pid_)
2126
, storage_snapshot(
2227
std::make_shared<StorageSnapshot>(*storage_snapshot_)) /// We like to make a copy of it since we will mutate the snapshot
2328
, query_context(std::move(query_context_))
2429
, log(log_)
2530
, header_chunk(header.getColumns(), 0)
26-
, columns_desc(header.getNames(), storage_snapshot)
31+
, columns_desc(header.getNames(), storage_snapshot, enable_partial_read)
2732
{
2833
is_streaming = true;
2934

@@ -52,7 +57,8 @@ StreamingStoreSourceBase::getSubcolumnFromBlock(const Block & block, size_t pare
5257

5358
/// Convert subcolumn if the subcolumn type of dynamic object may be dismatched with header.
5459
/// FIXME: Cache the ExpressionAction
55-
Block subcolumn_block({ColumnWithTypeAndName{std::move(subcolumn), std::move(subcolumn_type), subcolumn_pair.name}}); /// NOLINT(performance-move-const-arg)
60+
Block subcolumn_block({ColumnWithTypeAndName{
61+
std::move(subcolumn), std::move(subcolumn_type), subcolumn_pair.name}}); /// NOLINT(performance-move-const-arg)
5662
ExpressionActions convert_act(ActionsDAG::makeConvertingActions(
5763
subcolumn_block.getColumnsWithTypeAndName(),
5864
{ColumnWithTypeAndName{subcolumn_pair.type->createColumn(), subcolumn_pair.type, subcolumn_pair.name}},

0 commit comments

Comments
 (0)