Skip to content

Commit b8544bb

Browse files
authored
Merge pull request #448 from ClickHouse/begin-end-insert-cleanup
Clean up and simplify Begin/EndInsert
2 parents b4a9d70 + 8b96f8c commit b8544bb

File tree

3 files changed

+71
-144
lines changed

3 files changed

+71
-144
lines changed

clickhouse/client.cpp

Lines changed: 68 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ class Client::Impl {
163163

164164
Block BeginInsert(Query query);
165165

166-
void InsertData(const Block& block);
166+
void SendInsertBlock(const Block& block);
167167

168168
void EndInsert();
169169

@@ -181,7 +181,6 @@ class Client::Impl {
181181
bool Handshake();
182182

183183
bool ReceivePacket(uint64_t* server_packet = nullptr);
184-
bool ReceivePreparePackets(uint64_t* server_packet = nullptr);
185184

186185
void SendQuery(const Query& query, bool finalize = true);
187186
void FinalizeQuery();
@@ -215,7 +214,6 @@ class Client::Impl {
215214
}
216215

217216
private:
218-
bool inserting;
219217
/// In case of network errors tries to reconnect to server and
220218
/// call fuc several times.
221219
void RetryGuard(std::function<void()> func);
@@ -259,6 +257,8 @@ class Client::Impl {
259257
std::optional<Endpoint> current_endpoint_;
260258

261259
ServerInfo server_info_;
260+
261+
bool inserting_;
262262
};
263263

264264
ClientOptions modifyClientOptions(ClientOptions opts)
@@ -294,9 +294,10 @@ Client::Impl::~Impl() {
294294
}
295295

296296
void Client::Impl::ExecuteQuery(Query query) {
297-
if (inserting) {
298-
throw ProtocolError("cannot execute query while inserting");
297+
if (inserting_) {
298+
throw ValidationError("cannot execute query while inserting");
299299
}
300+
300301
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
301302

302303
if (options_.ping_before_query) {
@@ -312,9 +313,10 @@ void Client::Impl::ExecuteQuery(Query query) {
312313

313314

314315
void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
315-
if (inserting) {
316-
throw ProtocolError("cannot execute query while inserting");
316+
if (inserting_) {
317+
throw ValidationError("cannot execute query while inserting");
317318
}
319+
318320
if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
319321
throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
320322
}
@@ -378,15 +380,18 @@ std::string NameToQueryString(const std::string &input)
378380
}
379381

380382
void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
381-
if (inserting) {
382-
throw ProtocolError("cannot execute query while inserting");
383+
if (inserting_) {
384+
throw ValidationError("cannot execute query while inserting, use SendInsertData instead");
383385
}
386+
384387
if (options_.ping_before_query) {
385388
RetryGuard([this]() { Ping(); });
386389
}
387390

391+
inserting_ = true;
392+
388393
std::stringstream fields_section;
389-
const auto num_columns = block.GetColumnCount();
394+
const auto num_columns = block.GetColumnCount();
390395

391396
for (unsigned int i = 0; i < num_columns; ++i) {
392397
if (i == num_columns - 1) {
@@ -399,41 +404,67 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
399404
Query query("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id);
400405
SendQuery(query);
401406

402-
uint64_t server_packet;
403-
// Receive data packet.
404-
while (true) {
405-
bool ret = ReceivePacket(&server_packet);
406-
407-
if (!ret) {
408-
throw ProtocolError("fail to receive data packet");
409-
}
407+
// Wait for a data packet and return
408+
uint64_t server_packet = 0;
409+
while (ReceivePacket(&server_packet)) {
410410
if (server_packet == ServerCodes::Data) {
411-
break;
411+
SendData(block);
412+
EndInsert();
413+
return;
412414
}
413-
if (server_packet == ServerCodes::Progress) {
414-
continue;
415+
}
416+
417+
throw ProtocolError("fail to receive data packet");
418+
}
419+
420+
Block Client::Impl::BeginInsert(Query query) {
421+
if (inserting_) {
422+
throw ValidationError("cannot execute query while inserting");
423+
}
424+
425+
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
426+
427+
if (options_.ping_before_query) {
428+
RetryGuard([this]() { Ping(); });
429+
}
430+
431+
inserting_ = true;
432+
433+
// Create a callback to extract the block with the proper query columns.
434+
Block block;
435+
query.OnData([&block](const Block& b) {
436+
block = std::move(b);
437+
return true;
438+
});
439+
440+
SendQuery(query.GetText());
441+
442+
// Wait for a data packet and return
443+
uint64_t server_packet = 0;
444+
while (ReceivePacket(&server_packet)) {
445+
if (server_packet == ServerCodes::Data) {
446+
return block;
415447
}
416448
}
417449

418-
// Send data.
419-
inserting = true;
420-
SendData(block);
421-
EndInsert();
450+
throw ProtocolError("fail to receive data packet");
422451
}
423452

424-
void Client::Impl::InsertData(const Block& block) {
425-
if (!inserting) {
426-
throw ProtocolError("illegal call to InsertData without first calling BeginInsert");
453+
void Client::Impl::SendInsertBlock(const Block& block) {
454+
if (!inserting_) {
455+
throw ValidationError("illegal call to InsertData without first calling BeginInsert");
427456
}
457+
428458
SendData(block);
429459
}
430460

431461
void Client::Impl::EndInsert() {
432-
if (!inserting) return;
462+
if (!inserting_) {
463+
return;
464+
}
433465

434466
// Send empty block as marker of end of data.
435467
SendData(Block());
436-
inserting = false;
437468

438469
// Wait for EOS.
439470
uint64_t eos_packet{0};
@@ -446,12 +477,14 @@ void Client::Impl::EndInsert() {
446477
throw ProtocolError(std::string{"unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: "}
447478
+ (eos_packet ? std::to_string(eos_packet) : "nothing") + ")");
448479
}
480+
inserting_ = false;
449481
}
450482

451483
void Client::Impl::Ping() {
452-
if (inserting) {
453-
throw ProtocolError("cannot execute query while inserting");
484+
if (inserting_) {
485+
throw ValidationError("cannot execute query while inserting");
454486
}
487+
455488
WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
456489
output_->Flush();
457490

@@ -465,7 +498,7 @@ void Client::Impl::Ping() {
465498

466499
void Client::Impl::ResetConnection() {
467500
InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
468-
inserting = false;
501+
inserting_ = false;
469502

470503
if (!Handshake()) {
471504
throw ProtocolError("fail to connect to " + options_.host);
@@ -685,78 +718,6 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
685718
}
686719
}
687720

688-
bool Client::Impl::ReceivePreparePackets(uint64_t* server_packet) {
689-
uint64_t packet_type = 0;
690-
691-
while (true) {
692-
if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
693-
throw std::runtime_error("unexpected package type " +
694-
std::to_string((int)packet_type) + " for insert query");
695-
}
696-
if (server_packet) {
697-
*server_packet = packet_type;
698-
}
699-
700-
switch (packet_type) {
701-
case ServerCodes::Data: {
702-
if (!ReceiveData()) {
703-
throw ProtocolError("can't read data packet from input stream");
704-
}
705-
return true;
706-
}
707-
708-
case ServerCodes::Exception: {
709-
ReceiveException();
710-
return false;
711-
}
712-
713-
case ServerCodes::ProfileInfo:
714-
case ServerCodes::Progress:
715-
case ServerCodes::Pong:
716-
case ServerCodes::Hello:
717-
continue;
718-
719-
case ServerCodes::Log: {
720-
// log tag
721-
if (!WireFormat::SkipString(*input_)) {
722-
return false;
723-
}
724-
Block block;
725-
726-
// Use uncompressed stream since log blocks usually contain only one row
727-
if (!ReadBlock(*input_, &block)) {
728-
return false;
729-
}
730-
731-
if (events_) {
732-
events_->OnServerLog(block);
733-
}
734-
continue;
735-
}
736-
737-
case ServerCodes::TableColumns: {
738-
// external table name
739-
if (!WireFormat::SkipString(*input_)) {
740-
return false;
741-
}
742-
743-
// columns metadata
744-
if (!WireFormat::SkipString(*input_)) {
745-
return false;
746-
}
747-
continue;
748-
}
749-
750-
// No others expected.
751-
case ServerCodes::EndOfStream:
752-
case ServerCodes::ProfileEvents:
753-
default:
754-
throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
755-
break;
756-
}
757-
}
758-
}
759-
760721
bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
761722
// Additional information about block.
762723
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -1193,34 +1154,6 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
11931154
}
11941155
}
11951156

1196-
Block Client::Impl::BeginInsert(Query query) {
1197-
if (inserting) {
1198-
throw ProtocolError("cannot execute query while inserting");
1199-
}
1200-
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
1201-
1202-
if (options_.ping_before_query) {
1203-
RetryGuard([this]() { Ping(); });
1204-
}
1205-
1206-
// Create a callback to extract the block with the proper query columns.
1207-
Block block;
1208-
query.OnData([&block](const Block& b) {
1209-
block = std::move(b);
1210-
return true;
1211-
});
1212-
1213-
SendQuery(query.GetText());
1214-
1215-
// Receive data packet but keep the query/connection open.
1216-
if (!ReceivePreparePackets()) {
1217-
throw std::runtime_error("fail to receive data packet");
1218-
}
1219-
1220-
inserting = true;
1221-
return block;
1222-
}
1223-
12241157
Client::Client(const ClientOptions& opts)
12251158
: options_(opts)
12261159
, impl_(new Impl(opts))
@@ -1293,13 +1226,8 @@ Block Client::BeginInsert(const std::string& query, const std::string& query_id)
12931226
return impl_->BeginInsert(Query(query, query_id));
12941227
}
12951228

1296-
void Client::InsertData(const Block& block) {
1297-
impl_->InsertData(block);
1298-
}
1299-
1300-
void Client::EndInsert(const Block& block) {
1301-
impl_->InsertData(block);
1302-
impl_->EndInsert();
1229+
void Client::SendInsertBlock(const Block& block) {
1230+
impl_->SendInsertBlock(block);
13031231
}
13041232

13051233
void Client::EndInsert() {

clickhouse/client.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,10 @@ class Client {
278278
Block BeginInsert(const std::string& query, const std::string& query_id);
279279

280280
/// Insert data using a \p block returned by \p BeginInsert.
281-
void InsertData(const Block& block);
281+
void SendInsertBlock(const Block& block);
282282

283283
/// End an \p INSERT session started by \p BeginInsert.
284284
void EndInsert();
285-
void EndInsert(const Block& block);
286285

287286
/// Ping server for aliveness.
288287
void Ping();

ut/client_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ TEST_P(ClientCase, InsertData) {
402402
f->Append(td.f);
403403
}
404404
block.RefreshRowCount();
405-
client_->InsertData(block);
405+
client_->SendInsertBlock(block);
406406
block.Clear();
407407

408408
// Insert some more values.
@@ -412,7 +412,7 @@ TEST_P(ClientCase, InsertData) {
412412
f->Append(td.f);
413413
}
414414
block.RefreshRowCount();
415-
client_->InsertData(block);
415+
client_->SendInsertBlock(block);
416416
block.Clear();
417417
client_->EndInsert();
418418
// Second call to EndInsert should be no-op.

0 commit comments

Comments
 (0)