From 2cc168f080c4ccc4eef33689fcca4f9f349267f3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Tue, 21 Nov 2023 23:14:55 +0800
Subject: [PATCH 1/2] enable fan out for the same source
---
src/Storages/Streaming/ProxyStream.cpp | 24 ++-
.../Streaming/SourceColumnsDescription.cpp | 101 +++++----
.../Streaming/SourceColumnsDescription.h | 8 +-
src/Storages/Streaming/StorageStream.cpp | 18 +-
src/Storages/Streaming/StorageStream.h | 7 +-
src/Storages/Streaming/StreamShard.cpp | 2 -
src/Storages/Streaming/StreamShard.h | 2 -
.../StreamingBlockReaderNativeLog.cpp | 3 +-
.../Streaming/StreamingStoreSource.cpp | 14 +-
src/Storages/Streaming/StreamingStoreSource.h | 2 +-
.../Streaming/StreamingStoreSourceBase.cpp | 14 +-
.../Streaming/StreamingStoreSourceBase.h | 9 +-
.../Streaming/StreamingStoreSourceChannel.cpp | 56 ++++-
.../Streaming/StreamingStoreSourceChannel.h | 11 +-
.../StreamingStoreSourceMultiplexer.cpp | 204 ++++++++++++++++--
.../StreamingStoreSourceMultiplexer.h | 64 +++++-
.../gtest_source_columns_description.cpp | 146 +++++++++++++
.../0013_changelog_stream4.yaml | 45 ----
.../0013_changelog_stream5.yaml | 40 ----
.../0013_changelog_stream6.yaml | 14 --
.../0013_changelog_stream7.yaml | 32 ---
.../0013_changelog_stream8.yaml | 15 +-
.../0015_single_stream_changelog_aggr.json | 4 +-
.../test_stream_smoke/0099_fixed_issues.json | 14 +-
24 files changed, 581 insertions(+), 268 deletions(-)
diff --git a/src/Storages/Streaming/ProxyStream.cpp b/src/Storages/Streaming/ProxyStream.cpp
index b335b2f3965..eae9b9d302b 100644
--- a/src/Storages/Streaming/ProxyStream.cpp
+++ b/src/Storages/Streaming/ProxyStream.cpp
@@ -107,11 +107,12 @@ ProxyStream::ProxyStream(
QueryProcessingStage::Enum ProxyStream::getQueryProcessingStage(
ContextPtr context_,
QueryProcessingStage::Enum to_stage,
- const StorageSnapshotPtr & storage_snapshot,
+ const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & query_info) const
{
if (storage)
- return storage->getQueryProcessingStage(context_, to_stage, storage_snapshot, query_info);
+ return storage->getQueryProcessingStage(
+ context_, to_stage, storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context_), query_info);
else
/// When it is created by subquery not a table
return QueryProcessingStage::FetchColumns;
@@ -174,7 +175,7 @@ void ProxyStream::read(
void ProxyStream::doRead(
QueryPlan & query_plan,
const Names & column_names,
- const StorageSnapshotPtr & storage_snapshot,
+ const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & query_info,
ContextPtr context_,
QueryProcessingStage::Enum processed_stage,
@@ -206,32 +207,35 @@ void ProxyStream::doRead(
return;
}
+ assert(storage);
+ auto proxy_storage_snapshot = storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context_);
if (auto * view = storage->as())
{
auto view_context = createProxySubqueryContext(context_, query_info, isStreamingQuery());
- view->read(query_plan, column_names, storage_snapshot, query_info, view_context, processed_stage, max_block_size, num_streams);
+ view->read(
+ query_plan, column_names, proxy_storage_snapshot, query_info, view_context, processed_stage, max_block_size, num_streams);
query_plan.addInterpreterContext(view_context);
return;
}
else if (auto * materialized_view = storage->as())
return materialized_view->read(
- query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+ query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
else if (auto * external_stream = storage->as())
return external_stream->read(
- query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+ query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
else if (auto * random_stream = storage->as())
return random_stream->read(
- query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+ query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
else if (auto * file_stream = storage->as())
return file_stream->read(
- query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+ query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
else if (nested_proxy_storage)
return nested_proxy_storage->read(
- query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+ query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
auto * distributed = storage->as();
assert(distributed);
- distributed->read(query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
+ distributed->read(query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
}
Names ProxyStream::getRequiredColumnsForProxyStorage(const Names & column_names) const
diff --git a/src/Storages/Streaming/SourceColumnsDescription.cpp b/src/Storages/Streaming/SourceColumnsDescription.cpp
index c76b7eca409..e127f7bb6ec 100644
--- a/src/Storages/Streaming/SourceColumnsDescription.cpp
+++ b/src/Storages/Streaming/SourceColumnsDescription.cpp
@@ -1,4 +1,4 @@
-#include "SourceColumnsDescription.h"
+#include
#include
#include
@@ -6,6 +6,8 @@
#include
#include
+#include
+
namespace DB
{
SourceColumnsDescription::PhysicalColumnPositions &
@@ -30,21 +32,39 @@ void SourceColumnsDescription::PhysicalColumnPositions::clear()
subcolumns.clear();
}
-SourceColumnsDescription::SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot)
+SourceColumnsDescription::SourceColumnsDescription(
+ const Names & required_column_names, StorageSnapshotPtr storage_snapshot, bool enable_partial_read)
: SourceColumnsDescription(
- storage_snapshot->getColumnsByNames(GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withVirtuals().withExtendedObjects(), required_column_names),
+ storage_snapshot->getColumnsByNames(
+ GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withVirtuals().withExtendedObjects(), required_column_names),
storage_snapshot->getMetadataForQuery()->getSampleBlock(),
- storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects()))
+ storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects()),
+ enable_partial_read)
{
}
-SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & columns_to_read, const Block & schema, const NamesAndTypesList & all_extended_columns)
+SourceColumnsDescription::SourceColumnsDescription(
+ const NamesAndTypesList & columns_to_read,
+ const Block & schema,
+ const NamesAndTypesList & all_extended_columns,
+ bool enable_partial_read)
{
/// FIXME, when we have multi-version of schema, the header and the schema may be mismatched
auto column_size = columns_to_read.size();
+ if (enable_partial_read)
+ {
+ /// Just read required partial physical columns
+ physical_column_positions_to_read.positions.reserve(column_size);
+ }
+ else
+ {
+ /// Read full physical columns
+ physical_column_positions_to_read.positions.resize(schema.columns());
+ std::iota(physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), 0);
+ }
+
positions.reserve(column_size);
- physical_column_positions_to_read.positions.reserve(column_size);
subcolumns_to_read.reserve(column_size);
std::vector read_all_subcolumns_positions;
@@ -112,45 +132,48 @@ SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & col
auto pos_in_schema = schema.getPositionByName(name_in_storage);
const auto & column_in_storage = schema.getByName(name_in_storage);
- /// Calculate main column pos
- size_t physical_pos_in_schema_to_read = 0;
- /// We don't need to read duplicate physical columns from schema
- auto physical_pos_iter = std::find(
- physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), pos_in_schema);
- if (physical_pos_iter == physical_column_positions_to_read.positions.end())
+ size_t physical_pos_in_schema_to_read = pos_in_schema;
+ /// Specially, re-calculate pos in partially read schema
+ if (enable_partial_read)
{
- physical_pos_in_schema_to_read = physical_column_positions_to_read.positions.size();
- physical_column_positions_to_read.positions.emplace_back(pos_in_schema);
+ /// We don't need to read duplicate physical columns from schema
+ auto physical_pos_iter = std::find(
+ physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), pos_in_schema);
+ if (physical_pos_iter == physical_column_positions_to_read.positions.end())
+ {
+ physical_pos_in_schema_to_read = physical_column_positions_to_read.positions.size();
+ physical_column_positions_to_read.positions.emplace_back(pos_in_schema);
+ }
+ else
+ physical_pos_in_schema_to_read = physical_pos_iter - physical_column_positions_to_read.positions.begin();
+ }
- /// json, array(json), tuple(..., json, ...)
- if (column_in_storage.type->hasDynamicSubcolumns())
+ /// json, array(json), tuple(..., json, ...)
+ if (column_in_storage.type->hasDynamicSubcolumns())
+ {
+ /// We like to read parent json column once if multiple subcolumns of the same json are required
+ /// like `select json.a, json.b from stream`
+ auto find_iter = std::find_if(
+ physical_object_columns_to_read.begin(),
+ physical_object_columns_to_read.end(),
+ [&name_in_storage](const auto & col_name_type) { return col_name_type.name == name_in_storage; });
+
+ if (find_iter == physical_object_columns_to_read.end())
{
- /// We like to read parent json column once if multiple subcolumns of the same json are required
- /// like `select json.a, json.b from stream`
- auto find_iter = std::find_if(
- physical_object_columns_to_read.begin(),
- physical_object_columns_to_read.end(),
- [&column](const auto & col_name_type) { return col_name_type.name == column.name; });
-
- if (find_iter == physical_object_columns_to_read.end())
+ if (column.isSubcolumn())
{
- if (column.isSubcolumn())
- {
- /// When reading a subcolumn of a json like `select json.a from stream`, we will need read the parent `json` column
- auto name_and_type = all_extended_columns.tryGetByName(name_in_storage);
- assert(name_and_type);
- physical_object_columns_to_read.emplace_back(std::move(*name_and_type));
- }
- else
- {
- /// This column is parent json column, like `select json from stream`, use the name and type directly
- physical_object_columns_to_read.emplace_back(column);
- }
+ /// When reading a subcolumn of a json like `select json.a from stream`, we will need read the parent `json` column
+ auto name_and_type = all_extended_columns.tryGetByName(name_in_storage);
+ assert(name_and_type);
+ physical_object_columns_to_read.emplace_back(std::move(*name_and_type));
+ }
+ else
+ {
+ /// This column is parent json column, like `select json from stream`, use the name and type directly
+ physical_object_columns_to_read.emplace_back(column);
}
}
}
- else
- physical_pos_in_schema_to_read = physical_pos_iter - physical_column_positions_to_read.positions.begin();
/// For subcolumn, which dependents on the main column
if (column.isSubcolumn())
@@ -181,7 +204,7 @@ SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & col
physical_column_positions_to_read.subcolumns.erase(pos);
/// Clients like to read virtual columns only, add `_tp_time`, then we know how many rows
- if (physical_column_positions_to_read.positions.empty())
+ if (enable_partial_read && physical_column_positions_to_read.positions.empty())
physical_column_positions_to_read.positions.emplace_back(schema.getPositionByName(ProtonConsts::RESERVED_EVENT_TIME));
}
}
diff --git a/src/Storages/Streaming/SourceColumnsDescription.h b/src/Storages/Streaming/SourceColumnsDescription.h
index d6b3afc27b7..17bfadf2456 100644
--- a/src/Storages/Streaming/SourceColumnsDescription.h
+++ b/src/Storages/Streaming/SourceColumnsDescription.h
@@ -21,8 +21,12 @@ using StorageSnapshotPtr = std::shared_ptr;
struct SourceColumnsDescription
{
SourceColumnsDescription() = default;
- SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot);
- SourceColumnsDescription(const NamesAndTypesList & columns_to_read, const Block & schema, const NamesAndTypesList & all_extended_columns);
+ SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot, bool enable_partial_read = true);
+ SourceColumnsDescription(
+ const NamesAndTypesList & columns_to_read,
+ const Block & schema,
+ const NamesAndTypesList & all_extended_columns,
+ bool enable_partial_read = true);
enum class ReadColumnType : uint8_t
{
diff --git a/src/Storages/Streaming/StorageStream.cpp b/src/Storages/Streaming/StorageStream.cpp
index 9183f50adc2..0926f275fcb 100644
--- a/src/Storages/Streaming/StorageStream.cpp
+++ b/src/Storages/Streaming/StorageStream.cpp
@@ -601,22 +601,18 @@ void StorageStream::readStreaming(
assert(query_info.seek_to_info);
const auto & settings_ref = context_->getSettingsRef();
- /// 1) Checkpointed queries shall not be multiplexed
- /// 2) Queries which seek to a specific timestamp shall not be multiplexed
- auto share_resource_group = (settings_ref.query_resource_group.value == "shared")
- && (query_info.seek_to_info->getSeekTo().empty() || query_info.seek_to_info->getSeekTo() == "latest")
- && (settings_ref.exec_mode == ExecuteMode::NORMAL);
-
- if (share_resource_group)
+ if (settings_ref.query_resource_group.value == "shared")
{
+ auto offsets = stream_shards.back()->getOffsets(query_info.seek_to_info);
for (auto stream_shard : shards_to_read)
{
+ const auto & offset = offsets[stream_shard->shard];
if (!column_names.empty())
pipes.emplace_back(
- stream_shard->source_multiplexers->createChannel(stream_shard->shard, column_names, storage_snapshot, context_));
+ source_multiplexers->createChannel(std::move(stream_shard), column_names, storage_snapshot, context_, offset));
else
- pipes.emplace_back(stream_shard->source_multiplexers->createChannel(
- stream_shard->shard, {ProtonConsts::RESERVED_EVENT_TIME}, storage_snapshot, context_));
+ pipes.emplace_back(source_multiplexers->createChannel(
+ std::move(stream_shard), {ProtonConsts::RESERVED_EVENT_TIME}, storage_snapshot, context_, offset));
}
LOG_INFO(log, "Starting reading {} streams in shared resource group", pipes.size());
@@ -945,6 +941,8 @@ void StorageStream::startup()
assert(native_log->enabled());
}
+ source_multiplexers.reset(new StreamingStoreSourceMultiplexers(getContext(), log));
+
log_initialized.test_and_set();
LOG_INFO(log, "Started");
diff --git a/src/Storages/Streaming/StorageStream.h b/src/Storages/Streaming/StorageStream.h
index 15e0f901763..2afc88c8250 100644
--- a/src/Storages/Streaming/StorageStream.h
+++ b/src/Storages/Streaming/StorageStream.h
@@ -10,6 +10,7 @@
#include
#include
+#include
namespace nlog
{
@@ -298,8 +299,7 @@ class StorageStream final : public shared_ptr_helper, public Merg
UInt64 base_block_id,
UInt64 sub_block_id);
- void
- appendToNativeLog(nlog::RecordPtr & record, IngestMode /*ingest_mode*/, klog::AppendCallback callback, klog::CallbackData data);
+ void appendToNativeLog(nlog::RecordPtr & record, IngestMode /*ingest_mode*/, klog::AppendCallback callback, klog::CallbackData data);
void appendToKafka(
nlog::RecordPtr & record,
@@ -354,5 +354,8 @@ class StorageStream final : public shared_ptr_helper, public Merg
std::atomic_flag inited;
std::atomic_flag stopped;
+
+ /// Multiplex latest records of each shard.
+ std::unique_ptr source_multiplexers;
};
}
diff --git a/src/Storages/Streaming/StreamShard.cpp b/src/Storages/Streaming/StreamShard.cpp
index cf90cf7def1..ebe9c1e003b 100644
--- a/src/Storages/Streaming/StreamShard.cpp
+++ b/src/Storages/Streaming/StreamShard.cpp
@@ -124,8 +124,6 @@ StreamShard::~StreamShard()
void StreamShard::startup()
{
- source_multiplexers.reset(new StreamingStoreSourceMultiplexers(shared_from_this(), storage_stream->getContext(), log));
-
initLog();
/// for virtual tables or in-memory storage type, there is no storage object
diff --git a/src/Storages/Streaming/StreamShard.h b/src/Storages/Streaming/StreamShard.h
index 781001cce44..1b47c486f69 100644
--- a/src/Storages/Streaming/StreamShard.h
+++ b/src/Storages/Streaming/StreamShard.h
@@ -148,8 +148,6 @@ class StreamShard final : public std::enable_shared_from_this
std::unique_ptr callback_data;
- std::unique_ptr source_multiplexers;
-
// For random shard index generation
mutable std::mutex rng_mutex;
pcg64 rng;
diff --git a/src/Storages/Streaming/StreamingBlockReaderNativeLog.cpp b/src/Storages/Streaming/StreamingBlockReaderNativeLog.cpp
index 64b0f613441..69b18b598ac 100644
--- a/src/Storages/Streaming/StreamingBlockReaderNativeLog.cpp
+++ b/src/Storages/Streaming/StreamingBlockReaderNativeLog.cpp
@@ -148,9 +148,8 @@ nlog::RecordPtrs StreamingBlockReaderNativeLog::processCached(nlog::RecordPtrs r
{
/// In general, an object has a large number of subcolumns,
/// so when a few subcolumns required for the object, we only copy partials to improve performance
- if (isObject(col_with_type->type) && !schema_ctx.column_positions.positions.empty())
+ if (isObject(col_with_type->type) && !schema_ctx.column_positions.subcolumns.empty())
{
- assert(column_names.size() == schema_ctx.column_positions.positions.size());
auto iter = schema_ctx.column_positions.subcolumns.find(i);
if (iter != schema_ctx.column_positions.subcolumns.end())
{
diff --git a/src/Storages/Streaming/StreamingStoreSource.cpp b/src/Storages/Streaming/StreamingStoreSource.cpp
index 5bf9dfb7ea2..11e4a01964c 100644
--- a/src/Storages/Streaming/StreamingStoreSource.cpp
+++ b/src/Storages/Streaming/StreamingStoreSource.cpp
@@ -1,7 +1,7 @@
-#include "StreamingStoreSource.h"
-#include "StreamShard.h"
-#include "StreamingBlockReaderKafka.h"
-#include "StreamingBlockReaderNativeLog.h"
+#include
+#include
+#include
+#include
#include
#include
@@ -16,7 +16,8 @@ StreamingStoreSource::StreamingStoreSource(
ContextPtr context_,
Int64 sn,
Poco::Logger * log_)
- : StreamingStoreSourceBase(header, storage_snapshot_, std::move(context_), log_, ProcessorID::StreamingStoreSourceID)
+ : StreamingStoreSourceBase(
+ header, storage_snapshot_, /*enable_partial_read*/ true, std::move(context_), log_, ProcessorID::StreamingStoreSourceID)
{
const auto & settings = query_context->getSettingsRef();
if (settings.record_consume_batch_count.value != 0)
@@ -98,7 +99,8 @@ void StreamingStoreSource::readAndProcess()
/// NOTE: The `FilterTransform` will try optimizing filter ConstColumn to always_false or always_true,
/// for exmaple: `_tp_sn < 1`, if filter first data _tp_sn is 0, it will be optimized always_true.
/// So we can not create a constant column, since the virtual column data isn't constants value in fact.
- auto virtual_column = columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts)->convertToFullColumnIfConst();
+ auto virtual_column
+ = columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts)->convertToFullColumnIfConst();
columns.push_back(std::move(virtual_column));
break;
}
diff --git a/src/Storages/Streaming/StreamingStoreSource.h b/src/Storages/Streaming/StreamingStoreSource.h
index a28acb88511..72aa93e2645 100644
--- a/src/Storages/Streaming/StreamingStoreSource.h
+++ b/src/Storages/Streaming/StreamingStoreSource.h
@@ -1,6 +1,6 @@
#pragma once
-#include "StreamingStoreSourceBase.h"
+#include
namespace DB
{
diff --git a/src/Storages/Streaming/StreamingStoreSourceBase.cpp b/src/Storages/Streaming/StreamingStoreSourceBase.cpp
index 61ec43ae665..9b47c4385ee 100644
--- a/src/Storages/Streaming/StreamingStoreSourceBase.cpp
+++ b/src/Storages/Streaming/StreamingStoreSourceBase.cpp
@@ -1,4 +1,4 @@
-#include "StreamingStoreSourceBase.h"
+#include
#include
#include
@@ -16,14 +16,19 @@ extern const int RECOVER_CHECKPOINT_FAILED;
}
StreamingStoreSourceBase::StreamingStoreSourceBase(
- const Block & header, const StorageSnapshotPtr & storage_snapshot_, ContextPtr query_context_, Poco::Logger * log_, ProcessorID pid_)
+ const Block & header,
+ const StorageSnapshotPtr & storage_snapshot_,
+ bool enable_partial_read,
+ ContextPtr query_context_,
+ Poco::Logger * log_,
+ ProcessorID pid_)
: ISource(header, true, pid_)
, storage_snapshot(
std::make_shared(*storage_snapshot_)) /// We like to make a copy of it since we will mutate the snapshot
, query_context(std::move(query_context_))
, log(log_)
, header_chunk(header.getColumns(), 0)
- , columns_desc(header.getNames(), storage_snapshot)
+ , columns_desc(header.getNames(), storage_snapshot, enable_partial_read)
{
is_streaming = true;
@@ -52,7 +57,8 @@ StreamingStoreSourceBase::getSubcolumnFromBlock(const Block & block, size_t pare
/// Convert subcolumn if the subcolumn type of dynamic object may be dismatched with header.
/// FIXME: Cache the ExpressionAction
- Block subcolumn_block({ColumnWithTypeAndName{std::move(subcolumn), std::move(subcolumn_type), subcolumn_pair.name}}); /// NOLINT(performance-move-const-arg)
+ Block subcolumn_block({ColumnWithTypeAndName{
+ std::move(subcolumn), std::move(subcolumn_type), subcolumn_pair.name}}); /// NOLINT(performance-move-const-arg)
ExpressionActions convert_act(ActionsDAG::makeConvertingActions(
subcolumn_block.getColumnsWithTypeAndName(),
{ColumnWithTypeAndName{subcolumn_pair.type->createColumn(), subcolumn_pair.type, subcolumn_pair.name}},
diff --git a/src/Storages/Streaming/StreamingStoreSourceBase.h b/src/Storages/Streaming/StreamingStoreSourceBase.h
index 69194cf49e8..2573c68a0ab 100644
--- a/src/Storages/Streaming/StreamingStoreSourceBase.h
+++ b/src/Storages/Streaming/StreamingStoreSourceBase.h
@@ -1,6 +1,6 @@
#pragma once
-#include "SourceColumnsDescription.h"
+#include
#include
#include
@@ -16,7 +16,12 @@ class StreamingStoreSourceBase : public ISource
{
public:
StreamingStoreSourceBase(
- const Block & header, const StorageSnapshotPtr & storage_snapshot_, ContextPtr context_, Poco::Logger * log_, ProcessorID pid_);
+ const Block & header,
+ const StorageSnapshotPtr & storage_snapshot_,
+ bool enable_partial_read,
+ ContextPtr context_,
+ Poco::Logger * log_,
+ ProcessorID pid_);
Chunk generate() override;
diff --git a/src/Storages/Streaming/StreamingStoreSourceChannel.cpp b/src/Storages/Streaming/StreamingStoreSourceChannel.cpp
index 1f51808fa44..81c41eb92b7 100644
--- a/src/Storages/Streaming/StreamingStoreSourceChannel.cpp
+++ b/src/Storages/Streaming/StreamingStoreSourceChannel.cpp
@@ -1,8 +1,9 @@
-#include "StreamingStoreSourceChannel.h"
-#include "StreamingStoreSourceMultiplexer.h"
+#include
#include
+#include
#include
+#include
namespace DB
{
@@ -12,24 +13,34 @@ StreamingStoreSourceChannel::StreamingStoreSourceChannel(
StorageSnapshotPtr storage_snapshot_,
ContextPtr query_context_,
Poco::Logger * log_)
- : StreamingStoreSourceBase(header, storage_snapshot_, std::move(query_context_), log_, ProcessorID::StreamingStoreSourceChannelID) /// NOLINT(performance-move-const-arg)
+ : StreamingStoreSourceBase(
+ header,
+ storage_snapshot_,
+ /*enable_partial_read*/ false,
+ std::move(query_context_),
+ log_,
+ ProcessorID::StreamingStoreSourceChannelID) /// NOLINT(performance-move-const-arg)
, id(sequence_id++)
, multiplexer(std::move(multiplexer_))
, records_queue(1000)
{
+ const auto & settings = query_context->getSettingsRef();
+ if (settings.record_consume_timeout_ms.value != 0)
+ record_consume_timeout_ms = static_cast(settings.record_consume_timeout_ms.value);
}
std::atomic StreamingStoreSourceChannel::sequence_id = 0;
StreamingStoreSourceChannel::~StreamingStoreSourceChannel()
{
+ std::lock_guard lock(multiplexer_mutex);
multiplexer->removeChannel(id);
}
void StreamingStoreSourceChannel::readAndProcess()
{
nlog::RecordPtrs records;
- auto got_records = records_queue.tryPop(records, 100);
+ auto got_records = records_queue.tryPop(records, record_consume_timeout_ms);
if (!got_records)
return;
@@ -48,13 +59,19 @@ void StreamingStoreSourceChannel::readAndProcess()
if (record->empty())
continue;
+ /// Ingore duplicate records, It's possible for re-attach to shared group from independent multiplexer
+ if (record->getSN() <= last_sn)
+ continue;
+
+ last_sn = record->getSN();
+
Columns columns;
columns.reserve(header_chunk.getNumColumns());
Block & block = record->getBlock();
auto rows = block.rows();
/// Block in channel shall always contain full columns
- assert(block.columns() == columns_desc.positions.size());
+ assert(block.columns() == columns_desc.physical_column_positions_to_read.positions.size());
fillAndUpdateObjectsIfNecessary(block);
@@ -76,8 +93,12 @@ void StreamingStoreSourceChannel::readAndProcess()
/// The current column to return is a virtual column which needs be calculated lively
assert(columns_desc.virtual_col_calcs[pos.virtualPosition()]);
auto ts = columns_desc.virtual_col_calcs[pos.virtualPosition()](record);
- auto time_column = columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts);
- columns.push_back(std::move(time_column));
+ /// NOTE: The `FilterTransform` will try optimizing filter ConstColumn to always_false or always_true,
+ /// for exmaple: `_tp_sn < 1`, if filter first data _tp_sn is 0, it will be optimized always_true.
+ /// So we can not create a constant column, since the virtual column data isn't constants value in fact.
+ auto virtual_column
+ = columns_desc.virtual_col_types[pos.virtualPosition()]->createColumnConst(rows, ts)->convertToFullColumnIfConst();
+ columns.push_back(std::move(virtual_column));
break;
}
case SourceColumnsDescription::ReadColumnType::SUB:
@@ -109,6 +130,27 @@ void StreamingStoreSourceChannel::add(nlog::RecordPtrs records)
std::pair StreamingStoreSourceChannel::getStreamShard() const
{
+ std::lock_guard lock(multiplexer_mutex);
return multiplexer->getStreamShard();
}
+
+void StreamingStoreSourceChannel::attachTo(std::shared_ptr new_multiplexer)
+{
+ std::lock_guard lock(multiplexer_mutex);
+ multiplexer = std::move(new_multiplexer);
+}
+
+void StreamingStoreSourceChannel::recover(CheckpointContextPtr ckpt_ctx_)
+{
+ StreamingStoreSourceBase::recover(std::move(ckpt_ctx_));
+
+ if (last_sn >= 0)
+ {
+ std::lock_guard lock(multiplexer_mutex);
+ assert(multiplexer->totalChannels() == 1);
+ multiplexer->resetSequenceNumber(last_sn + 1);
+ multiplexer->startup();
+ }
+}
+
}
diff --git a/src/Storages/Streaming/StreamingStoreSourceChannel.h b/src/Storages/Streaming/StreamingStoreSourceChannel.h
index 9d8264abb02..e7c68a5857b 100644
--- a/src/Storages/Streaming/StreamingStoreSourceChannel.h
+++ b/src/Storages/Streaming/StreamingStoreSourceChannel.h
@@ -1,8 +1,7 @@
#pragma once
-#include "StreamingStoreSourceBase.h"
-
#include
+#include
#include
namespace DB
@@ -21,6 +20,10 @@ class StreamingStoreSourceChannel final : public StreamingStoreSourceBase
~StreamingStoreSourceChannel() override;
+ void attachTo(std::shared_ptr new_multiplexer);
+
+ void recover(CheckpointContextPtr ckpt_ctx_) override;
+
String getName() const override { return "StreamingStoreSourceChannel"; }
UInt32 getID() const { return id; }
@@ -35,8 +38,12 @@ class StreamingStoreSourceChannel final : public StreamingStoreSourceBase
static std::atomic sequence_id;
UInt32 id;
+
+ mutable std::mutex multiplexer_mutex;
std::shared_ptr multiplexer;
+ Int32 record_consume_timeout_ms = 100;
+
/// FIXME, use another lock-free one?
ConcurrentBoundedQueue records_queue;
};
diff --git a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp
index 73895db03f5..88d6b151a57 100644
--- a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp
+++ b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp
@@ -1,7 +1,9 @@
-#include "StreamingStoreSourceMultiplexer.h"
-#include "StreamShard.h"
+#include
#include
+#include
+#include
+#include
#include
namespace DB
@@ -13,18 +15,51 @@ extern const int DWAL_FATAL_ERROR;
}
StreamingStoreSourceMultiplexer::StreamingStoreSourceMultiplexer(
- UInt32 id_, std::shared_ptr stream_shard_, ContextPtr global_context, Poco::Logger * log_)
+ UInt32 id_,
+ std::shared_ptr stream_shard_,
+ ContextPtr query_context,
+ Poco::Logger * log_,
+ StreamingStoreSourceMultiplexer::AttachToSharedGroupFunc attach_to_shared_group_func_)
: id(id_)
, stream_shard(std::move(stream_shard_))
+ , attach_to_shared_group_func(attach_to_shared_group_func_)
, poller(std::make_unique(1))
, last_metrics_log_time(MonotonicMilliseconds::now())
, log(log_)
{
- auto consumer = klog::KafkaWALPool::instance(global_context).getOrCreateStreaming(stream_shard->logStoreClusterId());
- reader = std::make_shared(
- stream_shard, -1 /*latest*/, SourceColumnsDescription::PhysicalColumnPositions{}, std::move(consumer), log);
+ const auto & settings = query_context->getSettingsRef();
+ if (settings.record_consume_batch_count.value != 0)
+ record_consume_batch_count = static_cast(settings.record_consume_batch_count.value);
- poller->scheduleOrThrowOnError([this] { backgroundPoll(); });
+ if (settings.record_consume_timeout_ms.value != 0)
+ record_consume_timeout_ms = static_cast(settings.record_consume_timeout_ms.value);
+
+ if (stream_shard->isLogStoreKafka())
+ {
+ auto consumer = klog::KafkaWALPool::instance(query_context).getOrCreateStreaming(stream_shard->logStoreClusterId());
+ assert(consumer);
+ kafka_reader = std::make_unique(
+ stream_shard, nlog::LATEST_SN, SourceColumnsDescription::PhysicalColumnPositions{}, std::move(consumer), log);
+ }
+ else
+ {
+ auto fetch_buffer_size = query_context->getSettingsRef().fetch_buffer_size;
+ fetch_buffer_size = std::min(64 * 1024 * 1024, fetch_buffer_size);
+ nativelog_reader = std::make_unique(
+ stream_shard,
+ nlog::LATEST_SN,
+ record_consume_timeout_ms,
+ fetch_buffer_size,
+ /*schema_provider*/ nullptr,
+ /*schema_version*/ 0,
+ SourceColumnsDescription::PhysicalColumnPositions{},
+ log);
+ }
+
+ /// So far, the `attach_to_shared_group_func` is only set in an independent multiplexer, which requires lazy start up
+ bool need_lazy_startup = static_cast(attach_to_shared_group_func);
+ if (!need_lazy_startup)
+ startup();
}
StreamingStoreSourceMultiplexer::~StreamingStoreSourceMultiplexer()
@@ -45,13 +80,38 @@ StreamingStoreSourceMultiplexer::~StreamingStoreSourceMultiplexer()
metrics.total_time / (metrics.total_count == 0 ? 1 : metrics.total_count));
}
+void StreamingStoreSourceMultiplexer::startup()
+{
+ if (started.test_and_set())
+ return;
+
+ poller->scheduleOrThrowOnError([this] { backgroundPoll(); });
+}
+
+void StreamingStoreSourceMultiplexer::resetSequenceNumber(Int64 start_sn)
+{
+ assert(!started.test());
+ if (nativelog_reader)
+ nativelog_reader->resetSequenceNumber(start_sn);
+ else
+ kafka_reader->resetOffset(start_sn);
+}
+
+nlog::RecordPtrs StreamingStoreSourceMultiplexer::read()
+{
+ if (nativelog_reader)
+ return nativelog_reader->read();
+ else
+ return kafka_reader->read(record_consume_batch_count, record_consume_timeout_ms);
+}
+
void StreamingStoreSourceMultiplexer::backgroundPoll()
{
while (!shutdown)
{
try
{
- auto records = reader->read(1000, 100);
+ auto records = read();
auto start = MonotonicNanoseconds::now();
if (!records.empty())
@@ -61,8 +121,17 @@ void StreamingStoreSourceMultiplexer::backgroundPoll()
metrics.total_time = MonotonicNanoseconds::now() - start;
++metrics.total_count;
}
+ else if (attach_to_shared_group_func)
+ {
+ /// Assume that the latest record has been read, we can try attach to shared group,
+ /// After attached, means its all channels will consume from shared group
+ attach_to_shared_group_func(shared_from_this());
+ attach_to_shared_group_func = {};
+
+ LOG_INFO(log, "StreamingStoreSourceMultiplexer id={} for shard {} attached to shared group.", id, stream_shard->getShard());
+ }
- if (start - last_metrics_log_time >= 5000000000)
+ if (start - last_metrics_log_time >= 30000000000)
{
LOG_INFO(
log,
@@ -126,6 +195,11 @@ void StreamingStoreSourceMultiplexer::fanOut(nlog::RecordPtrs records)
if (channel)
fanout_channels.push_back(std::move(channel));
}
+
+ /// Update fanout max sn
+ auto iter = std::find_if(records.rbegin(), records.rend(), [](const auto & record) { return !record->empty(); });
+ if (iter != records.rend())
+ fanout_sn = (*iter)->getSN();
}
for (auto & channel : fanout_channels)
@@ -147,15 +221,41 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexer::createChannel(
return channel;
}
+bool StreamingStoreSourceMultiplexer::tryDetachChannelsInto(std::shared_ptr new_multiplexer)
+{
+ /// NOTE: `fanout_sn` is only updated during locking channels_mutex in `fanOut()`
+ {
+ std::lock_guard lock1{channels_mutex};
+ {
+ std::lock_guard lock2{new_multiplexer->channels_mutex};
+ if (fanout_sn < new_multiplexer->fanout_sn)
+ return false;
+
+ for (auto & shard_channel : channels)
+ {
+ auto channel = shard_channel.second.lock();
+ if (channel)
+ {
+ channel->attachTo(new_multiplexer);
+ [[maybe_unused]] auto [_, inserted] = new_multiplexer->channels.emplace(shard_channel.first, std::move(channel));
+ assert(inserted);
+ }
+ }
+ }
+ channels.clear();
+ }
+ doShutdown();
+ return true;
+}
+
void StreamingStoreSourceMultiplexer::removeChannel(UInt32 channel_id)
{
bool need_shutdown = false;
LOG_INFO(log, "Removing streaming store channel id={}", channel_id);
{
std::lock_guard lock{channels_mutex};
- auto erased = channels.erase(channel_id);
+ [[maybe_unused]] auto erased = channels.erase(channel_id);
assert(erased == 1);
- (void)erased;
if (channels.empty())
need_shutdown = true;
@@ -180,17 +280,31 @@ std::pair StreamingStoreSourceMultiplexer::getStreamShard() const
return stream_shard->getStreamShard();
}
-StreamingStoreSourceMultiplexers::StreamingStoreSourceMultiplexers(
- std::shared_ptr stream_shard_, ContextPtr global_context_, Poco::Logger * log_)
- : stream_shard(std::move(stream_shard_)), global_context(std::move(global_context_)), log(log_)
+StreamingStoreSourceMultiplexers::StreamingStoreSourceMultiplexers(ContextPtr global_context_, Poco::Logger * log_)
+ : global_context(std::move(global_context_)), log(log_)
{
}
StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
- Int32 shard, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, ContextPtr query_context)
+ std::shared_ptr stream_shard,
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ ContextPtr query_context,
+ Int64 start_sn)
{
+ /// In following scenarios, we need independent channel to read some past data
+ /// 1) Recover from checkpointed queries
+ /// 2) Queries which seek to a specific timestamp or earliest
+ if (query_context->getSettingsRef().exec_mode == ExecuteMode::RECOVER)
+ return createIndependentChannelForRecover(stream_shard, column_names, storage_snapshot, query_context);
+ else if (start_sn != nlog::LATEST_SN)
+ return createIndependentChannelWithSeekTo(stream_shard, column_names, storage_snapshot, query_context, start_sn);
+
+ assert(start_sn == nlog::LATEST_SN);
+
std::lock_guard lock{multiplexers_mutex};
+ auto shard = stream_shard->getShard();
auto iter = multiplexers.find(shard);
if (iter == multiplexers.end())
{
@@ -225,8 +339,7 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
if (best_multiplexer)
{
/// Found one
- /// If min channels is greater than > 20, create another multiplexer for this shard
- /// FIXME, make this configurable
+ /// If min channels is greater than > 20(default value), create another multiplexer for this shard
if (min_channels > global_context->getSettingsRef().max_channels_per_resource_group.value)
{
best_multiplexer = std::make_shared(iter->second.size(), stream_shard, global_context, log);
@@ -243,4 +356,61 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
return multiplexer->createChannel(column_names, storage_snapshot, query_context);
}
}
+
+StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createIndependentChannelForRecover(
+ std::shared_ptr stream_shard,
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ ContextPtr query_context)
+{
+ /// will startup after `StreamingStoreSourceChannel::recover()` and reset recovered sn
+ /// The `multiplexer` is cached in created StreamingStoreSourceChannel, we can release this one
+ auto multiplexer = std::make_shared(
+ 0, std::move(stream_shard), global_context, log, [this](auto multiplexer_) { attachToSharedGroup(multiplexer_); });
+ return multiplexer->createChannel(column_names, storage_snapshot, query_context);
+}
+
+StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createIndependentChannelWithSeekTo(
+ std::shared_ptr stream_shard,
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ ContextPtr query_context,
+ Int64 start_sn)
+{
+ /// The `multiplexer` is cached in created StreamingStoreSourceChannel, we can release this one
+ auto multiplexer = std::make_shared(
+ 0, std::move(stream_shard), global_context, log, [this](auto multiplexer_) { attachToSharedGroup(multiplexer_); });
+ auto channel = multiplexer->createChannel(column_names, storage_snapshot, query_context);
+ multiplexer->resetSequenceNumber(start_sn);
+ multiplexer->startup();
+ return channel;
+}
+
+void StreamingStoreSourceMultiplexers::attachToSharedGroup(StreamingStoreSourceMultiplexerPtr multiplexer)
+{
+ std::lock_guard lock{multiplexers_mutex};
+
+ /// First release the detached multiplexers
+ detached_multiplexers.clear();
+
+ auto & multiplexer_list = multiplexers[multiplexer->stream_shard->getShard()];
+ for (auto & shared_multiplexer : multiplexer_list)
+ {
+ /// Skip multiplexer that already have too many channels
+ if (shared_multiplexer->totalChannels() > global_context->getSettingsRef().max_channels_per_resource_group.value)
+ continue;
+
+ if (multiplexer->tryDetachChannelsInto(shared_multiplexer))
+ {
+ /// keep the detached multiplexer for a while since we cannot release itself in his own background polling thread,
+ /// we will release it on next call (another background polling thread),
+ detached_multiplexers.emplace_back(std::move(multiplexer));
+ return;
+ }
+ }
+
+ /// Not detach channels into any existed shared multiplexer, so we reuse it and join in shared groups
+ multiplexer->id = multiplexer_list.size(); /// it's thread safe
+ multiplexer_list.emplace_back(std::move(multiplexer));
+}
}
diff --git a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h
index aaa8c26785b..63ec4a75e93 100644
--- a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h
+++ b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h
@@ -1,13 +1,14 @@
#pragma once
-#include "StreamingBlockReaderKafka.h"
-#include "StreamingStoreSourceChannel.h"
-
-#include
+#include
+#include
+#include
namespace DB
{
class StreamShard;
+struct StorageSnapshot;
+using StorageSnapshotPtr = std::shared_ptr;
/// The multiplexer fans out one streaming store reader to different streaming queries. This has
/// efficiency of disk read / TFF deserialization, memory allocation etc. But we may introduce
@@ -23,13 +24,20 @@ class StreamShard;
class StreamingStoreSourceMultiplexer final : public std::enable_shared_from_this
{
public:
+ using AttachToSharedGroupFunc = std::function)>;
StreamingStoreSourceMultiplexer(
- UInt32 id_, std::shared_ptr storage_, ContextPtr global_context, Poco::Logger * log_);
+ UInt32 id_,
+ std::shared_ptr storage_,
+ ContextPtr global_context,
+ Poco::Logger * log_,
+ AttachToSharedGroupFunc attach_to_shared_group_func = {});
~StreamingStoreSourceMultiplexer();
StreamingStoreSourceChannelPtr
createChannel(const Names & column_names, const StorageSnapshotPtr & storage_snapshot, ContextPtr query_context);
+ bool tryDetachChannelsInto(std::shared_ptr new_multiplexer);
+
void removeChannel(UInt32 channel_id);
size_t totalChannels() const;
@@ -38,15 +46,31 @@ class StreamingStoreSourceMultiplexer final : public std::enable_shared_from_thi
std::pair getStreamShard() const;
+ /// NOTE: Reset sequence number only before startup()
+ void resetSequenceNumber(Int64 start_sn);
+ void startup();
+
private:
+ inline nlog::RecordPtrs read();
void backgroundPoll();
void fanOut(nlog::RecordPtrs records);
void doShutdown();
+ friend class StreamingStoreSourceMultiplexers;
+
private:
UInt32 id;
std::shared_ptr stream_shard;
- std::shared_ptr reader;
+ std::unique_ptr kafka_reader;
+ std::unique_ptr nativelog_reader;
+
+ std::atomic_flag started;
+
+ AttachToSharedGroupFunc attach_to_shared_group_func;
+ Int64 fanout_sn = -1;
+
+ UInt32 record_consume_batch_count = 1000;
+ Int32 record_consume_timeout_ms = 100;
std::unique_ptr poller;
std::atomic shutdown = false;
@@ -73,17 +97,37 @@ using StreamingStoreSourceMultiplexerPtrs = std::list stream_shard_, ContextPtr global_context_, Poco::Logger * log_);
+ StreamingStoreSourceMultiplexers(ContextPtr global_context_, Poco::Logger * log_);
- StreamingStoreSourceChannelPtr
- createChannel(Int32 shard, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, ContextPtr query_context);
+ StreamingStoreSourceChannelPtr createChannel(
+ std::shared_ptr stream_shard,
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ ContextPtr query_context,
+ Int64 start_sn);
+
+private:
+ StreamingStoreSourceChannelPtr createIndependentChannelForRecover(
+ std::shared_ptr stream_shard,
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ ContextPtr query_context);
+
+ StreamingStoreSourceChannelPtr createIndependentChannelWithSeekTo(
+ std::shared_ptr stream_shard,
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ ContextPtr query_context,
+ Int64 start_sn);
+
+ void attachToSharedGroup(StreamingStoreSourceMultiplexerPtr multiplexer);
private:
- std::shared_ptr stream_shard;
ContextPtr global_context;
Poco::Logger * log;
std::mutex multiplexers_mutex;
std::unordered_map multiplexers;
+ StreamingStoreSourceMultiplexerPtrs detached_multiplexers;
};
}
diff --git a/src/Storages/Streaming/tests/gtest_source_columns_description.cpp b/src/Storages/Streaming/tests/gtest_source_columns_description.cpp
index d4d2d99d3c4..c833c65d19f 100644
--- a/src/Storages/Streaming/tests/gtest_source_columns_description.cpp
+++ b/src/Storages/Streaming/tests/gtest_source_columns_description.cpp
@@ -666,3 +666,149 @@ TEST(SourceColumnsDescription, PhysicalAndVirtualAndSubcolumn)
EXPECT_EQ(columns_desc.physical_object_columns_to_read.rbegin()->name, "col4");
}
}
+
+TEST(SourceColumnsDescription, PhysicalAndVirtualAndSubcolumnWithoutPartialRead)
+{
+ auto schema = generateCommonSchema();
+
+ { /// physical + virtual + subcolumn
+ DB::NamesAndTypesList columns_to_read{
+ {"col1", getType("string")},
+ {DB::ProtonConsts::RESERVED_APPEND_TIME, getType("int64")},
+ {"col3", "y", getType("tuple(x int, y string)"), getType("string")},
+ {"col5", "abc", getType("tuple(abc int, xyz string)"), getType("int")}};
+ DB::SourceColumnsDescription columns_desc(columns_to_read, schema, all_extended_columns, /*enable_partial_read*/false);
+ /// Pos to read
+ ASSERT_EQ(columns_desc.positions.size(), 4);
+ ASSERT_EQ(columns_desc.positions[0].type(), Physical);
+ EXPECT_EQ(columns_desc.positions[0].physicalPosition(), 1);
+ ASSERT_EQ(columns_desc.positions[1].type(), Virtual);
+ EXPECT_EQ(columns_desc.positions[1].virtualPosition(), 0);
+ ASSERT_EQ(columns_desc.positions[2].type(), Sub);
+ EXPECT_EQ(columns_desc.positions[2].parentPosition(), 3);
+ EXPECT_EQ(columns_desc.positions[2].subPosition(), 0);
+ ASSERT_EQ(columns_desc.positions[3].type(), Sub);
+ EXPECT_EQ(columns_desc.positions[3].parentPosition(), 5);
+ EXPECT_EQ(columns_desc.positions[3].subPosition(), 1);
+
+ /// Physical columns description
+ ASSERT_EQ(columns_desc.physical_column_positions_to_read.positions.size(), 8);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[0], 0);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[1], 1);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[2], 2);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[3], 3);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[4], 4);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[5], 5);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[6], 6);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[7], 7);
+ ASSERT_EQ(columns_desc.physical_column_positions_to_read.subcolumns.size(), 2);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.subcolumns[3], std::vector({"y"}));
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.subcolumns[5], std::vector({"abc"}));
+
+ /// Virtual columns description
+ ASSERT_EQ(columns_desc.virtual_col_calcs.size(), 1);
+ ASSERT_EQ(columns_desc.virtual_col_calcs.size(), columns_desc.virtual_col_types.size());
+ ASSERT_TRUE(columns_desc.virtual_col_types[0]->equals(*getType("int64")));
+
+ /// Sub-columns description
+ ASSERT_EQ(columns_desc.subcolumns_to_read.size(), 2);
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[0].isSubcolumn());
+ EXPECT_EQ(columns_desc.subcolumns_to_read[0].getNameInStorage(), "col3");
+ EXPECT_EQ(columns_desc.subcolumns_to_read[0].getSubcolumnName(), "y");
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[0].getTypeInStorage()->equals(*getType("tuple(x int, y string)")));
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[0].type->equals(*getType("string")));
+ EXPECT_EQ(columns_desc.subcolumns_to_read[0].name, "col3.y");
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[1].isSubcolumn());
+ EXPECT_EQ(columns_desc.subcolumns_to_read[1].getNameInStorage(), "col5");
+ EXPECT_EQ(columns_desc.subcolumns_to_read[1].getSubcolumnName(), "abc");
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[1].getTypeInStorage()->equals(*getType("json")));
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[1].type->equals(*getType("int")));
+ EXPECT_EQ(columns_desc.subcolumns_to_read[1].name, "col5.abc");
+
+ /// Json description
+ ASSERT_EQ(columns_desc.physical_object_columns_to_read.size(), 1);
+ EXPECT_EQ(columns_desc.physical_object_columns_to_read.begin()->name, "col5");
+ }
+
+ { /// (complex) physical + virtual + subcolumn
+ DB::NamesAndTypesList columns_to_read{
+ {"col3", "y", getType("tuple(x int, y string)"), getType("string")},
+ {"col1", getType("string")},
+ {"col5", getType("json")},
+ {"col5", "xyz", getType("tuple(abc int, xyz string)"), getType("string")},
+ {"col4", getType("json")},
+ {"col2", getType("tuple(a int, b int)")},
+ {DB::ProtonConsts::RESERVED_APPEND_TIME, getType("int64")},
+ {"col5", "abc", getType("tuple(abc int, xyz string)"), getType("int")},
+ {DB::ProtonConsts::RESERVED_PROCESS_TIME, getType("int64")}};
+ DB::SourceColumnsDescription columns_desc(columns_to_read, schema, all_extended_columns, /*enable_partial_read*/ false);
+ /// Pos to read
+ ASSERT_EQ(columns_desc.positions.size(), 9);
+ ASSERT_EQ(columns_desc.positions[0].type(), Sub);
+ EXPECT_EQ(columns_desc.positions[0].parentPosition(), 3);
+ EXPECT_EQ(columns_desc.positions[0].subPosition(), 0);
+ ASSERT_EQ(columns_desc.positions[1].type(), Physical);
+ EXPECT_EQ(columns_desc.positions[1].physicalPosition(), 1);
+ ASSERT_EQ(columns_desc.positions[2].type(), Physical);
+ EXPECT_EQ(columns_desc.positions[2].physicalPosition(), 5);
+ ASSERT_EQ(columns_desc.positions[3].type(), Sub);
+ EXPECT_EQ(columns_desc.positions[3].parentPosition(), 5);
+ EXPECT_EQ(columns_desc.positions[3].subPosition(), 1);
+ ASSERT_EQ(columns_desc.positions[4].type(), Physical);
+ EXPECT_EQ(columns_desc.positions[4].physicalPosition(), 4);
+ ASSERT_EQ(columns_desc.positions[5].type(), Physical);
+ EXPECT_EQ(columns_desc.positions[5].physicalPosition(), 2);
+ ASSERT_EQ(columns_desc.positions[6].type(), Virtual);
+ EXPECT_EQ(columns_desc.positions[6].virtualPosition(), 0);
+ ASSERT_EQ(columns_desc.positions[7].type(), Sub);
+ EXPECT_EQ(columns_desc.positions[7].parentPosition(), 5);
+ EXPECT_EQ(columns_desc.positions[7].subPosition(), 2);
+ ASSERT_EQ(columns_desc.positions[8].type(), Virtual);
+ EXPECT_EQ(columns_desc.positions[8].virtualPosition(), 1);
+
+ /// Physical columns description
+ ASSERT_EQ(columns_desc.physical_column_positions_to_read.positions.size(), 8);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[0], 0);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[1], 1);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[2], 2);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[3], 3);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[4], 4);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[5], 5);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[6], 6);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.positions[7], 7);
+ ASSERT_EQ(columns_desc.physical_column_positions_to_read.subcolumns.size(), 1);
+ EXPECT_EQ(columns_desc.physical_column_positions_to_read.subcolumns[3], std::vector({"y"}));
+
+ /// Virtual columns description
+ ASSERT_EQ(columns_desc.virtual_col_calcs.size(), 2);
+ ASSERT_EQ(columns_desc.virtual_col_calcs.size(), columns_desc.virtual_col_types.size());
+ ASSERT_TRUE(columns_desc.virtual_col_types[0]->equals(*getType("int64")));
+ ASSERT_TRUE(columns_desc.virtual_col_types[1]->equals(*getType("int64")));
+
+ /// Sub-columns description
+ ASSERT_EQ(columns_desc.subcolumns_to_read.size(), 3);
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[0].isSubcolumn());
+ EXPECT_EQ(columns_desc.subcolumns_to_read[0].getNameInStorage(), "col3");
+ EXPECT_EQ(columns_desc.subcolumns_to_read[0].getSubcolumnName(), "y");
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[0].getTypeInStorage()->equals(*getType("tuple(x int, y string)")));
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[0].type->equals(*getType("string")));
+ EXPECT_EQ(columns_desc.subcolumns_to_read[0].name, "col3.y");
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[1].isSubcolumn());
+ EXPECT_EQ(columns_desc.subcolumns_to_read[1].getNameInStorage(), "col5");
+ EXPECT_EQ(columns_desc.subcolumns_to_read[1].getSubcolumnName(), "xyz");
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[1].getTypeInStorage()->equals(*getType("json")));
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[1].type->equals(*getType("string")));
+ EXPECT_EQ(columns_desc.subcolumns_to_read[1].name, "col5.xyz");
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[2].isSubcolumn());
+ EXPECT_EQ(columns_desc.subcolumns_to_read[2].getNameInStorage(), "col5");
+ EXPECT_EQ(columns_desc.subcolumns_to_read[2].getSubcolumnName(), "abc");
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[2].getTypeInStorage()->equals(*getType("json")));
+ EXPECT_TRUE(columns_desc.subcolumns_to_read[2].type->equals(*getType("int")));
+ EXPECT_EQ(columns_desc.subcolumns_to_read[2].name, "col5.abc");
+
+ /// Json description
+ ASSERT_EQ(columns_desc.physical_object_columns_to_read.size(), 2);
+ EXPECT_EQ(columns_desc.physical_object_columns_to_read.begin()->name, "col5");
+ EXPECT_EQ(columns_desc.physical_object_columns_to_read.rbegin()->name, "col4");
+ }
+}
diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream4.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream4.yaml
index e4028e9cb9e..1e21757902a 100644
--- a/tests/stream/test_stream_smoke/0013_changelog_stream4.yaml
+++ b/tests/stream/test_stream_smoke/0013_changelog_stream4.yaml
@@ -46,12 +46,10 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -118,12 +116,10 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -190,17 +186,14 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_4;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -255,17 +248,14 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_4;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
expected_results:
@@ -289,7 +279,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4
- client: python
@@ -332,7 +321,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
expected_results:
@@ -356,17 +344,14 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_4;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -421,17 +406,14 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_4;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
expected_results:
@@ -474,12 +456,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -527,12 +507,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
expected_results:
@@ -575,12 +553,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -628,12 +604,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
expected_results:
@@ -676,7 +650,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -722,7 +695,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
expected_results:
@@ -765,17 +737,14 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_4;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -830,17 +799,14 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_4;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
expected_results:
@@ -924,12 +890,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -987,7 +951,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
expected_results:
@@ -1073,12 +1036,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -1136,7 +1097,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
expected_results:
@@ -1223,17 +1183,14 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_4;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
- client: python
@@ -1298,12 +1255,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_4;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_4;
expected_results:
diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream5.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream5.yaml
index 0975aef35e2..8b8b8ee83f9 100644
--- a/tests/stream/test_stream_smoke/0013_changelog_stream5.yaml
+++ b/tests/stream/test_stream_smoke/0013_changelog_stream5.yaml
@@ -106,7 +106,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -230,17 +229,14 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_5;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -305,12 +301,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
expected_results:
@@ -336,12 +330,10 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -404,7 +396,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
expected_results:
@@ -428,12 +419,10 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -496,7 +485,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
expected_results:
@@ -527,17 +515,14 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_5;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -607,12 +592,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
expected_results:
@@ -643,7 +626,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -737,17 +719,14 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_5;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -807,12 +786,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
expected_results:
@@ -835,12 +812,10 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -898,7 +873,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
expected_results:
@@ -923,12 +897,10 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -986,7 +958,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
expected_results:
@@ -1011,17 +982,14 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_5;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -1083,12 +1051,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
expected_results:
@@ -1113,7 +1079,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -1186,17 +1151,14 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_5;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
- client: python
@@ -1261,12 +1223,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_5;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_5;
expected_results:
diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream6.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream6.yaml
index 1f37e04d993..0467e7daf59 100644
--- a/tests/stream/test_stream_smoke/0013_changelog_stream6.yaml
+++ b/tests/stream/test_stream_smoke/0013_changelog_stream6.yaml
@@ -47,12 +47,10 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_6;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_6;
- client: python
@@ -110,7 +108,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_6;
- id: 25
@@ -125,7 +122,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
@@ -237,7 +233,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
@@ -328,7 +323,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
@@ -420,7 +414,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
@@ -567,7 +560,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
@@ -641,7 +633,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
@@ -731,7 +722,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
@@ -823,7 +813,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
@@ -898,7 +887,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
@@ -974,7 +962,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
@@ -1061,7 +1048,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_6;
- client: python
diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream7.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream7.yaml
index 81eddcebc88..2e3a3bc829b 100644
--- a/tests/stream/test_stream_smoke/0013_changelog_stream7.yaml
+++ b/tests/stream/test_stream_smoke/0013_changelog_stream7.yaml
@@ -107,7 +107,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_7;
- client: python
@@ -181,7 +180,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_7;
- client: python
@@ -332,7 +330,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_substream_7;
- client: python
@@ -409,7 +406,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_stream;
- client: python
@@ -495,7 +491,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_stream;
- client: python
@@ -601,12 +596,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_7;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
- client: python
@@ -675,7 +668,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
expected_results:
@@ -720,12 +712,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_7;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
- client: python
@@ -798,7 +788,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
expected_results:
@@ -843,17 +832,14 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_7;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_7;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
- client: python
@@ -929,12 +915,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_7;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
expected_results:
@@ -979,7 +963,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
- client: python
@@ -1085,22 +1068,18 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_7;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_7;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_7;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
- client: python
@@ -1176,12 +1155,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_7;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
expected_results:
@@ -1207,12 +1184,10 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_7;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
- client: python
@@ -1256,7 +1231,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
expected_results:
@@ -1283,17 +1257,14 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view_7;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_target_changelog_kv_7;
- client: python
@@ -1303,7 +1274,6 @@ tests:
- client: python
query_type: table
- wait: 0
query: create stream if not exists test14_target_changelog_kv_7(i float, k1 int, k2 string) primary key k2 settings mode='changelog_kv';
- client: python
@@ -1346,12 +1316,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_7;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_target_changelog_kv_7;
expected_results:
diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml
index 80683c514e8..1d2f03b7050 100644
--- a/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml
+++ b/tests/stream/test_stream_smoke/0013_changelog_stream8.yaml
@@ -28,17 +28,14 @@ tests:
- statements:
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view2_8;
- client: python
query_type: table
- wait: 0
query: drop view if exists test14_view1_8;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_8;
- client: python
@@ -321,12 +318,10 @@ tests:
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_subquery_8;
- client: python
query_type: table
- wait: 0
query: drop stream if exists test14_target_changelog_kv_8;
expected_results:
@@ -360,8 +355,8 @@ tests:
type: javascript
name: test_14_8_add_five
arguments:
- - name: value
- - type: float32
+ - name: value
+ type: float32
return_type: float32
source: |
function test_14_8_add_five(value) {
@@ -524,7 +519,6 @@ tests:
- statements:
- client: python
query_type: table
- wait: 2
query: drop stream if exists test14_substream_8;
- client: python
@@ -535,6 +529,7 @@ tests:
- client: python
query_id: '1453'
depends_on_stream: test14_substream_8
+ wait: 1
query_type: stream
query: select max(val), min(val), avg(val), id, name from test14_substream_8 partition by id group by name emit periodic 1s;
@@ -618,8 +613,8 @@ tests:
type: javascript
name: test_14_8_add_five
arguments:
- - name: value
- - type: float32
+ - name: value
+ type: float32
return_type: float32
source: |
function test_14_8_add_five(value) {
diff --git a/tests/stream/test_stream_smoke/0015_single_stream_changelog_aggr.json b/tests/stream/test_stream_smoke/0015_single_stream_changelog_aggr.json
index 2e905aa075f..bf73291f91c 100644
--- a/tests/stream/test_stream_smoke/0015_single_stream_changelog_aggr.json
+++ b/tests/stream/test_stream_smoke/0015_single_stream_changelog_aggr.json
@@ -47,10 +47,10 @@
"steps":[
{
"statements": [
- {"client":"python","query_id":"1601", "depends_on_stream":"changelog_kv_stream_aggr","query_end_timer":6, "query_type": "stream", "query":"select count(), avg(i), sum(i), min(i), max(i), min(s), max(s) from changelog_kv_stream_aggr emit periodic 1s"},
+ {"client":"python","query_id":"1601", "depends_on_stream":"changelog_kv_stream_aggr", "query_type": "stream", "query":"select count(), avg(i), sum(i), min(i), max(i), min(s), max(s) from changelog_kv_stream_aggr emit periodic 1s"},
{"client":"python", "query_type": "table","depends_on":1601,"depends_on_stream":"changelog_kv_stream_aggr", "query": "insert into changelog_kv_stream_aggr(i, s) values (1, 's1'), (2, 's2'), (3, 's3')"},
{"client":"python", "query_type": "table","depends_on":1601,"depends_on_stream":"changelog_kv_stream_aggr", "wait":1, "query": "insert into changelog_kv_stream_aggr(i, s, _tp_delta) values (1, 's1', -1)"},
- {"client":"python", "query_type": "table","depends_on":1601,"depends_on_stream":"changelog_kv_stream_aggr", "wait":1, "query": "insert into changelog_kv_stream_aggr(i, s, _tp_delta) values (3, 's3', -1)"}
+ {"client":"python", "query_type": "table","depends_on":1601,"depends_on_stream":"changelog_kv_stream_aggr", "wait":1, "kill":1601, "kill_wait":3, "query": "insert into changelog_kv_stream_aggr(i, s, _tp_delta) values (3, 's3', -1)"}
]
}
],
diff --git a/tests/stream/test_stream_smoke/0099_fixed_issues.json b/tests/stream/test_stream_smoke/0099_fixed_issues.json
index 0d637eeb66c..6525cae41f8 100644
--- a/tests/stream/test_stream_smoke/0099_fixed_issues.json
+++ b/tests/stream/test_stream_smoke/0099_fixed_issues.json
@@ -15,14 +15,14 @@
"steps":[
{
"statements": [
- {"client":"python", "query_type": "table", "wait":2, "query":"drop view if exists mv_2"},
- {"client":"python", "query_type": "table", "wait":2, "query":"drop view if exists mv_1"},
+ {"client":"python", "query_type": "table", "query":"drop view if exists mv_2"},
+ {"client":"python", "query_type": "table", "query":"drop view if exists mv_1"},
{"client":"python", "query_type": "table", "wait":2, "query":"drop view if exists mv_truck_track"},
{"client":"python", "query_type": "table", "wait":2, "query":"drop stream if exists ttp_truck_track"},
{"client":"python", "query_type": "table", "wait":2, "query_id":"9900", "query": "create stream ttp_truck_track(`lpn` string, `vno` string, `drc` string, `drcCode` int32, `wgs84Lat` float32, `wgs84Lon` float32, `gcj02Lat` float32, `gcj02Lon` float32, `province` nullable(string), `city` nullable(string), `country` nullable(string), `spd` float32, `mil` float32, `time` string, `adr` string)"},
- {"client":"python", "query_type": "table", "wait":5, "query_id":"9901","depends_on_stream":"ttp_truck_track", "query": "create materialized view mv_truck_track as (select * from ttp_truck_track where date_diff('second', _tp_time, now()) < 30)"},
- {"client":"python", "query_type": "table", "wait":5, "query_id":"9902", "depends_on_stream":"mv_truck_track", "query": "create materialized view if not exists mv_1 as (select now() as time, count_distinct(lpn) as cnt from mv_truck_track emit last 10m and periodic 10s)"},
- {"client":"python", "query_type": "table", "wait":5, "query_id":"9903", "depends_on_stream":"mv_truck_track","drop_view":"mv_2,mv_1,mv_truck_track", "drop_view_wait":2, "query": "create materialized view if not exists mv_2 as (select now() as time, count_distinct(lpn) as cnt from mv_truck_track emit last 10m and periodic 10s)"}
+ {"client":"python", "query_type": "table", "wait":2, "query_id":"9901","depends_on_stream":"ttp_truck_track", "query": "create materialized view mv_truck_track as (select * from ttp_truck_track where date_diff('second', _tp_time, now()) < 30)"},
+ {"client":"python", "query_type": "table", "wait":2, "query_id":"9902", "depends_on_stream":"mv_truck_track", "query": "create materialized view if not exists mv_1 as (select now() as time, count_distinct(lpn) as cnt from mv_truck_track emit last 10m and periodic 10s)"},
+ {"client":"python", "query_type": "table", "wait":2, "query_id":"9903", "depends_on_stream":"mv_truck_track","drop_view":"mv_2,mv_1,mv_truck_track", "drop_view_wait":2, "query": "create materialized view if not exists mv_2 as (select now() as time, count_distinct(lpn) as cnt from mv_truck_track emit last 10m and periodic 10s)"}
]
}
],
@@ -142,9 +142,9 @@
"steps":[
{
"statements": [
- {"client":"python", "query_type": "table", "wait":2, "query": "drop view if exists test_mv_1934"},
+ {"client":"python", "query_type": "table", "query": "drop view if exists test_mv_1934"},
{"client":"python", "query_type": "table", "wait":2, "query": "drop stream if exists test_stream_1934"},
- {"client":"python", "query_type": "table", "wait":2, "query": "create stream test_stream_1934(value int)"},
+ {"client":"python", "query_type": "table", "exist":"test_stream_1934", "exist_wait":2, "wait":1, "query": "create stream test_stream_1934(value int)"},
{"client":"python", "query_type": "table", "depends_on_stream":"test_stream_1934", "query": "create materialized view test_mv_1934 as select count() from test_stream_1934"},
{"client":"python", "query_type": "table", "depends_on_stream":"test_mv_1934", "wait":2, "query": "insert into test_stream_1934(value) values(1)"},
{"client":"python", "query_type": "table", "wait":3, "query": "insert into test_stream_1934(value) values(2)"},
From 92008436a22f6c4101d77c050b23f03206a410fb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Thu, 23 Nov 2023 17:05:14 +0800
Subject: [PATCH 2/2] support fan out for backfill read and fix more
---
src/Storages/Streaming/StorageStream.cpp | 25 ++++++++++++++-----
.../StreamingStoreSourceMultiplexer.cpp | 25 +++++++++++++------
.../StreamingStoreSourceMultiplexer.h | 4 +++
3 files changed, 41 insertions(+), 13 deletions(-)
diff --git a/src/Storages/Streaming/StorageStream.cpp b/src/Storages/Streaming/StorageStream.cpp
index 0926f275fcb..532cc085b8f 100644
--- a/src/Storages/Streaming/StorageStream.cpp
+++ b/src/Storages/Streaming/StorageStream.cpp
@@ -486,15 +486,19 @@ void StorageStream::readConcat(
for (auto & stream_shard : shards_to_read)
{
auto create_streaming_source = [this, header, storage_snapshot, stream_shard, seek_to_info = query_info.seek_to_info, context_](
- Int64 & max_sn_in_parts) {
+ Int64 & max_sn_in_parts) -> SourcePtr {
if (max_sn_in_parts < 0)
{
/// Fallback to seek streaming store
auto offsets = stream_shard->getOffsets(seek_to_info);
LOG_INFO(log, "Fused read fallbacks to seek stream for shard={} since there are no historical data", stream_shard->shard);
- return std::make_shared(
- stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
+ if (context_->getSettingsRef().query_resource_group.value == "shared")
+ return source_multiplexers->createChannel(
+ stream_shard, header.getNames(), storage_snapshot, context_, offsets[stream_shard->shard]);
+ else
+ return std::make_shared(
+ stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
}
auto committed = stream_shard->storage->inMemoryCommittedSN();
@@ -526,7 +530,12 @@ void StorageStream::readConcat(
max_sn_in_parts,
committed);
- return std::make_shared(stream_shard, header, storage_snapshot, context_, max_sn_in_parts + 1, log);
+ if (context_->getSettingsRef().query_resource_group.value == "shared")
+ return source_multiplexers->createChannel(
+ stream_shard, header.getNames(), storage_snapshot, context_, max_sn_in_parts + 1);
+ else
+ return std::make_shared(
+ stream_shard, header, storage_snapshot, context_, max_sn_in_parts + 1, log);
}
else
{
@@ -542,8 +551,12 @@ void StorageStream::readConcat(
/// We need reset max_sn_in_parts to tell caller that we are seeking streaming store directly
max_sn_in_parts = -1;
- return std::make_shared(
- stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
+ if (context_->getSettingsRef().query_resource_group.value == "shared")
+ return source_multiplexers->createChannel(
+ stream_shard, header.getNames(), storage_snapshot, context_, offsets[stream_shard->shard]);
+ else
+ return std::make_shared(
+ stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
}
};
diff --git a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp
index 88d6b151a57..f15399a6c13 100644
--- a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp
+++ b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.cpp
@@ -280,6 +280,8 @@ std::pair StreamingStoreSourceMultiplexer::getStreamShard() const
return stream_shard->getStreamShard();
}
+std::atomic StreamingStoreSourceMultiplexers::multiplexer_id = 0;
+
StreamingStoreSourceMultiplexers::StreamingStoreSourceMultiplexers(ContextPtr global_context_, Poco::Logger * log_)
: global_context(std::move(global_context_)), log(log_)
{
@@ -310,7 +312,8 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
{
multiplexers.emplace(
shard,
- StreamingStoreSourceMultiplexerPtrs{std::make_shared(0, stream_shard, global_context, log)});
+ StreamingStoreSourceMultiplexerPtrs{
+ std::make_shared(getMultiplexerID(), stream_shard, global_context, log)});
iter = multiplexers.find(shard);
}
@@ -342,7 +345,7 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
/// If min channels is greater than > 20(default value), create another multiplexer for this shard
if (min_channels > global_context->getSettingsRef().max_channels_per_resource_group.value)
{
- best_multiplexer = std::make_shared(iter->second.size(), stream_shard, global_context, log);
+ best_multiplexer = std::make_shared(getMultiplexerID(), stream_shard, global_context, log);
iter->second.push_back(best_multiplexer);
}
@@ -351,7 +354,7 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createChannel(
else
{
/// All multiplexers are shutdown
- auto multiplexer{std::make_shared(iter->second.size(), stream_shard, global_context, log)};
+ auto multiplexer{std::make_shared(getMultiplexerID(), stream_shard, global_context, log)};
iter->second.push_back(multiplexer);
return multiplexer->createChannel(column_names, storage_snapshot, query_context);
}
@@ -366,7 +369,7 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createIndepende
/// will startup after `StreamingStoreSourceChannel::recover()` and reset recovered sn
/// The `multiplexer` is cached in created StreamingStoreSourceChannel, we can release this one
auto multiplexer = std::make_shared(
- 0, std::move(stream_shard), global_context, log, [this](auto multiplexer_) { attachToSharedGroup(multiplexer_); });
+ getMultiplexerID(), std::move(stream_shard), global_context, log, [this](auto multiplexer_) { attachToSharedGroup(multiplexer_); });
return multiplexer->createChannel(column_names, storage_snapshot, query_context);
}
@@ -379,7 +382,7 @@ StreamingStoreSourceChannelPtr StreamingStoreSourceMultiplexers::createIndepende
{
/// The `multiplexer` is cached in created StreamingStoreSourceChannel, we can release this one
auto multiplexer = std::make_shared(
- 0, std::move(stream_shard), global_context, log, [this](auto multiplexer_) { attachToSharedGroup(multiplexer_); });
+ getMultiplexerID(), std::move(stream_shard), global_context, log, [this](auto multiplexer_) { attachToSharedGroup(multiplexer_); });
auto channel = multiplexer->createChannel(column_names, storage_snapshot, query_context);
multiplexer->resetSequenceNumber(start_sn);
multiplexer->startup();
@@ -394,8 +397,17 @@ void StreamingStoreSourceMultiplexers::attachToSharedGroup(StreamingStoreSourceM
detached_multiplexers.clear();
auto & multiplexer_list = multiplexers[multiplexer->stream_shard->getShard()];
- for (auto & shared_multiplexer : multiplexer_list)
+ for (auto it = multiplexer_list.begin(); it != multiplexer_list.end();)
{
+ if ((*it)->isShutdown())
+ {
+ it = multiplexer_list.erase(it);
+ continue;
+ }
+
+ auto & shared_multiplexer = *it;
+ ++it;
+
/// Skip multiplexer that already have too many channels
if (shared_multiplexer->totalChannels() > global_context->getSettingsRef().max_channels_per_resource_group.value)
continue;
@@ -410,7 +422,6 @@ void StreamingStoreSourceMultiplexers::attachToSharedGroup(StreamingStoreSourceM
}
/// Not detach channels into any existed shared multiplexer, so we reuse it and join in shared groups
- multiplexer->id = multiplexer_list.size(); /// it's thread safe
multiplexer_list.emplace_back(std::move(multiplexer));
}
}
diff --git a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h
index 63ec4a75e93..21b41f7c34d 100644
--- a/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h
+++ b/src/Storages/Streaming/StreamingStoreSourceMultiplexer.h
@@ -122,10 +122,14 @@ class StreamingStoreSourceMultiplexers final
void attachToSharedGroup(StreamingStoreSourceMultiplexerPtr multiplexer);
+ uint32_t getMultiplexerID() { return multiplexer_id.fetch_add(1); }
+
private:
ContextPtr global_context;
Poco::Logger * log;
+ static std::atomic multiplexer_id;
+
std::mutex multiplexers_mutex;
std::unordered_map multiplexers;
StreamingStoreSourceMultiplexerPtrs detached_multiplexers;