Skip to content

Commit b9abe43

Browse files
committed
support open telemetry
1 parent 25d7cd1 commit b9abe43

File tree

15 files changed

+182
-38
lines changed

15 files changed

+182
-38
lines changed

clickhouse/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,14 @@ INSTALL(FILES query.h DESTINATION include/clickhouse/)
9292
INSTALL(FILES base/buffer.h DESTINATION include/clickhouse/base/)
9393
INSTALL(FILES base/compressed.h DESTINATION include/clickhouse/base/)
9494
INSTALL(FILES base/input.h DESTINATION include/clickhouse/base/)
95+
INSTALL(FILES base/open_telemetry.h DESTINATION include/clickhouse/base/)
9596
INSTALL(FILES base/output.h DESTINATION include/clickhouse/base/)
9697
INSTALL(FILES base/platform.h DESTINATION include/clickhouse/base/)
9798
INSTALL(FILES base/singleton.h DESTINATION include/clickhouse/base/)
9899
INSTALL(FILES base/socket.h DESTINATION include/clickhouse/base/)
99100
INSTALL(FILES base/string_utils.h DESTINATION include/clickhouse/base/)
100101
INSTALL(FILES base/string_view.h DESTINATION include/clickhouse/base/)
102+
INSTALL(FILES base/uuid.h DESTINATION include/clickhouse/base/)
101103
INSTALL(FILES base/wire_format.h DESTINATION include/clickhouse/base/)
102104

103105
# columns

clickhouse/base/open_telemetry.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#pragma once
2+
3+
#include "uuid.h"
4+
5+
#include <string>
6+
7+
namespace clickhouse::open_telemetry {
8+
9+
/// See https://www.w3.org/TR/trace-context/ for trace_flags definition
10+
enum TraceFlags : uint8_t {
11+
TRACE_FLAG_NONE = 0,
12+
TRACE_FLAG_SAMPLED = 1,
13+
};
14+
15+
/// The runtime info we need to create new OpenTelemetry spans.
16+
struct TracingContext {
17+
UUID trace_id{};
18+
uint64_t span_id = 0;
19+
std::string tracestate;
20+
uint8_t trace_flags = TRACE_FLAG_NONE;
21+
};
22+
23+
} // namespace clickhouse::open_telemetry

clickhouse/base/uuid.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <utility>
5+
6+
namespace clickhouse {
7+
8+
using UInt128 = std::pair<uint64_t, uint64_t>;
9+
10+
using UUID = UInt128;
11+
12+
}

clickhouse/client.cpp

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@
3737
#define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410
3838
#define DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO 54420
3939
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
40+
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
41+
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442
4042

41-
#define REVISION DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS
43+
#define REVISION DBMS_MIN_REVISION_WITH_OPENTELEMETRY
4244

4345
namespace clickhouse {
4446

@@ -661,6 +663,27 @@ void Client::Impl::SendQuery(const Query& query) {
661663
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) {
662664
WireFormat::WriteUInt64(*output_, info.client_version_patch);
663665
}
666+
667+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY) {
668+
if (const auto& tracing_context = query.GetTracingContext()) {
669+
// Have OpenTelemetry header.
670+
WireFormat::WriteFixed(*output_, uint8_t(1));
671+
// No point writing these numbers with variable length, because they
672+
// are random and will probably require the full length anyway.
673+
WireFormat::WriteFixed(*output_, tracing_context->trace_id);
674+
WireFormat::WriteFixed(*output_, tracing_context->span_id);
675+
WireFormat::WriteString(*output_, tracing_context->tracestate);
676+
WireFormat::WriteFixed(*output_, tracing_context->trace_flags);
677+
} else {
678+
// Don't have OpenTelemetry header.
679+
WireFormat::WriteFixed(*output_, uint8_t(0));
680+
}
681+
} else {
682+
if (query.GetTracingContext()) {
683+
// Current implementation works only for server version >= v20.11.2.1-stable
684+
throw UnimplementedError(std::string("Can't send open telemetry tracing context to a server, server version is too old"));
685+
}
686+
}
664687
}
665688

666689
/// Per query settings
@@ -678,6 +701,10 @@ void Client::Impl::SendQuery(const Query& query) {
678701
// Empty string signals end of serialized settings
679702
WireFormat::WriteString(*output_, std::string());
680703

704+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET) {
705+
WireFormat::WriteString(*output_, "");
706+
}
707+
681708
WireFormat::WriteUInt64(*output_, Stages::Complete);
682709
WireFormat::WriteUInt64(*output_, compression_);
683710
WireFormat::WriteString(*output_, query.GetText());

clickhouse/columns/uuid.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ ColumnUUID::ColumnUUID(ColumnRef data)
2121
}
2222
}
2323

24-
void ColumnUUID::Append(const UInt128& value) {
24+
void ColumnUUID::Append(const UUID& value) {
2525
data_->Append(value.first);
2626
data_->Append(value.second);
2727
}
@@ -30,12 +30,12 @@ void ColumnUUID::Clear() {
3030
data_->Clear();
3131
}
3232

33-
const UInt128 ColumnUUID::At(size_t n) const {
34-
return UInt128(data_->At(n * 2), data_->At(n * 2 + 1));
33+
const UUID ColumnUUID::At(size_t n) const {
34+
return UUID(data_->At(n * 2), data_->At(n * 2 + 1));
3535
}
3636

37-
const UInt128 ColumnUUID::operator [] (size_t n) const {
38-
return UInt128((*data_)[n * 2], (*data_)[n * 2 + 1]);
37+
const UUID ColumnUUID::operator [] (size_t n) const {
38+
return UUID((*data_)[n * 2], (*data_)[n * 2 + 1]);
3939
}
4040

4141
void ColumnUUID::Append(ColumnRef column) {
@@ -78,4 +78,3 @@ ItemView ColumnUUID::GetItem(size_t index) const {
7878
}
7979

8080
}
81-

clickhouse/columns/uuid.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
#pragma once
22

3+
#include "../base/uuid.h"
34
#include "column.h"
45
#include "numeric.h"
56

67
namespace clickhouse {
78

8-
using UInt128 = std::pair<uint64_t, uint64_t>;
99

1010
/**
1111
* Represents a UUID column.
@@ -17,13 +17,13 @@ class ColumnUUID : public Column {
1717
explicit ColumnUUID(ColumnRef data);
1818

1919
/// Appends one element to the end of column.
20-
void Append(const UInt128& value);
20+
void Append(const UUID& value);
2121

2222
/// Returns element at given row number.
23-
const UInt128 At(size_t n) const;
23+
const UUID At(size_t n) const;
2424

2525
/// Returns element at given row number.
26-
const UInt128 operator [] (size_t n) const;
26+
const UUID operator [] (size_t n) const;
2727

2828
public:
2929
/// Appends content of given column to the end of current one.
@@ -34,7 +34,7 @@ class ColumnUUID : public Column {
3434

3535
/// Saves column data to output stream.
3636
void SaveBody(OutputStream* output) override;
37-
37+
3838
/// Clear column data .
3939
void Clear() override;
4040

clickhouse/query.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
#include "block.h"
44
#include "server_exception.h"
55

6+
#include "base/open_telemetry.h"
7+
68
#include <cstdint>
79
#include <functional>
810
#include <memory>
11+
#include <optional>
912
#include <string>
1013
#include <unordered_map>
1114

@@ -106,6 +109,16 @@ class Query : public QueryEvents {
106109
return *this;
107110
}
108111

112+
inline const std::optional<open_telemetry::TracingContext>& GetTracingContext() const {
113+
return tracing_context_;
114+
}
115+
116+
/// Set tracing context for open telemetry signals
117+
inline Query& SetTracingContext(open_telemetry::TracingContext tracing_context) {
118+
tracing_context_ = std::move(tracing_context);
119+
return *this;
120+
}
121+
109122
/// Set handler for receiving result data.
110123
inline Query& OnData(SelectCallback cb) {
111124
select_cb_ = std::move(cb);
@@ -180,6 +193,7 @@ class Query : public QueryEvents {
180193
private:
181194
const std::string query_;
182195
const std::string query_id_;
196+
std::optional<open_telemetry::TracingContext> tracing_context_;
183197
QuerySettings query_settings_;
184198
ExceptionCallback exception_cb_;
185199
ProgressCallback progress_cb_;

clickhouse/types/type_parser.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#include "type_parser.h"
2-
#include "../base/string_utils.h"
32

43
#include <algorithm>
54
#include <map>

ut/client_ut.cpp

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,30 @@ class ClientCase : public testing::TestWithParam<ClientOptions> {
4343
return "SELECT " + column_name + " FROM " + table_name;
4444
}
4545

46+
void FlushLogs() {
47+
try {
48+
client_->Execute("SYSTEM FLUSH LOGS");
49+
} catch (const std::exception & e) {
50+
std::cerr << "Got error while flushing logs: " << e.what() << std::endl;
51+
const auto wait_for_flush = []() {
52+
// Insufficient privileges, the only safe way is to wait long enough for system
53+
// to flush the logs automaticaly. Usually it takes 7.5 seconds, so just in case,
54+
// wait 3 times that to ensure that all previously executed queries are in the logs now.
55+
const auto wait_duration = std::chrono::seconds(23);
56+
std::cerr << "Now we wait " << wait_duration << "..." << std::endl;
57+
std::this_thread::sleep_for(wait_duration);
58+
};
59+
// DB::Exception: clickhouse_cpp_cicd: Not enough privileges. To execute this query it's necessary to have grant SYSTEM FLUSH LOGS ON
60+
if (std::string(e.what()).find("To execute this query it's necessary to have grant SYSTEM FLUSH LOGS ON") != std::string::npos) {
61+
wait_for_flush();
62+
}
63+
// DB::Exception: clickhouse_cpp_cicd: Cannot execute query in readonly mode
64+
if (std::string(e.what()).find("Cannot execute query in readonly mode") != std::string::npos) {
65+
wait_for_flush();
66+
}
67+
}
68+
}
69+
4670
std::unique_ptr<Client> client_;
4771
const std::string table_name = "test_clickhouse_cpp_test_ut_table";
4872
const std::string column_name = "test_column";
@@ -850,19 +874,7 @@ TEST_P(ClientCase, Query_ID) {
850874
client_->SelectCancelable("SELECT 'b', count(*) FROM " + table_name, query_id, [](const Block &) { return true; });
851875
client_->Execute(Query("TRUNCATE TABLE " + table_name, query_id));
852876

853-
try {
854-
client_->Execute("SYSTEM FLUSH LOGS");
855-
} catch (const std::exception & e) {
856-
// DB::Exception: clickhouse_cpp_cicd: Not enough privileges. To execute this query it's necessary to have grant SYSTEM FLUSH LOGS ON
857-
if (std::string(e.what()).find("To execute this query it's necessary to have grant SYSTEM FLUSH LOGS ON") != std::string::npos) {
858-
// Insufficient privileges, the only safe way is to wait long enough for system
859-
// to flush the logs automatically. Usually it takes 7.5 seconds, so just in case,
860-
// wait 3 times that to ensure that all previously executed queries are in the logs now.
861-
const auto wait_duration = std::chrono::seconds(23);
862-
std::cerr << "Got error while flushing logs, now we wait " << wait_duration << "..." << std::endl;
863-
std::this_thread::sleep_for(wait_duration);
864-
}
865-
}
877+
FlushLogs();
866878

867879
size_t total_count = 0;
868880
client_->Select("SELECT type, query_kind, query_id, query "
@@ -1115,6 +1127,31 @@ TEST_P(ClientCase, ServerLogs) {
11151127
}
11161128

11171129

1130+
TEST_P(ClientCase, TracingContext) {
1131+
Block block;
1132+
createTableWithOneColumn<ColumnString>(block);
1133+
1134+
Query query("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')" );
1135+
open_telemetry::TracingContext tracing_context;
1136+
std::srand(std::time(0));
1137+
tracing_context.trace_id = {std::rand(), std::rand()};
1138+
query.SetTracingContext(tracing_context);
1139+
client_->Execute(query);
1140+
1141+
FlushLogs();
1142+
1143+
size_t received_rows = 0;
1144+
client_->Select("SELECT trace_id, toString(trace_id), operation_name "
1145+
"FROM system.opentelemetry_span_log "
1146+
"WHERE trace_id = toUUID(\'" + ToString(tracing_context.trace_id) + "\');",
1147+
[&](const Block& block) {
1148+
// std::cerr << PrettyPrintBlock{block} << std::endl;
1149+
received_rows += block.GetRowCount();
1150+
});
1151+
1152+
EXPECT_GT(received_rows, 0u);
1153+
}
1154+
11181155
const auto LocalHostEndpoint = ClientOptions()
11191156
.SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost"))
11201157
.SetPort( getEnvOrDefault<size_t>("CLICKHOUSE_PORT", "9000"))

ut/columns_ut.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -377,17 +377,17 @@ TEST(ColumnsCase, UUIDInit) {
377377
auto col = std::make_shared<ColumnUUID>(std::make_shared<ColumnUInt64>(MakeUUID_data()));
378378

379379
ASSERT_EQ(col->Size(), 3u);
380-
ASSERT_EQ(col->At(0), UInt128(0xbb6a8c699ab2414cllu, 0x86697b7fd27f0825llu));
381-
ASSERT_EQ(col->At(2), UInt128(0x3507213c178649f9llu, 0x9faf035d662f60aellu));
380+
ASSERT_EQ(col->At(0), UUID(0xbb6a8c699ab2414cllu, 0x86697b7fd27f0825llu));
381+
ASSERT_EQ(col->At(2), UUID(0x3507213c178649f9llu, 0x9faf035d662f60aellu));
382382
}
383383

384384
TEST(ColumnsCase, UUIDSlice) {
385385
auto col = std::make_shared<ColumnUUID>(std::make_shared<ColumnUInt64>(MakeUUID_data()));
386386
auto sub = col->Slice(1, 2)->As<ColumnUUID>();
387387

388388
ASSERT_EQ(sub->Size(), 2u);
389-
ASSERT_EQ(sub->At(0), UInt128(0x84b9f24bc26b49c6llu, 0xa03b4ab723341951llu));
390-
ASSERT_EQ(sub->At(1), UInt128(0x3507213c178649f9llu, 0x9faf035d662f60aellu));
389+
ASSERT_EQ(sub->At(0), UUID(0x84b9f24bc26b49c6llu, 0xa03b4ab723341951llu));
390+
ASSERT_EQ(sub->At(1), UUID(0x3507213c178649f9llu, 0x9faf035d662f60aellu));
391391
}
392392

393393
TEST(ColumnsCase, Int128) {
@@ -787,4 +787,3 @@ TEST(ColumnsCase, ColumnLowCardinalityString_WithEmptyString_3) {
787787
EXPECT_EQ(values[i], col.At(i)) << " at pos: " << i;
788788
}
789789
}
790-

0 commit comments

Comments
 (0)