Skip to content

Commit 8816d02

Browse files
authored
Merge pull request #222 from den818/SERVER_LOGS
Receiving server logs
2 parents 07ab4e8 + 69c039d commit 8816d02

File tree

3 files changed

+56
-0
lines changed

3 files changed

+56
-0
lines changed

clickhouse/client.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,24 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
443443
return false;
444444
}
445445

446+
case ServerCodes::Log: {
447+
// log tag
448+
if (!WireFormat::SkipString(*input_)) {
449+
return false;
450+
}
451+
Block block;
452+
453+
// Use uncompressed stream since log blocks usually contain only one row
454+
if (!ReadBlock(*input_, &block)) {
455+
return false;
456+
}
457+
458+
if (events_) {
459+
events_->OnServerLog(block);
460+
}
461+
return true;
462+
}
463+
446464
case ServerCodes::TableColumns: {
447465
// external table name
448466
if (!WireFormat::SkipString(*input_)) {

clickhouse/query.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ class QueryEvents {
5757

5858
virtual void OnProgress(const Progress& progress) = 0;
5959

60+
/** Handle query execution logs provided by server.
61+
* Amount of logs regulated by `send_logs_level` setting.
62+
* By-default only `fatal` log events are sent to the client side.
63+
*/
64+
virtual void OnServerLog(const Block& block) = 0;
65+
6066
virtual void OnFinish() = 0;
6167
};
6268

@@ -65,6 +71,7 @@ using ExceptionCallback = std::function<void(const Exception& e)>;
6571
using ProgressCallback = std::function<void(const Progress& progress)>;
6672
using SelectCallback = std::function<void(const Block& block)>;
6773
using SelectCancelableCallback = std::function<bool(const Block& block)>;
74+
using SelectServerLogCallback = std::function<bool(const Block& block)>;
6875

6976

7077
class Query : public QueryEvents {
@@ -122,6 +129,12 @@ class Query : public QueryEvents {
122129
return *this;
123130
}
124131

132+
/// Set handler for receiving a server log of query exceution.
133+
inline Query& OnServerLog(SelectServerLogCallback cb) {
134+
select_server_log_cb_ = std::move(cb);
135+
return *this;
136+
}
137+
125138
static const std::string default_query_id;
126139

127140
private:
@@ -155,6 +168,12 @@ class Query : public QueryEvents {
155168
}
156169
}
157170

171+
void OnServerLog(const Block& block) override {
172+
if (select_server_log_cb_) {
173+
select_server_log_cb_(block);
174+
}
175+
}
176+
158177
void OnFinish() override {
159178
}
160179

@@ -166,6 +185,7 @@ class Query : public QueryEvents {
166185
ProgressCallback progress_cb_;
167186
SelectCallback select_cb_;
168187
SelectCancelableCallback select_cancelable_cb_;
188+
SelectServerLogCallback select_server_log_cb_;
169189
};
170190

171191
}

ut/client_ut.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,24 @@ TEST_P(ClientCase, QuerySettings) {
10971097
EXPECT_THROW(client_->Execute(query), ServerException);
10981098
}
10991099

1100+
TEST_P(ClientCase, ServerLogs) {
1101+
1102+
Block block;
1103+
createTableWithOneColumn<ColumnString>(block);
1104+
1105+
size_t received_row_count = 0;
1106+
Query query("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')" );
1107+
query.SetSetting("send_logs_level", {"trace"});
1108+
query.OnServerLog([&](const Block& block) {
1109+
received_row_count += block.GetRowCount();
1110+
return true;
1111+
});
1112+
client_->Execute(query);
1113+
1114+
EXPECT_GT(received_row_count, 0U);
1115+
}
1116+
1117+
11001118
const auto LocalHostEndpoint = ClientOptions()
11011119
.SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost"))
11021120
.SetPort( getEnvOrDefault<size_t>("CLICKHOUSE_PORT", "9000"))

0 commit comments

Comments
 (0)