Skip to content

Commit 07ab4e8

Browse files
authored
Merge pull request #231 from den818/SETTINGS
Implementation per query settings
2 parents 52bdf94 + 83701e0 commit 07ab4e8

File tree

3 files changed

+107
-33
lines changed

3 files changed

+107
-33
lines changed

clickhouse/client.cpp

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@
3636
#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405
3737
#define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410
3838
#define DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO 54420
39+
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
3940

40-
#define REVISION DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO
41+
#define REVISION DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS
4142

4243
namespace clickhouse {
4344

@@ -131,7 +132,7 @@ class Client::Impl {
131132

132133
bool ReceivePacket(uint64_t* server_packet = nullptr);
133134

134-
void SendQuery(const std::string& query, const std::string& query_id);
135+
void SendQuery(const Query& query);
135136

136137
void SendData(const Block& block);
137138

@@ -230,7 +231,7 @@ void Client::Impl::ExecuteQuery(Query query) {
230231
RetryGuard([this]() { Ping(); });
231232
}
232233

233-
SendQuery(query.GetText(), query.GetQueryID());
234+
SendQuery(query);
234235

235236
while (ReceivePacket()) {
236237
;
@@ -272,7 +273,8 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
272273
}
273274
}
274275

275-
SendQuery("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id);
276+
Query query("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id);
277+
SendQuery(query);
276278

277279
uint64_t server_packet;
278280
// Receive data packet.
@@ -608,9 +610,9 @@ void Client::Impl::SendCancel() {
608610
output_->Flush();
609611
}
610612

611-
void Client::Impl::SendQuery(const std::string& query, const std::string& query_id) {
613+
void Client::Impl::SendQuery(const Query& query) {
612614
WireFormat::WriteUInt64(*output_, ClientCodes::Query);
613-
WireFormat::WriteString(*output_, query_id);
615+
WireFormat::WriteString(*output_, query.GetQueryID());
614616

615617
/// Client info.
616618
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) {
@@ -643,15 +645,24 @@ void Client::Impl::SendQuery(const std::string& query, const std::string& query_
643645
}
644646
}
645647

646-
/// Per query settings.
647-
//if (settings)
648-
// settings->serialize(*out);
649-
//else
648+
/// Per query settings
649+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) {
650+
for(const auto& [name, field] : query.GetQuerySettings()) {
651+
WireFormat::WriteString(*output_, name);
652+
WireFormat::WriteVarint64(*output_, field.flags);
653+
WireFormat::WriteString(*output_, field.value);
654+
}
655+
}
656+
else if (query.GetQuerySettings().size() > 0) {
657+
// Current implementation works only for server version >= v20.1.2.4-stable, since we do not implement binary settings serialization.
658+
throw UnimplementedError(std::string("Can't send query settings to a server, server version is too old"));
659+
}
660+
// Empty string signals end of serialized settings
650661
WireFormat::WriteString(*output_, std::string());
651662

652663
WireFormat::WriteUInt64(*output_, Stages::Complete);
653664
WireFormat::WriteUInt64(*output_, compression_);
654-
WireFormat::WriteString(*output_, query);
665+
WireFormat::WriteString(*output_, query.GetText());
655666
// Send empty block as marker of
656667
// end of data
657668
SendData(Block());

clickhouse/query.h

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,32 +7,21 @@
77
#include <functional>
88
#include <memory>
99
#include <string>
10+
#include <unordered_map>
1011

1112
namespace clickhouse {
1213

13-
/**
14-
* Settings of individual query.
15-
*/
16-
struct QuerySettings {
17-
/// Maximum thread to use on the server-side to process a query. Default - let the server choose.
18-
int max_threads = 0;
19-
/// Compute min and max values of the result.
20-
bool extremes = false;
21-
/// Silently skip unavailable shards.
22-
bool skip_unavailable_shards = false;
23-
/// Write statistics about read rows, bytes, time elapsed, etc.
24-
bool output_format_write_statistics = true;
25-
/// Use client timezone for interpreting DateTime string values, instead of adopting server timezone.
26-
bool use_client_time_zone = false;
27-
28-
// connect_timeout
29-
// max_block_size
30-
// distributed_group_by_no_merge = false
31-
// strict_insert_defaults = 0
32-
// network_compression_method = LZ4
33-
// priority = 0
14+
struct QuerySettingsField {
15+
enum Flags : uint64_t
16+
{
17+
IMPORTANT = 0x01,
18+
CUSTOM = 0x02,
19+
};
20+
std::string value;
21+
uint64_t flags{0};
3422
};
3523

24+
using QuerySettings = std::unordered_map<std::string, QuerySettingsField>;
3625

3726
struct Profile {
3827
uint64_t rows = 0;
@@ -94,6 +83,22 @@ class Query : public QueryEvents {
9483
return query_id_;
9584
}
9685

86+
inline const QuerySettings& GetQuerySettings() const {
87+
return query_settings_;
88+
}
89+
90+
/// Set per query settings
91+
inline Query& SetQuerySettings(QuerySettings query_settings) {
92+
query_settings_ = std::move(query_settings);
93+
return *this;
94+
}
95+
96+
/// Set per query setting
97+
inline Query& SetSetting(const std::string& key, const QuerySettingsField& value) {
98+
query_settings_[key] = value;
99+
return *this;
100+
}
101+
97102
/// Set handler for receiving result data.
98103
inline Query& OnData(SelectCallback cb) {
99104
select_cb_ = std::move(cb);
@@ -111,7 +116,6 @@ class Query : public QueryEvents {
111116
return *this;
112117
}
113118

114-
115119
/// Set handler for receiving a progress of query exceution.
116120
inline Query& OnProgress(ProgressCallback cb) {
117121
progress_cb_ = std::move(cb);
@@ -157,6 +161,7 @@ class Query : public QueryEvents {
157161
private:
158162
const std::string query_;
159163
const std::string query_id_;
164+
QuerySettings query_settings_;
160165
ExceptionCallback exception_cb_;
161166
ProgressCallback progress_cb_;
162167
SelectCallback select_cb_;

ut/client_ut.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,6 +1039,64 @@ TEST_P(ClientCase, OnProgress) {
10391039
EXPECT_LE(received_progress->written_bytes, 10000u);
10401040
}
10411041

1042+
TEST_P(ClientCase, QuerySettings) {
1043+
client_->Execute("DROP TEMPORARY TABLE IF EXISTS test_clickhouse_query_settings_table_1;");
1044+
client_->Execute("CREATE TEMPORARY TABLE IF NOT EXISTS test_clickhouse_query_settings_table_1 ( id Int64 )");
1045+
1046+
client_->Execute("DROP TEMPORARY TABLE IF EXISTS test_clickhouse_query_settings_table_2;");
1047+
client_->Execute("CREATE TEMPORARY TABLE IF NOT EXISTS test_clickhouse_query_settings_table_2 ( id Int64, value Int64 )");
1048+
1049+
client_->Execute("INSERT INTO test_clickhouse_query_settings_table_1 (*) VALUES (1)");
1050+
1051+
Query query("SELECT value "
1052+
"FROM test_clickhouse_query_settings_table_1 "
1053+
"LEFT OUTER JOIN test_clickhouse_query_settings_table_2 "
1054+
"ON test_clickhouse_query_settings_table_1.id = test_clickhouse_query_settings_table_2.id");
1055+
1056+
1057+
bool checked = false;
1058+
1059+
query.SetSetting("join_use_nulls", {"1"});
1060+
1061+
query.OnData(
1062+
[&](const Block& block) {
1063+
if (block.GetRowCount() == 0)
1064+
return;
1065+
ASSERT_EQ(1U, block.GetColumnCount());
1066+
ASSERT_EQ(1U, block.GetRowCount());
1067+
ASSERT_TRUE(block[0]->GetType().IsEqual(Type::CreateNullable(Type::CreateSimple<int64_t>())));
1068+
auto cl = block[0]->As<ColumnNullable>();
1069+
EXPECT_TRUE(cl->IsNull(0));
1070+
checked = true;
1071+
});
1072+
client_->Execute(query);
1073+
1074+
EXPECT_TRUE(checked);
1075+
1076+
query.SetSetting("join_use_nulls", {"0"});
1077+
1078+
query.OnData(
1079+
[&](const Block& block) {
1080+
if (block.GetRowCount() == 0)
1081+
return;
1082+
ASSERT_EQ(1U, block.GetColumnCount());
1083+
ASSERT_EQ(1U, block.GetRowCount());
1084+
ASSERT_TRUE(block[0]->GetType().IsEqual(Type::CreateSimple<int64_t>()));
1085+
auto cl = block[0]->As<ColumnInt64>();
1086+
EXPECT_EQ(cl->At(0), 0);
1087+
checked = true;
1088+
}
1089+
);
1090+
checked = false;
1091+
client_->Execute(query);
1092+
1093+
EXPECT_TRUE(checked);
1094+
1095+
query.SetSetting("wrong_setting_name", {"0", QuerySettingsField::IMPORTANT});
1096+
1097+
EXPECT_THROW(client_->Execute(query), ServerException);
1098+
}
1099+
10421100
const auto LocalHostEndpoint = ClientOptions()
10431101
.SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost"))
10441102
.SetPort( getEnvOrDefault<size_t>("CLICKHOUSE_PORT", "9000"))

0 commit comments

Comments
 (0)