diff --git a/pom.xml b/pom.xml index d6b5bc6d6e..810da48202 100644 --- a/pom.xml +++ b/pom.xml @@ -348,6 +348,11 @@ src/test/java/redis/clients/jedis/commands/jedis/ClusterStreamsCommandsTest.java src/test/java/redis/clients/jedis/commands/jedis/PooledStreamsCommandsTest.java src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java + **/Maintenance*.java + **/Push*.java + **/Rebind*.java + src/test/java/redis/clients/jedis/upgrade/*.java + src/test/java/redis/clients/jedis/util/server/*.java diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index de473d0b8e..3104c06226 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -1,14 +1,17 @@ package redis.clients.jedis; +import static redis.clients.jedis.PushConsumerChain.PROPAGATE_ALL_HANDLER; import static redis.clients.jedis.util.SafeEncoder.encode; import java.io.Closeable; import java.io.IOException; +import java.lang.ref.WeakReference; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.CharBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -16,9 +19,12 @@ import java.util.function.Supplier; import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.Protocol.Keyword; import redis.clients.jedis.annots.Experimental; +import redis.clients.jedis.annots.VisibleForTesting; import redis.clients.jedis.args.ClientAttributeOption; import redis.clients.jedis.args.Rawable; import redis.clients.jedis.authentication.AuthXManager; @@ -30,8 +36,10 @@ import redis.clients.jedis.util.IOUtils; import redis.clients.jedis.util.RedisInputStream; import redis.clients.jedis.util.RedisOutputStream; +import redis.clients.jedis.util.SafeEncoder; public class Connection implements Closeable { + public static Logger logger = LoggerFactory.getLogger(Connection.class); private ConnectionPool memberOf; protected RedisProtocol protocol; @@ -39,6 +47,9 @@ public class Connection implements Closeable { private Socket socket; private RedisOutputStream outputStream; private RedisInputStream inputStream; + private boolean relaxedTimeoutEnabled = false; + private int relaxedTimeout = safeToInt(TimeoutOptions.DISABLED_TIMEOUT.toMillis()); + private int relaxedBlockingTimeout = safeToInt(TimeoutOptions.DISABLED_TIMEOUT.toMillis()); private int soTimeout = 0; private int infiniteSoTimeout = 0; private boolean broken = false; @@ -48,7 +59,11 @@ public class Connection implements Closeable { protected String version; private AtomicReference currentCredentials = new AtomicReference<>(null); private AuthXManager authXManager; + private boolean isBlocking = false; + private boolean isRelaxed = false; + private boolean rebindRequested = false; + protected PushConsumerChain pushConsumer; public Connection() { this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT); } @@ -68,15 +83,63 @@ public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientC public Connection(final JedisSocketFactory socketFactory) { this.socketFactory = socketFactory; this.authXManager = null; + + initPushConsumers(null); } public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig) { this.socketFactory = socketFactory; this.soTimeout = clientConfig.getSocketTimeoutMillis(); this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis(); + this.relaxedTimeout = safeToInt(clientConfig.getTimeoutOptions().getRelaxedTimeout().toMillis()); + this.relaxedBlockingTimeout = safeToInt(clientConfig.getTimeoutOptions().getRelaxedBlockingTimeout().toMillis()); + this.relaxedTimeoutEnabled = TimeoutOptions.isRelaxedTimeoutEnabled(relaxedTimeout) || + TimeoutOptions.isRelaxedTimeoutEnabled(relaxedBlockingTimeout); + initPushConsumers(clientConfig); initializeFromClientConfig(clientConfig); } + + protected void initPushConsumers(JedisClientConfig config) { + /* + * Default consumers to process push messages. + * Marks all @{link PushMessage}s as processed, except for pub/sub. + * Pub/sub messages are propagated to the client. + */ + this.pushConsumer = PushConsumerChain.of( + PushConsumerChain.CONSUME_ALL_HANDLER, + PushConsumerChain.PUBSUB_ONLY_HANDLER + ); + + if (config != null) { + + /* + * Add consumer to handle server maintenance events. + * Maintenance events are propagated to the registered {@link MaintenanceEventListener}s. + */ + MaintenanceEventHandler maintenanceEventHandler = config.getMaintenanceEventHandler(); + if (maintenanceEventHandler != null) { + this.pushConsumer.add(new MaintenanceEventConsumer(maintenanceEventHandler)); + + if (config.isProactiveRebindEnabled()) { + maintenanceEventHandler.addListener(new ConnectionRebindHandler()); + } + + if (TimeoutOptions.isRelaxedTimeoutEnabled(config.getTimeoutOptions().getRelaxedTimeout())) { + maintenanceEventHandler.addListener(new AdaptiveTimeoutHandler(Connection.this)); + } + } + + /* + * Add consumer to notify registered {@link PushListener}s. + */ + PushHandler pushHandler = config.getPushHandler(); + if (pushHandler != null) { + this.pushConsumer.add(new ListenerNotificationConsumer(pushHandler)); + } + } + } + @Override public String toString() { return getClass().getSimpleName() + "{" + socketFactory + "}"; @@ -152,7 +215,8 @@ public void setTimeoutInfinite() { public void rollbackTimeout() { try { - socket.setSoTimeout(this.soTimeout); + int timeout = getDesiredTimeout(); + socket.setSoTimeout(timeout); } catch (SocketException ex) { setBroken(); throw new JedisConnectionException(ex); @@ -175,9 +239,11 @@ public T executeCommand(final CommandObject commandObject) { return commandObject.getBuilder().build(getOne()); } else { try { + isBlocking = true; setTimeoutInfinite(); return commandObject.getBuilder().build(getOne()); } finally { + isBlocking = false; rollbackTimeout(); } } @@ -261,7 +327,7 @@ public void close() { if (this.memberOf != null) { ConnectionPool pool = this.memberOf; this.memberOf = null; - if (isBroken()) { + if (isBroken() || isRebindRequested()) { pool.returnBrokenResource(this); } else { pool.returnResource(this); @@ -271,6 +337,10 @@ public void close() { } } + private boolean isRebindRequested() { + return rebindRequested; + } + /** * Close the socket and disconnect the server. */ @@ -303,7 +373,7 @@ public void setBroken() { public String getStatusCodeReply() { flush(); - final byte[] resp = (byte[]) readProtocolWithCheckingBroken(); + final byte[] resp = (byte[]) readProtocolWithCheckingBroken(pushConsumer); if (null == resp) { return null; } else { @@ -322,12 +392,12 @@ public String getBulkReply() { public byte[] getBinaryBulkReply() { flush(); - return (byte[]) readProtocolWithCheckingBroken(); + return (byte[]) readProtocolWithCheckingBroken(pushConsumer); } public Long getIntegerReply() { flush(); - return (Long) readProtocolWithCheckingBroken(); + return (Long) readProtocolWithCheckingBroken(pushConsumer); } public List getMultiBulkReply() { @@ -337,7 +407,7 @@ public List getMultiBulkReply() { @SuppressWarnings("unchecked") public List getBinaryMultiBulkReply() { flush(); - return (List) readProtocolWithCheckingBroken(); + return (List) readProtocolWithCheckingBroken(pushConsumer); } /** @@ -346,28 +416,28 @@ public List getBinaryMultiBulkReply() { @Deprecated @SuppressWarnings("unchecked") public List getUnflushedObjectMultiBulkReply() { - return (List) readProtocolWithCheckingBroken(); + return (List) readProtocolWithCheckingBroken(pushConsumer); } @SuppressWarnings("unchecked") public Object getUnflushedObject() { - return readProtocolWithCheckingBroken(); + return readProtocolWithCheckingBroken(pushConsumer); } public List getObjectMultiBulkReply() { flush(); - return (List) readProtocolWithCheckingBroken(); + return (List) readProtocolWithCheckingBroken(pushConsumer); } @SuppressWarnings("unchecked") public List getIntegerMultiBulkReply() { flush(); - return (List) readProtocolWithCheckingBroken(); + return (List) readProtocolWithCheckingBroken(pushConsumer); } public Object getOne() { flush(); - return readProtocolWithCheckingBroken(); + return readProtocolWithCheckingBroken(pushConsumer); } protected void flush() { @@ -380,21 +450,39 @@ protected void flush() { } @Experimental - protected Object protocolRead(RedisInputStream is) { - return Protocol.read(is); + protected Object protocolRead(RedisInputStream is, PushConsumer handler) { + return Protocol.read(is, handler); } @Experimental protected void protocolReadPushes(RedisInputStream is) { } + protected Object readProtocolWithCheckingBroken(PushConsumer handler) { + if (broken) { + throw new JedisConnectionException("Attempting to read from a broken connection."); + } + + try { + return protocolRead(inputStream, handler); + } catch (JedisConnectionException exc) { + broken = true; + throw exc; + } + } + + /** + * @deprecated Use {@link #readProtocolWithCheckingBroken(PushConsumer)} + * @return + */ + @Deprecated protected Object readProtocolWithCheckingBroken() { if (broken) { throw new JedisConnectionException("Attempting to read from a broken connection."); } try { - return protocolRead(inputStream); + return protocolRead(inputStream, PROPAGATE_ALL_HANDLER); } catch (JedisConnectionException exc) { broken = true; throw exc; @@ -424,7 +512,7 @@ public List getMany(final int count) { final List responses = new ArrayList<>(count); for (int i = 0; i < count; i++) { try { - responses.add(readProtocolWithCheckingBroken()); + responses.add(readProtocolWithCheckingBroken(pushConsumer)); } catch (JedisDataException e) { responses.add(e); } @@ -614,4 +702,279 @@ protected boolean isTokenBasedAuthenticationEnabled() { protected AuthXManager getAuthXManager() { return authXManager; } + + @Experimental + @VisibleForTesting + PushConsumerChain getPushConsumer() { + return this.pushConsumer; + } + + @Experimental + public boolean isRelaxedTimeoutActive() { + return isRelaxed; + } + + /** + * Calculate the desired timeout based on current state (blocking/non-blocking and relaxed/normal). + * When relaxed timeouts are enabled, use configured relaxed timeout if available, otherwise fallback to default timeout. + */ + private int getDesiredTimeout() { + if (!isRelaxed) { + if (!isBlocking) { + return soTimeout; + } else { + return infiniteSoTimeout; + } + } else { + if (!isBlocking) { + // Use relaxed timeout if configured, otherwise fallback to normal timeout + return TimeoutOptions.isRelaxedTimeoutDisabled(relaxedTimeout) ? soTimeout : relaxedTimeout; + } else { + // Use relaxed blocking timeout if configured, otherwise fallback to infinite timeout + return TimeoutOptions.isRelaxedTimeoutDisabled(relaxedBlockingTimeout) ? infiniteSoTimeout : relaxedBlockingTimeout; + } + } + } + + @Experimental + public void relaxTimeouts() { + if (!relaxedTimeoutEnabled) { + return; + } + + if (!isRelaxed) { + isRelaxed = true; + try { + if (isConnected()) { + socket.setSoTimeout(getDesiredTimeout()); + } + } catch (SocketException ex) { + setBroken(); + throw new JedisConnectionException(ex); + } + } + } + + @Experimental + public void disableRelaxedTimeout() { + if (isRelaxed) { + isRelaxed = false; + try { + if (isConnected()) { + socket.setSoTimeout(getDesiredTimeout()); + } + } catch (SocketException ex) { + setBroken(); + throw new JedisConnectionException(ex); + } + } + } + + private static int safeToInt(long millis) { + if (millis > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + + return (int) millis; + } + /** + * Push consumer that delegates to a {@link PushHandler} for listener notification. + */ + private static class ListenerNotificationConsumer implements PushConsumer { + private final PushHandler pushHandler; + + public ListenerNotificationConsumer(PushHandler pushHandler) { + this.pushHandler = pushHandler; + } + + @Override + public void accept(PushConsumerContext context) { + if (pushHandler != null) { + notifyListeners(context.getMessage()); + } + } + + private void notifyListeners(PushMessage pushMessage) { + try { + pushHandler.getPushListeners().forEach(pushListener -> { + try { + pushListener.onPush(pushMessage); + } catch (Exception e) { + // ignore + } + }); + } catch (Exception e) { + // Log notification failures + } + } + } + + + private static class MaintenanceEventConsumer implements PushConsumer { + private final MaintenanceEventHandler eventHandler; + + public MaintenanceEventConsumer(MaintenanceEventHandler eventHandler) { + this.eventHandler = eventHandler; + } + + @Override + public void accept(PushConsumerContext context) { + PushMessage message = context.getMessage(); + + switch ( message.getType()) { + case "MOVING": + onMoving(message); + break; + case "MIGRATING": + onMigrating(); + break; + case "MIGRATED": + onMigrated(); + break; + case "FAILING_OVER": + onFailOver(); + break; + case "FAILED_OVER": + onFailedOver(); + break; + } + } + private void onMoving(PushMessage message) { + RebindEvent rebindEvent = getRebindTarget(message); + if (rebindEvent == null) { + return; + } + eventHandler.getListeners().forEach(listener -> listener.onRebind(rebindEvent.target,rebindEvent.rebindTimeout)); + } + + private void onMigrating() { + eventHandler.getListeners().forEach(MaintenanceEventListener::onMigrating); + } + + private void onMigrated() { + eventHandler.getListeners().forEach(MaintenanceEventListener::onMigrated); + } + + private void onFailOver() { + eventHandler.getListeners().forEach(MaintenanceEventListener::onFailOver); + } + + private void onFailedOver() { + eventHandler.getListeners().forEach(MaintenanceEventListener::onFailedOver); + } + + private RebindEvent getRebindTarget(PushMessage message) { + // Extract domain/ip and port from the message + // MOVING push message format: ["MOVING", slot, "host:port"] + List content = message.getContent(); + + if (content.size() < 3) { + logger.warn("MOVING push message is malformed: {}", message); + return null; + } + + Object timeObject = content.get(1); // Get the 3rd element (index 2) + if (!(timeObject instanceof Long)) { + logger.warn("Invalid re-bind message format, expected 2rd element to be a