Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ public:

void shutdown();

void reopen(context& ctx, timer_task<int32>::executor& hb_exec);

bool is_abandoned() const { return abandoned_; }

// Time that sent the last request.
void reset_ls_timer() { last_sent_timer_.reset(); }
uint64_t get_ls_timer_us() { return last_sent_timer_.get_us(); }
Expand Down Expand Up @@ -626,6 +630,7 @@ private:
*/
std::atomic<bool> self_mark_down_;


/**
* Logger instance.
*/
Expand Down
9 changes: 9 additions & 0 deletions src/handle_timeout.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ namespace nuraft {
void raft_server::enable_hb_for_peer(peer& p) {
p.enable_hb(true);
p.resume_hb_speed();
if (p.is_abandoned()) {
timer_task<int32>::executor exec =
(timer_task<int32>::executor)
std::bind( &raft_server::handle_hb_timeout,
this,
std::placeholders::_1 );
p.reopen(*ctx_, exec);
}
p_tr("peer %d, interval: %d\n", p.get_id(), p.get_current_hb_interval());
schedule_task(p.get_hb_task(), p.get_current_hb_interval());
}
Expand Down Expand Up @@ -343,6 +351,7 @@ void raft_server::cancel_schedulers() {
cancel_task(p->get_hb_task());
}
// Shutdown peer to cut off smart pointers.
p_tr("cancel_schedulers shutdown peer: %d", p->get_id());
p->shutdown();

// Free user context of snapshot if exists.
Expand Down
15 changes: 15 additions & 0 deletions src/peer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ bool peer::recreate_rpc(ptr<srv_config>& config,
}

void peer::shutdown() {
p_tr("peer %d shutdown", get_id());
// Should set the flag to block all incoming requests.
abandoned_ = true;

Expand All @@ -326,5 +327,19 @@ void peer::shutdown() {
hb_task_.reset();
}


void peer::reopen(context& ctx, timer_task<int32>::executor& hb_exec) {
p_tr("peer %d reopen", get_id());
abandoned_ = false;

scheduler_ = ctx.scheduler_;
hb_task_ = cs_new< timer_task<int32>,
timer_task<int32>::executor&,
int32 >
( hb_exec, config_->get_id(),
timer_task_type::heartbeat_timer ) ;
p_tr("call peer %d reopen succeeded", get_id());
}

} // namespace nuraft;