Skip to content

Commit 404ca15

Browse files
mponomarakshatsikarwar
authored andcommitted
Don't always request rep records in recovery when a gap exists
Signed-off-by: Mike Ponomarenko <[email protected]>
1 parent 78e1850 commit 404ca15

File tree

11 files changed

+187
-7
lines changed

11 files changed

+187
-7
lines changed

bdb/file.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3015,6 +3015,8 @@ if (!is_real_netinfo(bdb_state->repinfo->netinfo))
30153015
starting_time = time(NULL);
30163016
}
30173017

3018+
bdb_state->dbenv->set_coherency_check_callback(bdb_state->dbenv, (int(*)(void*))bdb_am_i_coherent, bdb_state);
3019+
30183020
/* start the network up */
30193021
print(bdb_state, "starting network\n");
30203022
rc = net_init(bdb_state->repinfo->netinfo);

bdb/rep.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include <build/db_int.h>
4141
#include "dbinc/log.h"
4242
#include "dbinc/mp.h"
43+
#include "dbinc/db_swap.h"
4344
#include <trigger.h>
4445
#include "printformats.h"
4546
#include "phys_rep.h"
@@ -72,6 +73,7 @@ extern int gbl_debug_stat4dump_loop;
7273
extern struct thdpool *gbl_udppfault_thdpool;
7374
extern int gbl_commit_delay_trace;
7475
extern int gbl_2pc;
76+
extern int gbl_nudge_replication_when_idle;
7577

7678
/* osqlcomm.c code, hurray! */
7779
extern void osql_decom_node(char *decom_host);
@@ -5570,6 +5572,22 @@ void *watcher_thread(void *arg)
55705572

55715573
last_behind = behind;
55725574
}
5575+
5576+
DB_LSN next_lsn, gap_lsn;
5577+
int nrecs;
5578+
// if we're not seeing records or seeing only very old records, poke replication
5579+
// to request records in the range we expect
5580+
if (gbl_nudge_replication_when_idle && bdb_state->dbenv->get_rep_lsns(bdb_state->dbenv, &next_lsn, &gap_lsn, &nrecs) == 0) {
5581+
if (nrecs == 0 && !IS_ZERO_LSN(gap_lsn)) {
5582+
DB_LSN tmp_lsn = {0};
5583+
DBT max_lsn_dbt = {0};
5584+
LOGCOPY_TOLSN(&tmp_lsn, &gap_lsn);
5585+
max_lsn_dbt.data = &tmp_lsn;
5586+
max_lsn_dbt.size = sizeof(tmp_lsn);
5587+
logmsg(LOGMSG_INFO, "poking replication: asking for %u:%u gap end at %u:%u\n", next_lsn.file, next_lsn.offset, gap_lsn.file, gap_lsn.offset);
5588+
__rep_send_message(bdb_state->dbenv, bdb_state->repinfo->master_host, REP_LOG_REQ, &next_lsn, &max_lsn_dbt, 0, NULL);
5589+
}
5590+
}
55735591
}
55745592

55755593
/* are we the master? do we have lots of incoherent nodes? */

berkdb/build/db.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2845,6 +2845,11 @@ struct __db_env {
28452845
int (*pgout[DB_TYPE_MAX]) __P((DB_ENV *, db_pgno_t, void *, DBT *));
28462846

28472847
int (*last_commit_lsn) __P((DB_ENV *, DB_LSN *));
2848+
int (*get_rep_lsns) __P((DB_ENV *, DB_LSN *, DB_LSN *, int *));
2849+
2850+
int (*set_coherency_check_callback) __P((DB_ENV *, int(*)(void*), void*));
2851+
void *coherency_check_usrptr;
2852+
int (*coherency_check_callback)(void*);
28482853

28492854
pthread_mutex_t utxnid_lock;
28502855
u_int64_t next_utxnid;

berkdb/dbinc/log.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ struct __log {
268268

269269
roff_t maint_off; /* offset of region maintenance info */
270270
#endif
271+
u_int64_t last_log_record_time;
272+
u_int32_t records_last_second;
271273
};
272274

273275
/*

berkdb/env/env_method.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,8 @@ __dbenv_init(dbenv)
291291
dbenv->dump_mintruncate_list = __dbenv_dump_mintruncate_list;
292292
dbenv->clear_mintruncate_list = __dbenv_clear_mintruncate_list;
293293
dbenv->build_mintruncate_list = __dbenv_build_mintruncate_list;
294+
dbenv->get_rep_lsns = __dbenv_get_rep_lsns;
295+
dbenv->set_coherency_check_callback = __dbenv_set_coherency_check_callback;
294296
#ifdef HAVE_RPC
295297
}
296298
#endif

berkdb/rep/rep_record.c

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
#include "db_config.h"
99
#include "dbinc/db_swap.h"
1010

11+
#include <time.h>
12+
#include <epochlib.h>
13+
1114
#ifndef lint
1215
static const char revid[] =
1316
"$Id: rep_record.c,v 1.193 2003/11/14 05:32:31 ubell Exp $";
@@ -1083,7 +1086,6 @@ __rep_verify_will_recover(dbenv, control, rec)
10831086
* PUBLIC: int __rep_process_message __P((DB_ENV *, DBT *, DBT *, char**,
10841087
* PUBLIC: DB_LSN *, uint32_t *,uint32_t *, char **, int));
10851088
*/
1086-
10871089
int
10881090
__rep_process_message(dbenv, control, rec, eidp, ret_lsnp, commit_gen, newgen, newmaster, online)
10891091
DB_ENV *dbenv;
@@ -1117,17 +1119,24 @@ __rep_process_message(dbenv, control, rec, eidp, ret_lsnp, commit_gen, newgen, n
11171119
static int rpm_pr = 0;
11181120
int rpm_now;
11191121
#endif
1122+
time_t now;
11201123
int send_count = 0;
11211124
static time_t verify_req_print = 0;
11221125
static unsigned long long verify_req_count = 0;
11231126
unsigned long long bytes_behind;
1124-
time_t now;
1125-
1126-
1127+
time_t start_rep_all, end_rep_all;
1128+
int nlsns = 0;
1129+
DB_LSN rep_start_lsn;
11271130
u_int32_t vi_last_write_gen, vi_egen;
11281131
int vi_nsites, vi_priority, vi_tiebreaker;
11291132

11301133
char *master;
1134+
static time_t report_last = 0;
1135+
time_t report_now = time(NULL);
1136+
1137+
if (report_last == 0) {
1138+
report_last = report_now;
1139+
}
11311140

11321141
PANIC_CHECK(dbenv);
11331142
ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_process_message",
@@ -1156,6 +1165,17 @@ __rep_process_message(dbenv, control, rec, eidp, ret_lsnp, commit_gen, newgen, n
11561165
if (LOG_SWAPPED())
11571166
__rep_control_swap(rp);
11581167

1168+
if (report_now != report_last && IS_REP_CLIENT(dbenv)) {
1169+
report_last = report_now;
1170+
if (dbenv->coherency_check_callback && dbenv->coherency_check_callback(dbenv->coherency_check_usrptr) == 0) {
1171+
logmsg(LOGMSG_USER, "gen %u dups %u queued %u ready at %u:%u rep at %u:%u last recv %u:%u\n",
1172+
rep->stat.st_gen, rep->stat.st_log_duplicated, rep->stat.st_log_queued,
1173+
lp->ready_lsn.file, lp->ready_lsn.offset,
1174+
lp->waiting_lsn.file, lp->waiting_lsn.offset,
1175+
rp->lsn.file, rp->lsn.offset);
1176+
}
1177+
}
1178+
11591179
if (gbl_verbose_master_req) {
11601180
switch (rp->rectype) {
11611181
case REP_MASTER_REQ:
@@ -1505,9 +1525,12 @@ __rep_process_message(dbenv, control, rec, eidp, ret_lsnp, commit_gen, newgen, n
15051525
flags = IS_ZERO_LSN(rp->lsn) ||
15061526
IS_INIT_LSN(rp->lsn) ? DB_FIRST : DB_SET;
15071527
sendflags = DB_REP_SENDACK;
1528+
start_rep_all = comdb2_time_epochms();
1529+
rep_start_lsn = lsn;
15081530
for (ret = __log_c_get(logc, &lsn, &data_dbt, flags);
15091531
ret == 0 && type != REP_LOG_MORE;
15101532
ret = __log_c_get(logc, &lsn, &data_dbt, DB_NEXT)) {
1533+
nlsns++;
15111534
/*
15121535
* When a log file changes, we'll have a real log
15131536
* record with some lsn [n][m], and we'll also want
@@ -1581,6 +1604,8 @@ __rep_process_message(dbenv, control, rec, eidp, ret_lsnp, commit_gen, newgen, n
15811604
oldfilelsn = lsn;
15821605
oldfilelsn.offset += logc->c_len;
15831606
}
1607+
end_rep_all = comdb2_time_epochms();
1608+
logmsg(LOGMSG_USER, "sent %d lsns in %d ms %u:%u to %u:%u\n", nlsns, (int) (end_rep_all - start_rep_all), rep_start_lsn.file, rep_start_lsn.offset, lsn.file, lsn.offset);
15841609

15851610
if (gbl_verbose_fills){
15861611
logmsg(LOGMSG_USER, "%s line %d done REP_ALL fill for %s to "
@@ -3031,6 +3056,8 @@ __thread int disable_random_deadlocks = 0;
30313056
__thread int physrep_out_of_order = 0;
30323057
__thread DB_LSN commit_lsn = {0};
30333058

3059+
extern int gbl_always_request_log_req;
3060+
30343061
/*
30353062
* __rep_apply --
30363063
*
@@ -3068,6 +3095,7 @@ __rep_apply_int(dbenv, rp, rec, ret_lsnp, commit_gen, decoupled)
30683095
int num_retries;
30693096
int disabled_minwrite_noread = 0;
30703097
char *eid, *dist_txnid = NULL;
3098+
time_t now = comdb2_time_epoch();
30713099

30723100
db_rep = dbenv->rep_handle;
30733101
rep = db_rep->region;
@@ -3149,7 +3177,12 @@ __rep_apply_int(dbenv, rp, rec, ret_lsnp, commit_gen, decoupled)
31493177
(void)count_in_func;
31503178
lp = dblp->reginfo.primary;
31513179
cmp = log_compare(&rp->lsn, &lp->ready_lsn);
3152-
3180+
if (now != lp->last_log_record_time) {
3181+
lp->last_log_record_time = now;
3182+
lp->records_last_second = 0;
3183+
}
3184+
if (cmp > 0)
3185+
lp->records_last_second++;
31533186
/*
31543187
* fprintf(stderr, "Rep log file %s line %d for %d:%d ready_lsn is %d:%d cmp=%d\n",
31553188
* __FILE__, __LINE__, rp->lsn.file, rp->lsn.offset, lp->ready_lsn.file,
@@ -3430,8 +3463,14 @@ gap_check: max_lsn_dbtp = NULL;
34303463
*/
34313464
next_lsn = lp->ready_lsn;
34323465
do_req = ++lp->rcvd_recs >= lp->wait_recs;
3466+
/* We used to have an explicit do_req=1 here. Presumably this fixes a case
3467+
* where we haven't seen much traffic in the past, and get a few records
3468+
* but not enough to trigger a request. Unfortunately the back-and-forth
3469+
* requests this generates slow down catchup when the replicant is very far
3470+
* behind and has multiple gaps. Instead we request on a timer from elsewhere. */
34333471

3434-
do_req = 1;
3472+
if (gbl_always_request_log_req)
3473+
do_req = 1;
34353474

34363475
if (do_req) {
34373476
lp->wait_recs = rep->request_gap;
@@ -3483,7 +3522,10 @@ gap_check: max_lsn_dbtp = NULL;
34833522
max_lsn_dbtp->data);
34843523
max_lsn_dbtp->data = &tmp_lsn;
34853524
}
3486-
3525+
else {
3526+
ZERO_LSN(tmp_lsn);
3527+
}
3528+
// fprintf(stderr, "Requesting %u:%u - %u:%u ready %u:%u waiting %u:%u\n", next_lsn.file, next_lsn.offset, tmp_lsn.file, tmp_lsn.offset, lp->ready_lsn.file, lp->ready_lsn.offset, lp->waiting_lsn.file, lp->waiting_lsn.offset);
34873529
/*
34883530
* fprintf(stderr, "Requesting file %s line %d lsn %d:%d\n",
34893531
* __FILE__, __LINE__, next_lsn.file, next_lsn.offset);
@@ -8858,6 +8900,42 @@ __rep_inflight_txns_older_than_lsn(DB_ENV *dbenv, DB_LSN *lsn)
88588900
return 0;
88598901
}
88608902

8903+
// PUBLIC: int __dbenv_get_rep_lsns __P((DB_ENV *, DB_LSN *, DB_LSN *, int *));
8904+
int
8905+
__dbenv_get_rep_lsns(dbenv, ready_lsn, gap_lsn, nrecs)
8906+
DB_ENV *dbenv;
8907+
DB_LSN *ready_lsn;
8908+
DB_LSN *gap_lsn;
8909+
int *nrecs;
8910+
{
8911+
DB_LOG *dblp;
8912+
LOG *lp;
8913+
DB_REP *db_rep;
8914+
8915+
db_rep = dbenv->rep_handle;
8916+
dblp = dbenv->lg_handle;
8917+
lp = dblp->reginfo.primary;
8918+
8919+
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
8920+
*ready_lsn = lp->ready_lsn;
8921+
*gap_lsn = lp->waiting_lsn;
8922+
*nrecs = lp->records_last_second;
8923+
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
8924+
return 0;
8925+
}
8926+
8927+
// PUBLIC: int __dbenv_set_coherency_check_callback __P((DB_ENV *, int(*)(void*), void*));
8928+
int
8929+
__dbenv_set_coherency_check_callback(dbenv, callback, usrptr)
8930+
DB_ENV *dbenv;
8931+
int (*callback)(void*);
8932+
void *usrptr;
8933+
{
8934+
dbenv->coherency_check_callback = callback;
8935+
dbenv->coherency_check_usrptr = usrptr;
8936+
return 0;
8937+
}
8938+
88618939
/* Not crazy about leaving this here. This is used in bdb and berkdb. It's
88628940
* initialized in db, early in main. It doesn't really belong in any one place. */
88638941
char *db_eid_broadcast = NULL;

db/db_tunables.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,9 @@ int gbl_test_tunable_int_signed_limit = INT_MAX;
597597
int64_t gbl_test_tunable_int64_limit = INT64_MAX;
598598
int64_t gbl_test_tunable_int64_signed_limit = INT64_MAX;
599599

600+
int gbl_always_request_log_req = 0;
601+
int gbl_nudge_replication_when_idle = 0;
602+
600603
int parse_int64(const char *value, int64_t *num);
601604

602605
/*

db/db_tunables.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2558,4 +2558,6 @@ REGISTER_TUNABLE("iam_dbname",
25582558
NULL, NULL, NULL);
25592559
REGISTER_TUNABLE("comdb2_oplog_preserve_seqno", "Preserve max value of the seqno in llmeta", TUNABLE_BOOLEAN, &gbl_comdb2_oplog_preserve_seqno, INTERNAL, NULL, NULL, NULL, NULL);
25602560
REGISTER_TUNABLE("queue_nonodh_scan_limit", "For comdb2_queues, stop queue scan at this depth (Default: 10000)", TUNABLE_INTEGER, &gbl_nonodh_queue_scan_limit, 0, NULL, NULL, NULL, NULL);
2561+
REGISTER_TUNABLE("always_request_log_req", "Always request the next log record on replicant if there is a gap (default: off)", TUNABLE_BOOLEAN, &gbl_always_request_log_req, 0, NULL, NULL, NULL, NULL);
2562+
REGISTER_TUNABLE("nudge_replication_when_idle", "If we haven't seen any replication events in a while, request some (default: off)", TUNABLE_BOOLEAN, &gbl_nudge_replication_when_idle, 0, NULL, NULL, NULL, NULL);
25612563
#endif /* _DB_TUNABLES_H */

tests/catchup.test/Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
ifeq ($(TESTSROOTDIR),)
2+
include ../testcase.mk
3+
else
4+
include $(TESTSROOTDIR)/testcase.mk
5+
endif
6+
ifeq ($(TEST_TIMEOUT),)
7+
export TEST_TIMEOUT=10m
8+
endif

tests/catchup.test/runit

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#!/usr/bin/env bash
# Catchup test: run continuous writers against the cluster, take one coherent
# replicant down, restart it later, and wait until the master reports it as
# coherent again.

# Abort on a syntax error. This must be '||': with '|' the exit runs in a
# pipeline subshell and never terminates this script.
bash -n "$0" || exit 1

set -x

# Sourced before the first failexit call; nothing else in this script
# defines failexit.
source ${TESTSROOTDIR}/tools/runit_common.sh

dbnm=$1

if [ "x$dbnm" == "x" ] ; then
    failexit "need a DB name"
fi

tables="t1 t2 t3 t4 t5"

# Kill every background writer recorded in the pids file.
stop_writers() {
    for pid in $(cat pids); do
        kill $pid
    done
}

for tbl in $tables; do
    $CDB2SQL_EXE ${CDB2_OPTIONS} ${DBNAME} default "create table $tbl(a int)"
done
for tbl in $tables; do
    $CDB2SQL_EXE ${CDB2_OPTIONS} ${DBNAME} default "insert into $tbl values(1)"
done

# One endless update stream per table; $! is the cdb2sql end of each pipeline.
rm -f pids
for tbl in $tables; do
    yes "update $tbl set a=a --catchup" | $CDB2SQL_EXE ${CDB2_OPTIONS} ${DBNAME} default - >/dev/null &
    echo $! >>pids
done

sleep 5
host=$($CDB2SQL_EXE --tabs ${CDB2_OPTIONS} ${DBNAME} default "select host from comdb2_cluster where is_master='N' and coherent_state='coherent' limit 1")
master=$($CDB2SQL_EXE --tabs ${CDB2_OPTIONS} ${DBNAME} default "select host from comdb2_cluster where is_master='Y'")

# Without a replicant and a master the remaining commands would run with an
# empty --host argument.
if [[ -z "$host" ]]; then
    stop_writers
    failexit "no coherent replicant found"
fi
if [[ -z "$master" ]]; then
    stop_writers
    failexit "no master found"
fi

$CDB2SQL_EXE --host $host ${CDB2_OPTIONS} ${DBNAME} "exec procedure sys.cmd.send('exit')"
# Presumably long enough for the stopped node to fall well behind the writers.
sleep 180
ssh -n -o StrictHostKeyChecking=no $host /opt/bb/bin/comdb2 ${DBNAME} --lrl ${DBDIR}/${DBNAME}.lrl >dbout 2>&1 &

# wait for the node to become coherent again
while :; do
    $CDB2SQL_EXE --tabs --host ${master} ${CDB2_OPTIONS} ${DBNAME} default "select * from comdb2_cluster"
    state=$($CDB2SQL_EXE --tabs --host ${master} ${CDB2_OPTIONS} ${DBNAME} default "select coherent_state from comdb2_cluster where host='$host'")
    if [[ "$state" == "coherent" ]]; then
        break
    fi
    sleep 1
done

stop_writers

echo "Success"

0 commit comments

Comments
 (0)