diff --git a/src/bio.c b/src/bio.c index b044f9124c..aae1d34c50 100644 --- a/src/bio.c +++ b/src/bio.c @@ -62,6 +62,7 @@ #include "server.h" #include "connection.h" +#include "cluster.h" #include "bio.h" #include @@ -71,6 +72,7 @@ static unsigned int bio_job_to_worker[] = { [BIO_CLOSE_AOF] = 1, [BIO_LAZY_FREE] = 2, [BIO_RDB_SAVE] = 3, + [BIO_CLUSTER_SAVE] = 4, }; typedef struct { @@ -86,6 +88,7 @@ static bio_worker_data bio_workers[] = { {"bio_aof"}, {"bio_lazy_free"}, {"bio_rdb_save"}, + {"bio_cluster_config_save"}, }; static const bio_worker_data *const bio_worker_end = bio_workers + (sizeof bio_workers / sizeof *bio_workers); @@ -128,6 +131,12 @@ typedef union bio_job { connection *conn; /* Connection to download the RDB from */ int is_dual_channel; /* Single vs dual channel */ } save_to_disk_args; + + struct { + int type; + sds content; /* Cluster config file content. */ + unsigned do_fsync : 1; /* A flag to indicate that a fsync is required. */ + } cluster_save_args; } bio_job; void *bioProcessBackgroundJobs(void *arg); @@ -227,6 +236,16 @@ void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel) { bioSubmitJob(BIO_RDB_SAVE, job); } +/* Submit a BIO_CLUSTER_SAVE job that writes 'content' to the cluster config file. + * 'content' is handed over to the job (it is released by the save routine). + * 'do_fsync' is a boolean: any non-zero value requests an fsync. */ +void bioCreateClusterConfigSaveJob(sds content, int do_fsync) { + bio_job *job = zmalloc(sizeof(*job)); + job->cluster_save_args.content = content; + job->cluster_save_args.do_fsync = !!do_fsync; /* 1-bit field: a non-zero value with bit 0 clear (e.g. a masked flag) would otherwise be stored as 0. */ + bioSubmitJob(BIO_CLUSTER_SAVE, job); +} + void *bioProcessBackgroundJobs(void *arg) { bio_worker_data *const bwd = arg; bio_job *job; @@ -304,6 +323,10 @@ void *bioProcessBackgroundJobs(void *arg) { job->free_args.free_fn(job->free_args.free_args); } else if (job_type == BIO_RDB_SAVE) { replicaReceiveRDBFromPrimaryToDisk(job->save_to_disk_args.conn, job->save_to_disk_args.is_dual_channel); + } else if (job_type == BIO_CLUSTER_SAVE) { + if (clusterSaveConfigFromBio(job->cluster_save_args.content, job->cluster_save_args.do_fsync) == C_ERR) { + serverLog(LL_WARNING, "Failed to save the cluster config file in background."); + } }
else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } diff --git a/src/bio.h b/src/bio.h index 8ae76ec0a2..0bb801952a 100644 --- a/src/bio.h +++ b/src/bio.h @@ -42,6 +42,7 @@ void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache); void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache); void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel); +void bioCreateClusterConfigSaveJob(sds content, int do_fsync); int inBioThread(void); /* Background job opcodes */ @@ -51,6 +52,7 @@ enum { BIO_LAZY_FREE, /* Deferred objects freeing. */ BIO_CLOSE_AOF, /* Deferred close for AOF files. */ BIO_RDB_SAVE, /* Deferred save RDB to disk on replica */ + BIO_CLUSTER_SAVE, /* Deferred cluster config file save and fsync. */ BIO_NUM_OPS }; diff --git a/src/cluster.h b/src/cluster.h index 443b035127..250d5a8942 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -149,8 +149,8 @@ sds aggregateClientOutputBuffer(client *c); void resetClusterStats(void); unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event); -unsigned int propagateSlotDeletionByKeys(unsigned int hashslot); void clusterUpdateState(void); +int clusterSaveConfigFromBio(sds content, int do_fsync); void clusterSaveConfigOrDie(int do_fsync); int clusterDelSlot(int slot); int clusterAddSlot(clusterNode *n, int slot); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index aa479edd30..b385931b68 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -44,6 +44,7 @@ #include "endianconv.h" #include "connection.h" #include "module.h" +#include "bio.h" #include #include @@ -886,6 +887,16 @@ int clusterLoadConfig(char *filename) { serverPanic("Unrecoverable error: corrupted cluster config file \"%s\".", line); } +/* Get the nodes description and concatenate our "vars" directive to + * save currentEpoch and lastVoteEpoch. 
*/ +sds clusterGenNodesConfContent(void) { + sds content = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0); + content = sdscatfmt(content, "vars currentEpoch %U lastVoteEpoch %U\n", + (unsigned long long)server.cluster->currentEpoch, + (unsigned long long)server.cluster->lastVoteEpoch); + return content; /* The caller owns the returned sds; clusterSaveConfigImpl() releases it with sdsfree(). */ +} + /* Cluster node configuration is exactly the same as CLUSTER NODES output. * * This function writes the node config and returns C_OK, on error C_ERR @@ -898,23 +909,14 @@ int clusterLoadConfig(char *filename) { * a single write to write the whole file. If the pre-existing file was * bigger we pad our payload with newlines that are anyway ignored and truncate * the file afterward. */ -int clusterSaveConfig(int do_fsync) { - sds ci, tmpfilename; +int clusterSaveConfigImpl(sds content, int from_bio, int do_fsync) { /* Consumes 'content' (sdsfree'd before returning); a non-zero 'from_bio' skips the latency sample/trace calls. */ + sds tmpfilename; size_t content_size, offset = 0; ssize_t written_bytes; int fd = -1; int retval = C_ERR; mstime_t latency; - - server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG; - - /* Get the nodes description and concatenate our "vars" directive to - * save currentEpoch and lastVoteEpoch. */ - ci = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0); - ci = sdscatfmt(ci, "vars currentEpoch %U lastVoteEpoch %U\n", - (unsigned long long)server.cluster->currentEpoch, - (unsigned long long)server.cluster->lastVoteEpoch); - content_size = sdslen(ci); + content_size = sdslen(content); /* Create a temp file with the new content.
*/ tmpfilename = sdscatfmt(sdsempty(), "%s.tmp-%i-%I", server.cluster_configfile, (int)getpid(), mstime()); @@ -924,11 +926,11 @@ int clusterSaveConfig(int do_fsync) { goto cleanup; } latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-open", latency); - latencyTraceIfNeeded(cluster, cluster_config_open, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-open", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_open, latency); latencyStartMonitor(latency); while (offset < content_size) { - written_bytes = write(fd, ci + offset, content_size - offset); + written_bytes = write(fd, content + offset, content_size - offset); if (written_bytes <= 0) { if (errno == EINTR) continue; serverLog(LL_WARNING, "Failed after writing (%zd) bytes to tmp cluster config file: %s", offset, @@ -938,8 +940,8 @@ int clusterSaveConfig(int do_fsync) { offset += written_bytes; } latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-write", latency); - latencyTraceIfNeeded(cluster, cluster_config_write, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-write", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_write, latency); if (do_fsync) { latencyStartMonitor(latency); - server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG; + if (!from_bio) server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG; /* clusterSaveConfig() already clears this flag before queuing the job; skip the unsynchronized write when running in the bio thread. */ @@ -948,8 +950,8 @@ int clusterSaveConfig(int do_fsync) { goto cleanup; } latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-fsync", latency); - latencyTraceIfNeeded(cluster, cluster_config_fsync, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-fsync", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_fsync, latency); } latencyStartMonitor(latency); @@ -958,8 +960,8 @@ int clusterSaveConfig(int do_fsync) { goto cleanup; } latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-rename", latency); - latencyTraceIfNeeded(cluster, cluster_config_rename, latency); + if
(!from_bio) latencyAddSampleIfNeeded("cluster-config-rename", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_rename, latency); if (do_fsync) { latencyStartMonitor(latency); if (fsyncFileDir(server.cluster_configfile) == -1) { @@ -967,8 +969,8 @@ int clusterSaveConfig(int do_fsync) { goto cleanup; } latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-dir-fsync", latency); - latencyTraceIfNeeded(cluster, cluster_config_dir_fsync, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-dir-fsync", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_dir_fsync, latency); } retval = C_OK; /* If we reached this point, everything is fine. */ @@ -977,23 +979,48 @@ int clusterSaveConfig(int do_fsync) { latencyStartMonitor(latency); close(fd); latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-close", latency); - latencyTraceIfNeeded(cluster, cluster_config_close, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-close", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_close, latency); } if (retval == C_ERR) { latencyStartMonitor(latency); unlink(tmpfilename); latencyEndMonitor(latency); - latencyAddSampleIfNeeded("cluster-config-unlink", latency); - latencyTraceIfNeeded(cluster, cluster_config_unlink, latency); + if (!from_bio) latencyAddSampleIfNeeded("cluster-config-unlink", latency); + if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_unlink, latency); } sdsfree(tmpfilename); - sdsfree(ci); + sdsfree(content); return retval; } +/* Save cluster config file. + * + * This function writes the node config and returns C_OK, on error C_ERR + * is returned. It is possible to use bio, which can move I/O latency into + * the bio thread. If bio is used, it always returns C_OK. 
*/ +int clusterSaveConfig(int bio, int do_fsync) { + server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG; + if (do_fsync) server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG; + + sds content = clusterGenNodesConfContent(); + if (bio) { + /* We could always fsync the file in bio, but for now let's keep honoring do_fsync as the old code did. The job takes over 'content'. */ + bioCreateClusterConfigSaveJob(content, do_fsync); + return C_OK; + } else { + return clusterSaveConfigImpl(content, 0, do_fsync); + } +} + +/* Save the cluster config file; called from the bio thread. 'content' is consumed by the save. */ +int clusterSaveConfigFromBio(sds content, int do_fsync) { + return clusterSaveConfigImpl(content, 1, do_fsync); +} + +/* Save the cluster config file synchronously (no bio); if the save fails, the process exits. */ void clusterSaveConfigOrDie(int do_fsync) { - if (clusterSaveConfig(do_fsync) == C_ERR) { + if (clusterSaveConfig(0, do_fsync) == C_ERR) { serverLog(LL_WARNING, "Fatal: can't update cluster config file."); exit(1); } @@ -1417,7 +1444,8 @@ void clusterHandleServerShutdown(bool auto_failover) { /* The error logs have been logged in the save function if the save fails. */ serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting."); - clusterSaveConfig(1); + bioDrainWorker(BIO_CLUSTER_SAVE); + clusterSaveConfig(0, 1); #if !defined(__sun) /* Unlock the cluster config file before shutdown, see clusterLockConfig. @@ -5907,7 +5935,7 @@ void clusterBeforeSleep(void) { /* Save the config, possibly using fsync.
*/ if (flags & CLUSTER_TODO_SAVE_CONFIG) { int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG; - clusterSaveConfigOrDie(fsync); + clusterSaveConfig(1, fsync); } if (flags & CLUSTER_TODO_BROADCAST_ALL) { @@ -6309,7 +6337,10 @@ int verifyClusterConfigWithData(void) { delKeysInSlot(j, server.lazyfree_lazy_server_del, true, false); } } - if (update_config) clusterSaveConfigOrDie(1); + if (update_config) { + bioDrainWorker(BIO_CLUSTER_SAVE); + clusterSaveConfigOrDie(1); + } return C_OK; } @@ -7538,7 +7569,8 @@ int clusterCommandSpecial(client *c) { (unsigned long long)myself->configEpoch); addReplySds(c, reply); } else if (!strcasecmp(c->argv[1]->ptr, "saveconfig") && c->argc == 2) { - int retval = clusterSaveConfig(1); + bioDrainWorker(BIO_CLUSTER_SAVE); + int retval = clusterSaveConfig(0, 1); if (retval == C_OK) addReply(c, shared.ok); diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index d69a195409..e35b2b6a20 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -263,7 +263,7 @@ proc start_cluster {masters replicas options code {slot_allocator continuous_slo # Configure the starting of multiple servers. Set cluster node timeout # aggressively since many tests depend on ping/pong messages. - set cluster_options [list overrides [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000 cluster-databases 16 cluster-slot-stats-enabled yes]] + set cluster_options [list overrides [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000 cluster-databases 16 cluster-slot-stats-enabled yes latency-monitor-threshold 1]] set options [concat $cluster_options $options] # Cluster mode only supports a single database, so before executing the tests