Skip to content

Commit a9d83b8

Browse files
authored
[refactor](sink) refactor vtablet writer v2 sequential close to parallel close (#52639)
### What problem does this PR solve? 1. Re-pick of #51989. 2. Fix the inability to shut down gracefully.
1 parent bc03678 commit a9d83b8

File tree

6 files changed

+158
-69
lines changed

6 files changed

+158
-69
lines changed

be/src/vec/sink/load_stream_map_pool.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@ class LoadStreamMap {
9898
// only call this method after release() returns true.
9999
void close_load(bool incremental);
100100

101+
std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> get_streams_for_node() {
102+
decltype(_streams_for_node) snapshot;
103+
{
104+
std::lock_guard<std::mutex> lock(_mutex);
105+
snapshot = _streams_for_node;
106+
}
107+
return snapshot;
108+
}
109+
101110
private:
102111
const UniqueId _load_id;
103112
const int64_t _src_id;

be/src/vec/sink/load_stream_stub.cpp

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,7 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
114114
LOG(WARNING) << "stub is not exist when on_closed, " << *this;
115115
return;
116116
}
117-
std::lock_guard<bthread::Mutex> lock(stub->_close_mutex);
118117
stub->_is_closed.store(true);
119-
stub->_close_cv.notify_all();
120118
}
121119

122120
inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler) {
@@ -330,37 +328,30 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i
330328
return Status::OK();
331329
}
332330

333-
Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
331+
Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* is_closed) {
334332
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
333+
DBUG_EXECUTE_IF("LoadStreamStub::close_finish_check.close_failed",
334+
{ return Status::InternalError("close failed"); });
335+
*is_closed = true;
335336
if (!_is_open.load()) {
336337
// we don't need to close wait on non-open streams
337338
return Status::OK();
338339
}
340+
if (state->get_query_ctx()->is_cancelled()) {
341+
return state->get_query_ctx()->exec_status();
342+
}
339343
if (!_is_closing.load()) {
344+
*is_closed = false;
340345
return _status;
341346
}
342347
if (_is_closed.load()) {
343-
return _check_cancel();
344-
}
345-
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
346-
std::unique_lock<bthread::Mutex> lock(_close_mutex);
347-
auto timeout_sec = timeout_ms / 1000;
348-
while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) {
349-
//the query maybe cancel, so need check after wait 1s
350-
timeout_sec = timeout_sec - 1;
351-
LOG(INFO) << "close waiting, " << *this << ", timeout_sec=" << timeout_sec
352-
<< ", is_closed=" << _is_closed.load()
353-
<< ", is_cancelled=" << state->get_query_ctx()->is_cancelled();
354-
int ret = _close_cv.wait_for(lock, 1000000);
355-
if (ret != 0 && timeout_sec <= 0) {
356-
return Status::InternalError("stream close_wait timeout, error={}, timeout_ms={}, {}",
357-
ret, timeout_ms, to_string());
348+
RETURN_IF_ERROR(_check_cancel());
349+
if (!_is_eos.load()) {
350+
return Status::InternalError("Stream closed without EOS, {}", to_string());
358351
}
352+
return Status::OK();
359353
}
360-
RETURN_IF_ERROR(_check_cancel());
361-
if (!_is_eos.load()) {
362-
return Status::InternalError("stream closed without eos, {}", to_string());
363-
}
354+
*is_closed = false;
364355
return Status::OK();
365356
}
366357

@@ -374,11 +365,7 @@ void LoadStreamStub::cancel(Status reason) {
374365
_cancel_st = reason;
375366
_is_cancelled.store(true);
376367
}
377-
{
378-
std::lock_guard<bthread::Mutex> lock(_close_mutex);
379-
_is_closed.store(true);
380-
_close_cv.notify_all();
381-
}
368+
_is_closed.store(true);
382369
}
383370

384371
Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
@@ -437,12 +424,34 @@ void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
437424
switch (hdr.opcode()) {
438425
case PStreamHeader::ADD_SEGMENT:
439426
case PStreamHeader::APPEND_DATA: {
427+
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.append_data_failed", {
428+
add_failed_tablet(hdr.tablet_id(), st);
429+
return;
430+
});
431+
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.add_segment_failed", {
432+
add_failed_tablet(hdr.tablet_id(), st);
433+
return;
434+
});
440435
add_failed_tablet(hdr.tablet_id(), st);
441436
} break;
442437
case PStreamHeader::CLOSE_LOAD: {
438+
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.close_load_failed", {
439+
brpc::StreamClose(_stream_id);
440+
return;
441+
});
443442
brpc::StreamClose(_stream_id);
444443
} break;
445444
case PStreamHeader::GET_SCHEMA: {
445+
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.get_schema_failed", {
446+
// Just log and let wait_for_schema timeout
447+
std::ostringstream oss;
448+
for (const auto& tablet : hdr.tablets()) {
449+
oss << " " << tablet.tablet_id();
450+
}
451+
LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", "
452+
<< *this;
453+
return;
454+
});
446455
// Just log and let wait_for_schema timeout
447456
std::ostringstream oss;
448457
for (const auto& tablet : hdr.tablets()) {
@@ -556,13 +565,4 @@ Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_comm
556565
return status;
557566
}
558567

559-
Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
560-
MonotonicStopWatch watch;
561-
watch.start();
562-
for (auto& stream : _streams) {
563-
RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - watch.elapsed_time() / 1000 / 1000));
564-
}
565-
return Status::OK();
566-
}
567-
568568
} // namespace doris

be/src/vec/sink/load_stream_stub.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
155155

156156
// wait remote to close stream,
157157
// remote will close stream when it receives CLOSE_LOAD
158-
Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
158+
Status close_finish_check(RuntimeState* state, bool* is_closed);
159159

160160
// cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
161161
void cancel(Status reason);
@@ -223,6 +223,8 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
223223
void _handle_failure(butil::IOBuf& buf, Status st);
224224

225225
Status _check_cancel() {
226+
DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
227+
{ return Status::InternalError("stream cancelled"); });
226228
if (!_is_cancelled.load()) {
227229
return Status::OK();
228230
}
@@ -247,9 +249,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
247249
Status _cancel_st;
248250

249251
bthread::Mutex _open_mutex;
250-
bthread::Mutex _close_mutex;
251252
bthread::Mutex _cancel_mutex;
252-
bthread::ConditionVariable _close_cv;
253253

254254
std::mutex _buffer_mutex;
255255
std::mutex _send_mutex;
@@ -310,8 +310,6 @@ class LoadStreamStubs {
310310

311311
Status close_load(const std::vector<PTabletID>& tablets_to_commit);
312312

313-
Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
314-
315313
std::unordered_set<int64_t> success_tablets() {
316314
std::unordered_set<int64_t> s;
317315
for (auto& stream : _streams) {
@@ -330,6 +328,8 @@ class LoadStreamStubs {
330328
return m;
331329
}
332330

331+
std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }
332+
333333
private:
334334
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
335335
std::atomic<bool> _open_success = false;

be/src/vec/sink/writer/vtablet_writer_v2.cpp

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ Status VTabletWriterV2::close(Status exec_status) {
666666
// close_wait on all non-incremental streams, even if this is not the last sink.
667667
// because some per-instance data structures are now shared among all sinks
668668
// due to sharing delta writers and load stream stubs.
669-
RETURN_IF_ERROR(_close_wait(false));
669+
RETURN_IF_ERROR(_close_wait(_non_incremental_streams()));
670670

671671
// send CLOSE_LOAD on all incremental streams if this is the last sink.
672672
// this must happen after all non-incremental streams are closed,
@@ -676,7 +676,7 @@ Status VTabletWriterV2::close(Status exec_status) {
676676
}
677677

678678
// close_wait on all incremental streams, even if this is not the last sink.
679-
RETURN_IF_ERROR(_close_wait(true));
679+
RETURN_IF_ERROR(_close_wait(_incremental_streams()));
680680

681681
// calculate and submit commit info
682682
if (is_last_sink) {
@@ -721,32 +721,87 @@ Status VTabletWriterV2::close(Status exec_status) {
721721
return status;
722722
}
723723

724-
Status VTabletWriterV2::_close_wait(bool incremental) {
724+
std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_incremental_streams() {
725+
std::unordered_set<std::shared_ptr<LoadStreamStub>> incremental_streams;
726+
auto streams_for_node = _load_stream_map->get_streams_for_node();
727+
for (const auto& [dst_id, streams] : streams_for_node) {
728+
for (const auto& stream : streams->streams()) {
729+
if (stream->is_incremental()) {
730+
incremental_streams.insert(stream);
731+
}
732+
}
733+
}
734+
return incremental_streams;
735+
}
736+
737+
std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_non_incremental_streams() {
738+
std::unordered_set<std::shared_ptr<LoadStreamStub>> non_incremental_streams;
739+
auto streams_for_node = _load_stream_map->get_streams_for_node();
740+
for (const auto& [dst_id, streams] : streams_for_node) {
741+
for (const auto& stream : streams->streams()) {
742+
if (!stream->is_incremental()) {
743+
non_incremental_streams.insert(stream);
744+
}
745+
}
746+
}
747+
return non_incremental_streams;
748+
}
749+
750+
Status VTabletWriterV2::_close_wait(
751+
std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams) {
725752
SCOPED_TIMER(_close_load_timer);
726-
auto st = _load_stream_map->for_each_st(
727-
[this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status {
728-
if (streams.is_incremental() != incremental) {
729-
return Status::OK();
730-
}
731-
int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
732-
_timeout_watch.elapsed_time() / 1000 / 1000;
733-
DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
734-
if (remain_ms <= 0) {
735-
LOG(WARNING) << "load timed out before close waiting, load_id="
736-
<< print_id(_load_id);
737-
return Status::TimedOut("load timed out before close waiting");
738-
}
739-
auto st = streams.close_wait(_state, remain_ms);
740-
if (!st.ok()) {
741-
LOG(WARNING) << "close_wait timeout on streams to dst_id=" << dst_id
742-
<< ", load_id=" << print_id(_load_id) << ": " << st;
743-
}
744-
return st;
745-
});
746-
if (!st.ok()) {
747-
LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id);
753+
Status status;
754+
auto streams_for_node = _load_stream_map->get_streams_for_node();
755+
while (true) {
756+
RETURN_IF_ERROR(_check_timeout());
757+
RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
758+
if (!status.ok() || unfinished_streams.empty()) {
759+
LOG(INFO) << "is all unfinished: " << unfinished_streams.empty()
760+
<< ", status: " << status << ", txn_id: " << _txn_id
761+
<< ", load_id: " << print_id(_load_id);
762+
break;
763+
}
764+
bthread_usleep(1000 * 10);
748765
}
749-
return st;
766+
if (!status.ok()) {
767+
LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << print_id(_load_id);
768+
}
769+
return status;
770+
}
771+
772+
Status VTabletWriterV2::_check_timeout() {
773+
int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
774+
_timeout_watch.elapsed_time() / 1000 / 1000;
775+
DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
776+
if (remain_ms <= 0) {
777+
LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id);
778+
return Status::TimedOut("load timed out before close waiting");
779+
}
780+
return Status::OK();
781+
}
782+
783+
Status VTabletWriterV2::_check_streams_finish(
784+
std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status,
785+
const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node) {
786+
for (const auto& [dst_id, streams] : streams_for_node) {
787+
for (const auto& stream : streams->streams()) {
788+
if (!unfinished_streams.contains(stream)) {
789+
continue;
790+
}
791+
bool is_closed = false;
792+
auto stream_st = stream->close_finish_check(_state, &is_closed);
793+
if (!stream_st.ok()) {
794+
status = stream_st;
795+
unfinished_streams.erase(stream);
796+
LOG(WARNING) << "close_wait failed: " << stream_st
797+
<< ", load_id=" << print_id(_load_id);
798+
}
799+
if (is_closed) {
800+
unfinished_streams.erase(stream);
801+
}
802+
}
803+
}
804+
return status;
750805
}
751806

752807
void VTabletWriterV2::_calc_tablets_to_commit() {

be/src/vec/sink/writer/vtablet_writer_v2.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,17 @@ class VTabletWriterV2 final : public AsyncResultWriter {
148148

149149
void _calc_tablets_to_commit();
150150

151-
Status _close_wait(bool incremental);
151+
std::unordered_set<std::shared_ptr<LoadStreamStub>> _incremental_streams();
152+
153+
std::unordered_set<std::shared_ptr<LoadStreamStub>> _non_incremental_streams();
154+
155+
Status _close_wait(std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams);
156+
157+
Status _check_timeout();
158+
159+
Status _check_streams_finish(
160+
std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status,
161+
const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node);
152162

153163
void _cancel(Status status);
154164

regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
6868
file "baseall.txt"
6969
}
7070

71-
def load_with_injection = { injection, error_msg, success=false->
71+
def load_with_injection = { injection, error_msg="", success=false->
7272
try {
7373
GetDebugPoint().enableDebugPointForAllBEs(injection)
7474
sql "insert into test select * from baseall where k1 <= 3"
@@ -104,6 +104,21 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
104104
// DeltaWriterV2 stream_size is 0
105105
load_with_injection("DeltaWriterV2.init.stream_size", "failed to find tablet schema")
106106

107+
// injection cases for VTabletWriterV2 close logic
108+
// Test LoadStreamStub close_finish_check close failed
109+
load_with_injection("LoadStreamStub::close_finish_check.close_failed")
110+
// Test LoadStreamStub _check_cancel when cancelled
111+
load_with_injection("LoadStreamStub._check_cancel.cancelled")
112+
// Test LoadStreamStub _send_with_retry stream write failed
113+
load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed")
114+
// Test LoadStreamStub _handle_failure for different opcodes
115+
load_with_injection("LoadStreamStub._handle_failure.append_data_failed")
116+
load_with_injection("LoadStreamStub._handle_failure.add_segment_failed")
117+
load_with_injection("LoadStreamStub._handle_failure.close_load_failed")
118+
load_with_injection("LoadStreamStub._handle_failure.get_schema_failed")
119+
// Test LoadStreamStub skip send segment
120+
load_with_injection("LoadStreamStub.skip_send_segment")
121+
107122
sql """ set enable_memtable_on_sink_node=false """
108123
}
109124
}

0 commit comments

Comments
 (0)