diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index b7df963b4a..304df504fe 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -173,10 +173,41 @@ static inline char *clusterLinkGetNodeName(clusterLink *link) { return link->node ? link->node->name : ""; } +/* Return a printable name for the node, for logging only. If a human-readable nodename has been + * assigned (CONFIG SET cluster-announce-human-nodename command or config file edit) that SDS field is + * returned as is; otherwise fall back to "<ip>:<port>", using the TLS port when server.tls_cluster is set + * and the TCP port otherwise. The fallback lives in a thread-local scratch buffer: do not free it and do + * not keep it, as the BUF_COUNT-th later fallback call on the same thread overwrites the same buffer. */ +char *humanNodename(clusterNode *node) { + if (sdslen(node->human_nodename) > 0) { + return node->human_nodename; + } + + /* Avoid allocating heap memory so that callers have nothing to free. + * Use a small ring of thread-local buffers here so that up to BUF_COUNT fallback + * results used in the same logging statement each get a distinct buffer. */ + enum { BUF_COUNT = 8 }; + enum { BUF_SIZE = 64 }; /* Long enough to hold "255.255.255.255:65535". NOTE(review): confirm it also covers the longest node->ip; snprintf below truncates rather than overflows. */ + static _Thread_local char buffers[BUF_COUNT][BUF_SIZE]; + static _Thread_local int idx; + + char *buffer = buffers[idx]; + idx = (idx + 1) % BUF_COUNT; + + const int port = server.tls_cluster ? node->tls_port : node->tcp_port; + /* Be defensive in case IP is empty: emit just ":<port>". */ + if (node->ip[0] == '\0') { + snprintf(buffer, BUF_SIZE, ":%d", port); + } else { + snprintf(buffer, BUF_SIZE, "%s:%d", node->ip, port); + } + return buffer; +} + /* Return human assigned node name if the link has the node associated to it * or else return "". */ static inline char *clusterLinkGetHumanNodeName(clusterLink *link) { - return link->node ? link->node->human_nodename : ""; + return link->node ? 
humanNodename(link->node) : ""; } #define isSlotUnclaimed(slot) \ @@ -1227,7 +1258,11 @@ static void updateAnnouncedHostname(clusterNode *node, char *value) { } static void updateAnnouncedHumanNodename(clusterNode *node, char *value) { - updateSdsExtensionField(&node->human_nodename, value); + /* Only update the human nodename when the provided new value isn't NULL, + * otherwise the following function will clear the human nodename field. + * NOTE(review): a NULL value now leaves any previously stored name in place - confirm a stale name is acceptable. */ + if (value != NULL) + updateSdsExtensionField(&node->human_nodename, value); } static void updateAnnouncedClientIpV4(clusterNode *node, char *value) { @@ -2099,7 +2134,7 @@ void clusterAddNode(clusterNode *node) { */ void clusterDelNode(clusterNode *delnode) { serverAssert(delnode != NULL); - serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, delnode->human_nodename); + serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, humanNodename(delnode)); int j; dictIterator *di; @@ -2160,7 +2195,7 @@ void clusterRenameNode(clusterNode *node, char *newname) { int retval; sds s = sdsnewlen(node->name, CLUSTER_NAMELEN); - serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, node->human_nodename, newname); + serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, humanNodename(node), newname); retval = dictDelete(server.cluster->nodes, s); sdsfree(s); serverAssert(retval == DICT_OK); @@ -2322,7 +2357,7 @@ void clusterHandleConfigEpochCollision(clusterNode *sender) { myself->configEpoch = server.cluster->currentEpoch; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG | CLUSTER_TODO_BROADCAST_ALL); serverLog(LL_NOTICE, "configEpoch collision with node %.40s (%s). 
configEpoch set to %llu", sender->name, - sender->human_nodename, (unsigned long long)myself->configEpoch); + humanNodename(sender), (unsigned long long)myself->configEpoch); } /* ----------------------------------------------------------------------------- @@ -2452,7 +2487,7 @@ void markNodeAsFailingIfNeeded(clusterNode *node) { if (clusterNodeIsVotingPrimary(myself)) failures++; if (failures < needed_quorum) return; /* No weak agreement from primaries. */ - serverLog(LL_NOTICE, "Marking node %.40s (%s) as failing (quorum reached).", node->name, node->human_nodename); + serverLog(LL_NOTICE, "Marking node %.40s (%s) as failing (quorum reached).", node->name, humanNodename(node)); /* Mark the node as failing. */ markNodeAsFailing(node); @@ -2477,7 +2512,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { * right, we always clear the FAIL flag if we can contact the node again. */ if (!clusterNodeIsVotingPrimary(node)) { serverLog(LL_NOTICE, "Clear FAIL state for node %.40s (%s): %s is reachable again.", node->name, - node->human_nodename, nodeIsReplica(node) ? "replica" : "primary without slots"); + humanNodename(node), nodeIsReplica(node) ? 
"replica" : "primary without slots"); node->flags &= ~CLUSTER_NODE_FAIL; clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG); } @@ -2491,7 +2526,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { serverLog( LL_NOTICE, "Clear FAIL state for node %.40s (%s): is reachable again and nobody is serving its slots after some time.", - node->name, node->human_nodename); + node->name, humanNodename(node)); node->flags &= ~CLUSTER_NODE_FAIL; clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG); } @@ -2635,7 +2670,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { if (invalid_ids) { if (sender) { serverLog(LL_WARNING, "Node %.40s (%s) gossiped %d nodes with invalid IDs.", sender->name, - sender->human_nodename, invalid_ids); + humanNodename(sender), invalid_ids); } else { serverLog(LL_WARNING, "Unknown node gossiped %d nodes with invalid IDs.", invalid_ids); } @@ -2668,13 +2703,13 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) { if (clusterNodeIsVotingPrimary(sender) && clusterNodeAddFailureReport(node, sender)) { serverLog(LL_VERBOSE, "Node %.40s (%s) reported node %.40s (%s) as not reachable.", sender->name, - sender->human_nodename, node->name, node->human_nodename); + humanNodename(sender), node->name, humanNodename(node)); } markNodeAsFailingIfNeeded(node); } else { if (clusterNodeDelFailureReport(node, sender)) { serverLog(LL_VERBOSE, "Node %.40s (%s) reported node %.40s (%s) is back online.", sender->name, - sender->human_nodename, node->name, node->human_nodename); + humanNodename(sender), node->name, humanNodename(node)); } } } @@ -2715,7 +2750,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { node->cport = ntohs(g->cport); node->flags &= ~CLUSTER_NODE_NOADDR; - serverLog(LL_NOTICE, "Address updated for node %.40s (%s), now %s:%d", node->name, node->human_nodename, + serverLog(LL_NOTICE, 
"Address updated for node %.40s (%s), now %s:%d", node->name, humanNodename(node), node->ip, getNodeDefaultClientPort(node)); /* Check if this is our primary and we have to change the @@ -2818,7 +2853,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg * node->cport = cport; if (node->link) freeClusterLink(node->link); node->flags &= ~CLUSTER_NODE_NOADDR; - serverLog(LL_NOTICE, "Address updated for node %.40s (%s), now %s:%d", node->name, node->human_nodename, node->ip, + serverLog(LL_NOTICE, "Address updated for node %.40s (%s), now %s:%d", node->name, humanNodename(node), node->ip, getNodeDefaultClientPort(node)); /* Check if this is our primary and we have to change the @@ -2834,7 +2869,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg * void clusterSetNodeAsPrimary(clusterNode *n) { if (clusterNodeIsPrimary(n)) return; - serverLog(LL_NOTICE, "Reconfiguring node %.40s (%s) as primary for shard %.40s", n->name, n->human_nodename, n->shard_id); + serverLog(LL_NOTICE, "Reconfiguring node %.40s (%s) as primary for shard %.40s", n->name, humanNodename(n), n->shard_id); if (n->replicaof) { clusterNodeRemoveReplica(n->replicaof, n); @@ -2860,8 +2895,8 @@ static void clusterLogSlotRangeMigration(int first_slot, "Slot range [%d, %d] is migrated from node %.40s (%s) in shard %.40s" " to node %.40s (%s) in shard %.40s.", first_slot, last_slot, - source_node->name, source_node->human_nodename, source_node->shard_id, - target_node->name, target_node->human_nodename, target_node->shard_id); + source_node->name, humanNodename(source_node), source_node->shard_id, + target_node->name, humanNodename(target_node), target_node->shard_id); } /* This function is called when we receive a primary configuration via a @@ -2984,7 +3019,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc if (mn != NULL) { if (!are_in_same_shard) { serverLog(LL_NOTICE, "Slot %d is no longer being migrated to 
node %.40s (%s) in shard %.40s.", - j, mn->name, mn->human_nodename, mn->shard_id); + j, mn->name, humanNodename(mn), mn->shard_id); setMigratingSlotDest(j, NULL); } } @@ -2999,14 +3034,14 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc serverLog(LL_VERBOSE, "Failover occurred in migration source. Update importing " "source for slot %d to node %.40s (%s) in shard %.40s.", - j, sender->name, sender->human_nodename, sender->shard_id); + j, sender->name, humanNodename(sender), sender->shard_id); setImportingSlotSource(j, sender); } else { /* If the sender is from a different shard, it must be a result * of deliberate operator actions. We should clear the importing * state to conform to the operator's will. */ serverLog(LL_NOTICE, "Slot %d is no longer being imported from node %.40s (%s) in shard %.40s.", - j, in->name, in->human_nodename, in->shard_id); + j, in->name, humanNodename(in), in->shard_id); setImportingSlotSource(j, NULL); } } @@ -3040,7 +3075,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc serverLog(LL_VERBOSE, "Failover occurred in migration target." 
" Slot %d is now being migrated to node %.40s (%s) in shard %.40s.", - j, sender->name, sender->human_nodename, sender->shard_id); + j, sender->name, humanNodename(sender), sender->shard_id); setMigratingSlotDest(j, sender); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } @@ -3065,7 +3100,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc serverLog(LL_NOTICE, "Slot %d is no longer being imported from node %.40s (%s) in shard %.40s;" " Clear my importing source for the slot.", - j, sender->name, sender->human_nodename, sender->shard_id); + j, sender->name, humanNodename(sender), sender->shard_id); setImportingSlotSource(j, NULL); /* Take over the slot ownership if I am not the owner yet*/ if (server.cluster->slots[j] != myself) { @@ -3134,7 +3169,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc serverLog(LL_NOTICE, "Configuration change detected. Reconfiguring myself " "as a replica of node %.40s (%s) in shard %.40s", - sender->name, sender->human_nodename, sender->shard_id); + sender->name, humanNodename(sender), sender->shard_id); /* Don't clear the migrating/importing states if this is a replica that * just gets promoted to the new primary in the shard. * @@ -3149,7 +3184,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * in this case. */ serverLog(LL_NOTICE, "My last slot was migrated to node %.40s (%s) in shard %.40s. I am now an empty primary.", - sender->name, sender->human_nodename, sender->shard_id); + sender->name, humanNodename(sender), sender->shard_id); /* We may still have dirty slots when we became a empty primary due to * a bad migration. 
* @@ -3180,7 +3215,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc if (delete_dirty_slots) { for (int j = 0; j < dirty_slots_count; j++) { serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j], - myself->name, myself->human_nodename, myself->shard_id); + myself->name, humanNodename(myself), myself->shard_id); delKeysInSlot(dirty_slots[j], server.lazyfree_lazy_server_del, true, false); } } @@ -3234,10 +3269,13 @@ static void *preparePingExt(clusterMsgPingExt *ext, uint16_t type, uint32_t leng return &ext->ext[0]; } -/* If value is nonempty and cursor_ptr points to a non-NULL cursor, writes a - * ping extension at the cursor, advances the cursor, increments totlen and - * returns 1. If value is nonempty and cursor_ptr points to NULL, just computes - * the size, increments totlen and returns 1. If value is empty, returns 0. */ +/* If given value is non-empty: + * - with non-NULL cursor, function writes a ping extension at the cursor, advances + * the cursor and increments totlen. + * - with NULL cursor, function just computes the size and increments totlen. + * If given value is empty, function does no computation. + * Returns 1 (added a new extension) or 0 (no extension added). + */ static uint32_t writeSdsPingExtIfNonempty(uint32_t *totlen_ptr, clusterMsgPingExt **cursor_ptr, clusterMsgPingtypes type, sds value) { size_t len = sdslen(value); @@ -3282,7 +3320,8 @@ static uint32_t writePingExtensions(clusterMsg *hdr, int gossipcount) { } /* Write simple optional SDS ping extensions. 
*/ - extensions += writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_HOSTNAME, myself->hostname); + extensions += + writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_HOSTNAME, myself->hostname); extensions += writeSdsPingExtIfNonempty(&totlen, &cursor, CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME, myself->human_nodename); extensions += @@ -3717,7 +3756,7 @@ int clusterProcessPacket(clusterLink *link) { "node %.40s (%s) with an equal or higher epoch %llu. Resetting the election " "since we cannot win an election in the past.", (unsigned long long)server.cluster->failover_auth_epoch, - sender->name, sender->human_nodename, + sender->name, humanNodename(sender), (unsigned long long)sender->configEpoch); /* Maybe we could start a new election, set a flag here to make sure * we check as soon as possible, instead of waiting for a cron. */ @@ -3824,7 +3863,7 @@ int clusterProcessPacket(clusterLink *link) { serverAssert(link != sender->link); serverLog(LL_NOTICE, "Freeing outbound link to node %.40s (%s) after receiving a MEET packet " "from this known node", - sender->name, sender->human_nodename); + sender->name, humanNodename(sender)); freeClusterLink(sender->link); } } @@ -3842,7 +3881,7 @@ int clusterProcessPacket(clusterLink *link) { /* Once we get a response for MEET from the sender, we can stop sending more MEET. 
*/ sender->flags &= ~CLUSTER_NODE_MEET; serverLog(LL_NOTICE, "Successfully completed handshake with %.40s (%s)", sender->name, - sender->human_nodename); + humanNodename(sender)); } if (!link->inbound) { if (nodeInHandshake(link->node)) { @@ -3852,7 +3891,7 @@ int clusterProcessPacket(clusterLink *link) { serverLog(LL_VERBOSE, "Handshake: we already know node %.40s (%s), " "updating the address if needed.", - sender->name, sender->human_nodename); + sender->name, humanNodename(sender)); if (nodeUpdateAddressIfNeeded(sender, link, hdr)) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); } @@ -3865,7 +3904,7 @@ int clusterProcessPacket(clusterLink *link) { /* First thing to do is replacing the random name with the * right node name if this was a handshake stage. */ clusterRenameNode(link->node, hdr->sender); - serverLog(LL_DEBUG, "Handshake with node %.40s (%s) completed.", link->node->name, link->node->human_nodename); + serverLog(LL_DEBUG, "Handshake with node %.40s (%s) completed.", link->node->name, humanNodename(link->node)); link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; link->node->flags |= flags & (CLUSTER_NODE_PRIMARY | CLUSTER_NODE_REPLICA); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); @@ -3878,7 +3917,7 @@ int clusterProcessPacket(clusterLink *link) { serverLog(LL_NOTICE, "PONG contains mismatching sender ID. About node %.40s (%s) in shard %.40s added %d ms ago, " "having flags %d", - link->node->name, link->node->human_nodename, link->node->shard_id, + link->node->name, humanNodename(link->node), link->node->shard_id, (int)(now - (link->node->ctime)), link->node->flags); link->node->flags |= CLUSTER_NODE_NOADDR; link->node->ip[0] = '\0'; @@ -3945,7 +3984,7 @@ int clusterProcessPacket(clusterLink *link) { /* Node is a primary. */ if (sender_last_reported_as_replica) { serverLog(LL_DEBUG, "node %.40s (%s) announces that it is a %s in shard %.40s", sender->name, - sender->human_nodename, sender_claims_to_be_primary ? 
"primary" : "replica", sender->shard_id); + humanNodename(sender), sender_claims_to_be_primary ? "primary" : "replica", sender->shard_id); clusterSetNodeAsPrimary(sender); } } else { @@ -3954,7 +3993,7 @@ int clusterProcessPacket(clusterLink *link) { if (sender_last_reported_as_primary) { serverLog(LL_DEBUG, "node %.40s (%s) announces that it is a %s in shard %.40s", sender->name, - sender->human_nodename, sender_claims_to_be_primary ? "primary" : "replica", sender->shard_id); + humanNodename(sender), sender_claims_to_be_primary ? "primary" : "replica", sender->shard_id); /* Primary turned into a replica! Reconfigure the node. */ if (sender_claimed_primary && areInSameShard(sender_claimed_primary, sender)) { @@ -3963,7 +4002,7 @@ int clusterProcessPacket(clusterLink *link) { serverLog(LL_NOTICE, "Ignore stale message from %.40s (%s) in shard %.40s;" " gossip config epoch: %llu, current config epoch: %llu", - sender->name, sender->human_nodename, sender->shard_id, + sender->name, humanNodename(sender), sender->shard_id, (unsigned long long)sender_claimed_config_epoch, (unsigned long long)nodeEpoch(sender_claimed_primary)); /* This packet is stale so we avoid processing it anymore. Otherwise @@ -3984,8 +4023,8 @@ int clusterProcessPacket(clusterLink *link) { serverLog(LL_NOTICE, "A failover occurred in shard %.40s; node %.40s (%s) lost %d slot(s) and" " failed over to node %.40s (%s) with a config epoch of %llu", - sender->shard_id, sender->name, sender->human_nodename, slots, - sender_claimed_primary->name, sender_claimed_primary->human_nodename, + sender->shard_id, sender->name, humanNodename(sender), slots, + sender_claimed_primary->name, humanNodename(sender_claimed_primary), (unsigned long long)sender_claimed_primary->configEpoch); } if (importing_slots) { @@ -3993,14 +4032,14 @@ int clusterProcessPacket(clusterLink *link) { "A failover occurred in migration source. 
Update importing " "source of %d slot(s) to node %.40s (%s) in shard %.40s.", importing_slots, sender_claimed_primary->name, - sender_claimed_primary->human_nodename, sender_claimed_primary->shard_id); + humanNodename(sender_claimed_primary), sender_claimed_primary->shard_id); } if (migrating_slots) { serverLog(LL_NOTICE, "A failover occurred in migration target. Update migrating " "target of %d slot(s) to node %.40s (%s) in shard %.40s.", migrating_slots, sender_claimed_primary->name, - sender_claimed_primary->human_nodename, sender_claimed_primary->shard_id); + humanNodename(sender_claimed_primary), sender_claimed_primary->shard_id); } serverAssert(sender->numslots == 0); } @@ -4010,10 +4049,10 @@ int clusterProcessPacket(clusterLink *link) { serverLog(LL_NOTICE, "Node %.40s (%s) is no longer primary of shard %.40s;" " removed all %d slot(s) it used to own", - sender->name, sender->human_nodename, sender->shard_id, slots); + sender->name, humanNodename(sender), sender->shard_id, slots); if (sender_claimed_primary != NULL) { serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s", sender->name, - sender->human_nodename, sender_claimed_primary->shard_id); + humanNodename(sender), sender_claimed_primary->shard_id); } serverAssert(sender->numslots == 0); } @@ -4029,8 +4068,8 @@ int clusterProcessPacket(clusterLink *link) { if (sender_claimed_primary && sender->replicaof != sender_claimed_primary) { if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender); serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s", - sender->name, sender->human_nodename, sender_claimed_primary->name, - sender_claimed_primary->human_nodename, sender_claimed_primary->shard_id); + sender->name, humanNodename(sender), sender_claimed_primary->name, + humanNodename(sender_claimed_primary), sender_claimed_primary->shard_id); clusterNodeAddReplica(sender_claimed_primary, sender); sender->replicaof = sender_claimed_primary; @@ -4093,7 
+4132,7 @@ int clusterProcessPacket(clusterLink *link) { serverAssert(nodeIsPrimary(sender)); serverLog(LL_NOTICE, "Mismatch in topology information for sender node %.40s (%s) in shard %.40s", sender->name, - sender->human_nodename, sender->shard_id); + humanNodename(sender), sender->shard_id); /* 1) If the sender of the message is a primary, and we detected that * the set of slots it claims changed, scan the slots to see if we @@ -4120,14 +4159,15 @@ int clusterProcessPacket(clusterLink *link) { * failover if there are the conditions to win the election). */ for (int j = 0; j < CLUSTER_SLOTS; j++) { if (bitmapTestBit(hdr->myslots, j)) { - if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue; - if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) { + clusterNode *slot_owner = server.cluster->slots[j]; + if (slot_owner == sender || isSlotUnclaimed(j)) continue; + if (slot_owner->configEpoch > sender_claimed_config_epoch) { serverLog(LL_VERBOSE, "Node %.40s (%s) has old slots configuration, sending " "an UPDATE message about %.40s (%s)", - sender->name, sender->human_nodename, - server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename); - clusterSendUpdate(sender->link, server.cluster->slots[j]); + sender->name, humanNodename(sender), + slot_owner->name, humanNodename(slot_owner)); + clusterSendUpdate(sender->link, slot_owner); /* TODO: instead of exiting the loop send every other * UPDATE packet for other nodes that are the new owner @@ -4157,7 +4197,7 @@ int clusterProcessPacket(clusterLink *link) { failing = clusterLookupNode(hdr->data.fail.about.nodename, CLUSTER_NAMELEN); if (failing && !(failing->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_MYSELF))) { serverLog(LL_NOTICE, "FAIL message received from %.40s (%s) about %.40s (%s)", hdr->sender, - sender->human_nodename, hdr->data.fail.about.nodename, failing->human_nodename); + humanNodename(sender), hdr->data.fail.about.nodename, humanNodename(failing)); 
markNodeAsFailing(failing); } } else { @@ -4192,7 +4232,7 @@ int clusterProcessPacket(clusterLink *link) { server.cluster->mf_replica = sender; pauseActions(PAUSE_DURING_FAILOVER, now + (server.cluster_mf_timeout * CLUSTER_MF_PAUSE_MULT), PAUSE_ACTIONS_CLIENT_WRITE_SET); - serverLog(LL_NOTICE, "Manual failover requested by replica %.40s (%s).", sender->name, sender->human_nodename); + serverLog(LL_NOTICE, "Manual failover requested by replica %.40s (%s).", sender->name, humanNodename(sender)); /* We need to send a ping message to the replica, as it would carry * `server.cluster->mf_primary_offset`, which means the primary paused clients * at offset `server.cluster->mf_primary_offset`, so that the replica would @@ -4209,7 +4249,8 @@ int clusterProcessPacket(clusterLink *link) { if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */ serverLog(LL_NOTICE, "Processing UPDATE message received from %.40s (%s) in shard %s about node %.40s (%s) in shard %s. old configEpoch %llu, new configEpoch %llu", - sender->name, sender->human_nodename, sender->shard_id, n->name, n->human_nodename, n->shard_id, + sender->name, humanNodename(sender), sender->shard_id, + n->name, humanNodename(n), n->shard_id, (unsigned long long)n->configEpoch, (unsigned long long)reportedConfigEpoch); /* If in our current config the node is a replica, set it as a primary. */ @@ -4491,7 +4532,7 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) { /* If this node is a primary, we send its slots bitmap and configEpoch. * If this node is a replica we send the primary's information instead (the * node is flagged as replica so the receiver knows that it is NOT really - * in charge for this slots. */ + * in charge of these slots. */ primary = (nodeIsReplica(myself) && myself->replicaof) ? 
myself->replicaof : myself; hdr->ver = htons(CLUSTER_PROTO_VER); @@ -5043,7 +5084,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * request, if the request epoch was greater. */ if (requestCurrentEpoch < server.cluster->currentEpoch) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): reqEpoch (%llu) < curEpoch(%llu)", node->name, - node->human_nodename, (unsigned long long)requestCurrentEpoch, + humanNodename(node), (unsigned long long)requestCurrentEpoch, (unsigned long long)server.cluster->currentEpoch); return; } @@ -5051,7 +5092,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { /* I already voted for this epoch? Return ASAP. */ if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): already voted for epoch %llu", node->name, - node->human_nodename, (unsigned long long)server.cluster->currentEpoch); + humanNodename(node), (unsigned long long)server.cluster->currentEpoch); return; } @@ -5061,13 +5102,13 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { if (clusterNodeIsPrimary(node) || primary == NULL || (!nodeFailed(primary) && !force_ack)) { if (clusterNodeIsPrimary(node)) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: it is a primary node", node->name, - node->human_nodename, (unsigned long long)requestCurrentEpoch); + humanNodename(node), (unsigned long long)requestCurrentEpoch); } else if (primary == NULL) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: I don't know its primary", - node->name, node->human_nodename, (unsigned long long)requestCurrentEpoch); + node->name, humanNodename(node), (unsigned long long)requestCurrentEpoch); } else if (!nodeFailed(primary)) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s) for epoch %llu: its primary is up", node->name, - node->human_nodename, (unsigned long 
long)requestCurrentEpoch); + humanNodename(node), (unsigned long long)requestCurrentEpoch); } return; } @@ -5077,7 +5118,8 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * slots in the current configuration. */ for (j = 0; j < CLUSTER_SLOTS; j++) { if (bitmapTestBit(claimed_slots, j) == 0) continue; - if (isSlotUnclaimed(j) || server.cluster->slots[j]->configEpoch <= requestConfigEpoch) { + clusterNode *slot_owner = server.cluster->slots[j]; + if (isSlotUnclaimed(j) || slot_owner->configEpoch <= requestConfigEpoch) { continue; } /* If we reached this point we found a slot that in our current slots @@ -5086,7 +5128,8 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { serverLog(LL_WARNING, "Failover auth denied to %.40s (%s): " "slot %d epoch (%llu) > reqConfigEpoch (%llu)", - node->name, node->human_nodename, j, (unsigned long long)server.cluster->slots[j]->configEpoch, + node->name, humanNodename(node), j, + (unsigned long long)slot_owner->configEpoch, (unsigned long long)requestConfigEpoch); /* Send an UPDATE message to the replica. 
After receiving the UPDATE message, @@ -5095,9 +5138,9 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { serverLog(LL_VERBOSE, "Node %.40s (%s) has old slots configuration, sending " "an UPDATE message about %.40s (%s)", - node->name, node->human_nodename, - server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename); - clusterSendUpdate(node->link, server.cluster->slots[j]); + node->name, humanNodename(node), + slot_owner->name, humanNodename(slot_owner)); + clusterSendUpdate(node->link, slot_owner); return; } @@ -5105,7 +5148,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { server.cluster->lastVoteEpoch = server.cluster->currentEpoch; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG); clusterSendFailoverAuth(node); - serverLog(LL_NOTICE, "Failover auth granted to %.40s (%s) for epoch %llu", node->name, node->human_nodename, + serverLog(LL_NOTICE, "Failover auth granted to %.40s (%s) for epoch %llu", node->name, humanNodename(node), (unsigned long long)server.cluster->currentEpoch); } @@ -5259,7 +5302,7 @@ void clusterFailoverReplaceYourPrimary(void) { if (clusterNodeIsPrimary(myself) || old_primary == NULL) return; serverLog(LL_NOTICE, "Setting myself to primary in shard %.40s after failover; my old primary is %.40s (%s)", - myself->shard_id, old_primary->name, old_primary->human_nodename); + myself->shard_id, old_primary->name, humanNodename(old_primary)); /* 1) Turn this node into a primary. 
*/ clusterSetNodeAsPrimary(myself); @@ -5411,7 +5454,7 @@ void clusterHandleReplicaFailover(void) { server.cluster->failover_auth_time - now, server.cluster->failover_auth_rank, server.cluster->failover_failed_primary_rank, replicationGetReplicaOffset()); /* Now that we have a scheduled election, broadcast our offset - * to all the other replicas so that they'll updated their offsets + * to all the other replicas so that they'll update their offsets * if our offset is better. */ clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_REPLICAS); @@ -5595,7 +5638,7 @@ void clusterHandleReplicaMigration(int max_replicas) { if (target && candidate == myself && (mstime() - target->orphaned_time) > CLUSTER_REPLICA_MIGRATION_DELAY && !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) { serverLog(LL_NOTICE, "Migrating to orphaned primary %.40s (%s) in shard %.40s", target->name, - target->human_nodename, target->shard_id); + humanNodename(target), target->shard_id); /* We are migrating to a different shard that has a completely different * replication history, so a full sync is required. */ clusterSetPrimary(target, 1, 1); @@ -5747,7 +5790,7 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now, long * leave the other node time to complete the handshake. */ node->flags |= CLUSTER_NODE_MEET; serverLog(LL_NOTICE, "Sending MEET packet to node %.40s (%s) because there is no inbound link for it", - node->name, node->human_nodename); + node->name, humanNodename(node)); clusterSendPing(node->link, CLUSTERMSG_TYPE_MEET); } @@ -5849,7 +5892,7 @@ void clusterCron(void) { long long cluster_node_conn_attempts = maxConnectionAttemptsPerCron(); while ((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); - /* We free the inbound or outboud link to the node if the link has an + /* We free the inbound or outbound link to the node if the link has an * oversized message send queue and immediately try reconnecting. 
*/ clusterNodeCronFreeLinkOnBufferLimitReached(node); /* The protocol is that function(s) below return non-zero if the node was @@ -5879,7 +5922,7 @@ void clusterCron(void) { } } if (min_pong_node) { - serverLog(LL_DEBUG, "Pinging node %.40s (%s)", min_pong_node->name, min_pong_node->human_nodename); + serverLog(LL_DEBUG, "Pinging node %.40s (%s)", min_pong_node->name, humanNodename(min_pong_node)); clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING); } } @@ -5972,7 +6015,7 @@ void clusterCron(void) { if (clusterNodeIsVotingPrimary(myself)) { markNodeAsFailingIfNeeded(node); } else { - serverLog(LL_NOTICE, "NODE %.40s (%s) possibly failing.", node->name, node->human_nodename); + serverLog(LL_NOTICE, "NODE %.40s (%s) possibly failing.", node->name, humanNodename(node)); } } } @@ -6219,7 +6262,7 @@ void clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node, int *slo serverLog(LL_VERBOSE, "Failover occurred in migration source. Update importing " "source for slot %d to node %.40s (%s) in shard %.40s.", - j, to_node->name, to_node->human_nodename, to_node->shard_id); + j, to_node->name, humanNodename(to_node), to_node->shard_id); setImportingSlotSource(j, to_node); importing_processed++; } @@ -6228,7 +6271,7 @@ void clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node, int *slo serverLog(LL_VERBOSE, "Failover occurred in migration target." " Slot %d is now being migrated to node %.40s (%s) in shard %.40s.", - j, to_node->name, to_node->human_nodename, to_node->shard_id); + j, to_node->name, humanNodename(to_node), to_node->shard_id); setMigratingSlotDest(j, to_node); migrating_processed++; } @@ -6419,7 +6462,8 @@ int verifyClusterConfigWithData(void) { /* Check if we are assigned to this slot or if we are importing it. * In both cases check the next slot as the configuration makes * sense. 
*/ - if (server.cluster->slots[j] == myself || getImportingSlotSource(j) != NULL) continue; + clusterNode *slot_owner = server.cluster->slots[j]; + if (slot_owner == myself || getImportingSlotSource(j) != NULL) continue; /* If we are here data and cluster config don't agree, and we have * slot 'j' populated even if we are not importing it, nor we are @@ -6428,13 +6472,13 @@ int verifyClusterConfigWithData(void) { update_config++; /* slot is unassigned. Take responsibility for it. */ clusterNode *in = getImportingSlotSource(j); - if (server.cluster->slots[j] == NULL) { + if (slot_owner == NULL) { serverLog(LL_NOTICE, "I have keys for unassigned slot %d. " "Taking responsibility for it.", j); clusterAddSlot(myself, j); - } else if (in != server.cluster->slots[j]) { + } else if (in != slot_owner) { if (in == NULL) { serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is " @@ -6444,10 +6488,8 @@ int verifyClusterConfigWithData(void) { serverLog(LL_NOTICE, "I am importing keys from node %.40s (%s) in shard %.40s to slot %d, " "but the slot is now owned by node %.40s (%s) in shard %.40s. Deleting keys in the slot", - in->name, - in->human_nodename, - in->shard_id, j, server.cluster->slots[j]->name, - server.cluster->slots[j]->human_nodename, server.cluster->slots[j]->shard_id); + in->name, humanNodename(in), in->shard_id, j, + slot_owner->name, humanNodename(slot_owner), slot_owner->shard_id); } delKeysInSlot(j, server.lazyfree_lazy_server_del, true, false); } @@ -7471,10 +7513,10 @@ void clusterCommandSetSlot(client *c) { /* Slot states have been updated on the compatible replicas (if any). * Now execute the command on the primary. 
*/ if (!strcasecmp(c->argv[3]->ptr, "migrating")) { - serverLog(LL_NOTICE, "Migrating slot %d to node %.40s (%s)", slot, n->name, n->human_nodename); + serverLog(LL_NOTICE, "Migrating slot %d to node %.40s (%s)", slot, n->name, humanNodename(n)); setMigratingSlotDest(slot, n); } else if (!strcasecmp(c->argv[3]->ptr, "importing")) { - serverLog(LL_NOTICE, "Importing slot %d from node %.40s (%s)", slot, n->name, n->human_nodename); + serverLog(LL_NOTICE, "Importing slot %d from node %.40s (%s)", slot, n->name, humanNodename(n)); setImportingSlotSource(slot, n); } else if (!strcasecmp(c->argv[3]->ptr, "stable")) { /* CLUSTER SETSLOT STABLE */ @@ -7483,7 +7525,7 @@ void clusterCommandSetSlot(client *c) { setMigratingSlotDest(slot, NULL); } else if (!strcasecmp(c->argv[3]->ptr, "node")) { /* CLUSTER SETSLOT NODE */ - serverLog(LL_NOTICE, "Assigning slot %d to node %.40s (%s) in shard %.40s", slot, n->name, n->human_nodename, + serverLog(LL_NOTICE, "Assigning slot %d to node %.40s (%s) in shard %.40s", slot, n->name, humanNodename(n), n->shard_id); /* If this slot is in migrating status but we have no keys @@ -7507,7 +7549,7 @@ void clusterCommandSetSlot(client *c) { serverLog(LL_NOTICE, "Lost my last slot during slot migration. Reconfiguring myself " "as a replica of %.40s (%s) in shard %.40s", - n->name, n->human_nodename, n->shard_id); + n->name, humanNodename(n), n->shard_id); /* `c` is the primary client if `myself` is a replica, prevent it * from being freed by clusterSetPrimary. */ if (nodeIsReplica(myself)) protectClient(c); @@ -7526,7 +7568,7 @@ void clusterCommandSetSlot(client *c) { serverAssert(n != my_primary); serverLog(LL_NOTICE, "My last slot was migrated to node %.40s (%s) in shard %.40s. 
I am now an empty primary.", - n->name, n->human_nodename, n->shard_id); + n->name, humanNodename(n), n->shard_id); } /* If this node or this node's primary was importing this slot, diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 5a001c04a8..c7e8e58101 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -201,7 +201,7 @@ proc verify_log_message {srv_idx pattern from_line} { incr from_line set result [exec tail -n +$from_line < [srv $srv_idx stdout]] if {![string match $pattern $result]} { - fail "expected message not found in log file: $pattern" + fail "expected pattern not found in log file: $pattern, but instead got: $result" } } diff --git a/tests/unit/cluster/human-announced-nodename.tcl b/tests/unit/cluster/human-announced-nodename.tcl index 1f1505d500..fb065bfe75 100644 --- a/tests/unit/cluster/human-announced-nodename.tcl +++ b/tests/unit/cluster/human-announced-nodename.tcl @@ -1,5 +1,25 @@ # Check if cluster's view of human announced nodename is reported in logs start_cluster 4 0 {tags {external:skip cluster}} { + for {set j 0} {$j < [llength $::servers]} {incr j} { + R $j config set loglevel debug + R $j config set loglevel debug + } + + set RO_node_id [dict get [cluster_get_myself 0] id] + set R0_port [srv 0 port] + set R1_port [srv -1 port] + set R2_port [srv -2 port] + set R3_port [srv -3 port] + + test "Use ip:port in logging when human nodenames are not explicitly set" { + wait_for_log_messages 0 [list "*Sending ping packet to node * (127.0.0.1:$R1_port) *"] 0 1000 10 + wait_for_log_messages 0 [list "*Sending ping packet to node * (127.0.0.1:$R2_port) *"] 0 1000 10 + wait_for_log_messages 0 [list "*Sending ping packet to node * (127.0.0.1:$R3_port) *"] 0 1000 10 + wait_for_log_messages -1 [list "*Sending ping packet to node $RO_node_id (127.0.0.1:$R0_port) *"] 0 1000 10 + wait_for_log_messages -2 [list "*Sending ping packet to node $RO_node_id (127.0.0.1:$R0_port) *"] 0 1000 10 + wait_for_log_messages -3 [list "*Sending 
ping packet to node $RO_node_id (127.0.0.1:$R0_port) *"] 0 1000 10 + } + test "Set cluster human announced nodename and let it propagate" { for {set j 0} {$j < [llength $::servers]} {incr j} { R $j config set cluster-announce-hostname "host-$j.com" @@ -13,22 +33,13 @@ start_cluster 4 0 {tags {external:skip cluster}} { } else { fail "cluster hostnames were not propagated" } - } - - test "Human nodenames are visible in log messages" { - # Pause instance 0, so everyone thinks it is dead - pause_process [srv 0 pid] - pause_process [srv -1 pid] - - # We're going to use a message we will know will be sent, node unreachable, - # since it includes the other node gossiping. - wait_for_log_messages -2 {"*Node * (nodename-3) reported node * (nodename-0) as not reachable*"} 0 20 500 - wait_for_log_messages -3 {"*Node * (nodename-2) reported node * (nodename-0) as not reachable*"} 0 20 500 - wait_for_log_messages -2 {"*Node * (nodename-3) reported node * (nodename-1) as not reachable*"} 0 20 500 - wait_for_log_messages -3 {"*Node * (nodename-2) reported node * (nodename-1) as not reachable*"} 0 20 500 - - resume_process [srv 0 pid] - resume_process [srv -1 pid] + # Ensure the human nodenames are visible in logs + wait_for_log_messages 0 [list "*Sending ping packet to node * (nodename-1) *"] 0 1000 10 + wait_for_log_messages 0 [list "*Sending ping packet to node * (nodename-2) *"] 0 1000 10 + wait_for_log_messages 0 [list "*Sending ping packet to node * (nodename-3) *"] 0 1000 10 + wait_for_log_messages -1 [list "*Sending ping packet to node $RO_node_id (nodename-0) *"] 0 1000 10 + wait_for_log_messages -2 [list "*Sending ping packet to node $RO_node_id (nodename-0) *"] 0 1000 10 + wait_for_log_messages -3 [list "*Sending ping packet to node $RO_node_id (nodename-0) *"] 0 1000 10 } } diff --git a/tests/unit/cluster/manual-failover.tcl b/tests/unit/cluster/manual-failover.tcl index 56952ff008..a585ba0127 100644 --- a/tests/unit/cluster/manual-failover.tcl +++ 
b/tests/unit/cluster/manual-failover.tcl @@ -418,6 +418,9 @@ start_cluster 3 1 {tags {external:skip cluster}} { set R2_nodeid [R 2 cluster myid] set R3_nodeid [R 3 cluster myid] + set R0_port [srv 0 port] + set R3_port [srv -3 port] + set R0_shardid [R 0 cluster myshardid] set R3_shardid [R 3 cluster myshardid] assert_equal $R0_shardid $R3_shardid @@ -478,21 +481,21 @@ start_cluster 3 1 {tags {external:skip cluster}} { # verify we print the logs. # Both importing slots and migrating slots are move to R3. - set pattern "*Failover occurred in migration source. Update importing source for slot 0 to node $R3_nodeid () in shard $R3_shardid*" + set pattern "*Failover occurred in migration source. Update importing source for slot 0 to node $R3_nodeid (127.0.0.1:$R3_port) in shard $R3_shardid*" verify_log_message -1 $pattern $loglines1 - set pattern "*Failover occurred in migration target. Slot 5462 is now being migrated to node $R3_nodeid () in shard $R3_shardid*" + set pattern "*Failover occurred in migration target. Slot 5462 is now being migrated to node $R3_nodeid (127.0.0.1:$R3_port) in shard $R3_shardid*" verify_log_message -1 $pattern $loglines1 # Both slots are move to R3. set R0_slots 5462 - set pattern "*A failover occurred in shard $R3_shardid; node $R0_nodeid () lost $R0_slots slot(s) and failed over to node $R3_nodeid*" + set pattern "*A failover occurred in shard $R3_shardid; node $R0_nodeid (127.0.0.1:$R0_port) lost $R0_slots slot(s) and failed over to node $R3_nodeid*" verify_log_message -1 $pattern $loglines1 verify_log_message -2 $pattern $loglines2 # Both importing slots and migrating slots are move to R3. - set pattern "*A failover occurred in migration source. Update importing source of 1 slot(s) to node $R3_nodeid () in shard $R3_shardid*" + set pattern "*A failover occurred in migration source. 
Update importing source of 1 slot(s) to node $R3_nodeid (127.0.0.1:$R3_port) in shard $R3_shardid*" verify_log_message -1 $pattern $loglines1 - set pattern "*A failover occurred in migration target. Update migrating target of 1 slot(s) to node $R3_nodeid () in shard $R3_shardid*" + set pattern "*A failover occurred in migration target. Update migrating target of 1 slot(s) to node $R3_nodeid (127.0.0.1:$R3_port) in shard $R3_shardid*" verify_log_message -1 $pattern $loglines1 R 1 debug disable-cluster-reconnection 0