Skip to content

fix: lettuce连接池问题修复,以及增加AutoCloseable接口进行资源管理 #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@
<version>1.11.0</version>
</dependency>

<!-- memory cache -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
<optional>true</optional>
</dependency>

<!-- Test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down
60 changes: 49 additions & 11 deletions src/main/java/org/crazycake/shiro/LettuceRedisClusterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* @author Teamo
* @since 2022/05/19
*/
public class LettuceRedisClusterManager implements IRedisManager {
public class LettuceRedisClusterManager implements IRedisManager, AutoCloseable {

/**
* Comma-separated list of "host:port" pairs to bootstrap from. This represents an
Expand Down Expand Up @@ -82,14 +82,18 @@ public class LettuceRedisClusterManager implements IRedisManager {
*/
private ClusterClientOptions clusterClientOptions = ClusterClientOptions.create();

/**
* RedisClusterClient.
*/
private RedisClusterClient redisClusterClient;

private void initialize() {
if (genericObjectPool == null) {
synchronized (LettuceRedisClusterManager.class) {
if (genericObjectPool == null) {
RedisClusterClient redisClusterClient = RedisClusterClient.create(getClusterRedisURI());
redisClusterClient = RedisClusterClient.create(getClusterRedisURI());
redisClusterClient.setOptions(clusterClientOptions);
StatefulRedisClusterConnection<byte[], byte[]> connect = redisClusterClient.connect(new ByteArrayCodec());
genericObjectPool = ConnectionPoolSupport.createGenericObjectPool(() -> connect, genericObjectPoolConfig);
genericObjectPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClusterClient.connect(new ByteArrayCodec()), genericObjectPoolConfig);
}
}
}
Expand All @@ -106,6 +110,12 @@ private StatefulRedisClusterConnection<byte[], byte[]> getStatefulConnection() {
}
}

private void returnObject(StatefulRedisClusterConnection<byte[], byte[]> connection) {
if (connection != null) {
genericObjectPool.returnObject(connection);
}
}

private List<RedisURI> getClusterRedisURI() {
Objects.requireNonNull(nodes, "nodes must not be null!");
return nodes.stream().map(node -> {
Expand All @@ -128,14 +138,18 @@ public byte[] get(byte[] key) {
return null;
}
byte[] value = null;
try (StatefulRedisClusterConnection<byte[], byte[]> connection = getStatefulConnection()) {
StatefulRedisClusterConnection<byte[], byte[]> connection = null;
try {
connection = getStatefulConnection();
if (isAsync) {
RedisAdvancedClusterAsyncCommands<byte[], byte[]> async = connection.async();
value = LettuceFutures.awaitOrCancel(async.get(key), timeout.getSeconds(), TimeUnit.SECONDS);
} else {
RedisAdvancedClusterCommands<byte[], byte[]> sync = connection.sync();
value = sync.get(key);
}
} finally {
returnObject(connection);
}
return value;
}
Expand All @@ -145,7 +159,9 @@ public byte[] set(byte[] key, byte[] value, int expire) {
if (key == null) {
return null;
}
try (StatefulRedisClusterConnection<byte[], byte[]> connection = getStatefulConnection()) {
StatefulRedisClusterConnection<byte[], byte[]> connection = null;
try {
connection = getStatefulConnection();
if (isAsync) {
RedisAdvancedClusterAsyncCommands<byte[], byte[]> async = connection.async();
if (expire > 0) {
Expand All @@ -161,28 +177,35 @@ public byte[] set(byte[] key, byte[] value, int expire) {
sync.set(key, value);
}
}
} finally {
returnObject(connection);
}
return value;
}

@Override
public void del(byte[] key) {
try (StatefulRedisClusterConnection<byte[], byte[]> connection = getStatefulConnection()) {
StatefulRedisClusterConnection<byte[], byte[]> connection = null;
try {
connection = getStatefulConnection();
if (isAsync) {
RedisAdvancedClusterAsyncCommands<byte[], byte[]> async = connection.async();
async.del(key);
} else {
RedisAdvancedClusterCommands<byte[], byte[]> sync = connection.sync();
sync.del(key);
}
} finally {
returnObject(connection);
}
}

@Override
public Long dbSize(byte[] pattern) {
AtomicLong dbSize = new AtomicLong(0L);

try (StatefulRedisClusterConnection<byte[], byte[]> connection = getStatefulConnection()) {
StatefulRedisClusterConnection<byte[], byte[]> connection = null;
try {
connection = getStatefulConnection();
if (isAsync) {
RedisAdvancedClusterAsyncCommands<byte[], byte[]> async = connection.async();
Partitions parse = ClusterPartitionParser.parse(LettuceFutures.awaitOrCancel(async.clusterNodes(), timeout.getSeconds(), TimeUnit.SECONDS));
Expand Down Expand Up @@ -214,15 +237,18 @@ public Long dbSize(byte[] pattern) {
}
});
}
} finally {
returnObject(connection);
}
return dbSize.get();
}

@Override
public Set<byte[]> keys(byte[] pattern) {
Set<byte[]> keys = new HashSet<>();

try (StatefulRedisClusterConnection<byte[], byte[]> connection = getStatefulConnection()) {
StatefulRedisClusterConnection<byte[], byte[]> connection = null;
try {
connection = getStatefulConnection();
if (isAsync) {
RedisAdvancedClusterAsyncCommands<byte[], byte[]> async = connection.async();
Partitions parse = ClusterPartitionParser.parse(LettuceFutures.awaitOrCancel(async.clusterNodes(), timeout.getSeconds(), TimeUnit.SECONDS));
Expand Down Expand Up @@ -254,10 +280,22 @@ public Set<byte[]> keys(byte[] pattern) {
}
});
}
} finally {
returnObject(connection);
}
return keys;
}

@Override
public void close() throws Exception {
if (genericObjectPool != null) {
genericObjectPool.close();
}
if (redisClusterClient != null) {
redisClusterClient.shutdown();
}
}

public List<String> getNodes() {
return nodes;
}
Expand Down
37 changes: 33 additions & 4 deletions src/main/java/org/crazycake/shiro/LettuceRedisManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* @author Teamo
* @since 2022/05/18
*/
public class LettuceRedisManager extends AbstractLettuceRedisManager {
public class LettuceRedisManager extends AbstractLettuceRedisManager<StatefulRedisConnection<byte[], byte[]>> {

/**
* Redis server host.
Expand All @@ -33,14 +33,18 @@ public class LettuceRedisManager extends AbstractLettuceRedisManager {
*/
private volatile GenericObjectPool<StatefulRedisConnection<byte[], byte[]>> genericObjectPool;

@SuppressWarnings({"unchecked", "rawtypes"})
/**
* Redis client.
*/
private RedisClient redisClient;

private void initialize() {
if (genericObjectPool == null) {
synchronized (LettuceRedisManager.class) {
if (genericObjectPool == null) {
RedisClient redisClient = RedisClient.create(createRedisURI());
redisClient = RedisClient.create(createRedisURI());
redisClient.setOptions(getClientOptions());
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig();
GenericObjectPoolConfig<StatefulRedisConnection<byte[], byte[]>> genericObjectPoolConfig = getGenericObjectPoolConfig();
genericObjectPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connect(new ByteArrayCodec()), genericObjectPoolConfig);
}
}
Expand Down Expand Up @@ -72,6 +76,23 @@ protected StatefulRedisConnection<byte[], byte[]> getStatefulConnection() {
}
}

@Override
protected void returnObject(StatefulRedisConnection<byte[], byte[]> connect) {
if (connect != null) {
genericObjectPool.returnObject(connect);
}
}

@Override
public void close() throws Exception {
if (genericObjectPool != null) {
genericObjectPool.close();
}
if (redisClient != null) {
redisClient.shutdown();
}
}

public String getHost() {
return host;
}
Expand All @@ -87,4 +108,12 @@ public int getPort() {
public void setPort(int port) {
this.port = port;
}

public GenericObjectPool<StatefulRedisConnection<byte[], byte[]>> getGenericObjectPool() {
return genericObjectPool;
}

public void setGenericObjectPool(GenericObjectPool<StatefulRedisConnection<byte[], byte[]>> genericObjectPool) {
this.genericObjectPool = genericObjectPool;
}
}
45 changes: 38 additions & 7 deletions src/main/java/org/crazycake/shiro/LettuceRedisSentinelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* @author Teamo
* @since 2022/05/19
*/
public class LettuceRedisSentinelManager extends AbstractLettuceRedisManager {
public class LettuceRedisSentinelManager extends AbstractLettuceRedisManager<StatefulRedisMasterReplicaConnection<byte[], byte[]>> {
private static final String DEFAULT_MASTER_NAME = "mymaster";

private String masterName = DEFAULT_MASTER_NAME;
Expand All @@ -35,18 +35,24 @@ public class LettuceRedisSentinelManager extends AbstractLettuceRedisManager {
*/
private volatile GenericObjectPool<StatefulRedisMasterReplicaConnection<byte[], byte[]>> genericObjectPool;

@SuppressWarnings({"unchecked", "rawtypes"})
/**
* RedisClient.
*/
private RedisClient redisClient;

private void initialize() {
if (genericObjectPool == null) {
synchronized (LettuceRedisSentinelManager.class) {
if (genericObjectPool == null) {
RedisURI redisURI = this.createSentinelRedisURI();
RedisClient redisClient = RedisClient.create(redisURI);
redisClient = RedisClient.create();
redisClient.setOptions(getClientOptions());
StatefulRedisMasterReplicaConnection<byte[], byte[]> connect = MasterReplica.connect(redisClient, new ByteArrayCodec(), redisURI);
connect.setReadFrom(readFrom);
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig();
genericObjectPool = ConnectionPoolSupport.createGenericObjectPool(() -> connect, genericObjectPoolConfig);
GenericObjectPoolConfig<StatefulRedisMasterReplicaConnection<byte[], byte[]>> genericObjectPoolConfig = getGenericObjectPoolConfig();
genericObjectPool = ConnectionPoolSupport.createGenericObjectPool(() -> {
StatefulRedisMasterReplicaConnection<byte[], byte[]> connect = MasterReplica.connect(redisClient, new ByteArrayCodec(), redisURI);
connect.setReadFrom(readFrom);
return connect;
}, genericObjectPoolConfig);
}
}
}
Expand All @@ -64,6 +70,23 @@ protected StatefulRedisMasterReplicaConnection<byte[], byte[]> getStatefulConnec
}
}

@Override
protected void returnObject(StatefulRedisMasterReplicaConnection<byte[], byte[]> connect) {
if (connect != null) {
genericObjectPool.returnObject(connect);
}
}

@Override
public void close() throws Exception {
if (genericObjectPool != null) {
genericObjectPool.close();
}
if (redisClient != null) {
redisClient.shutdown();
}
}

private RedisURI createSentinelRedisURI() {
Objects.requireNonNull(nodes, "nodes must not be null!");

Expand Down Expand Up @@ -118,4 +141,12 @@ public ReadFrom getReadFrom() {
public void setReadFrom(ReadFrom readFrom) {
this.readFrom = readFrom;
}

public GenericObjectPool<StatefulRedisMasterReplicaConnection<byte[], byte[]>> getGenericObjectPool() {
return genericObjectPool;
}

public void setGenericObjectPool(GenericObjectPool<StatefulRedisMasterReplicaConnection<byte[], byte[]>> genericObjectPool) {
this.genericObjectPool = genericObjectPool;
}
}
Loading