Skip to content

Commit 06f6fca

Browse files
committed
[FLINK-15571] Add jedis connector base
1 parent 62e88dc commit 06f6fca

File tree

12 files changed

+1536
-0
lines changed

12 files changed

+1536
-0
lines changed

pom.xml

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
24+
<modelVersion>4.0.0</modelVersion>
25+
26+
<parent>
27+
<groupId>org.apache</groupId>
28+
<artifactId>apache</artifactId>
29+
<version>20</version>
30+
</parent>
31+
32+
<groupId>org.apache.flink</groupId>
33+
<artifactId>flink-connector-redis</artifactId>
34+
<version>1.0-SNAPSHOT</version>
35+
<packaging>jar</packaging>
36+
<name>Flink : Connectors : Redis</name>
37+
<url>https://flink.apache.org/</url>
38+
<licenses>
39+
<license>
40+
<name>Apache 2.0 License</name>
41+
<url>https://www.apache.org/licenses/LICENSE-2.0.html</url>
42+
<distribution>repo</distribution>
43+
</license>
44+
</licenses>
45+
46+
<properties>
47+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
48+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
49+
<java.version>1.8</java.version>
50+
<maven.compiler.source>${java.version}</maven.compiler.source>
51+
<maven.compiler.target>${java.version}</maven.compiler.target>
52+
53+
<flink.version>1.15.1</flink.version>
54+
<jedis.version>4.2.3</jedis.version>
55+
<testcontainers.version>1.17.3</testcontainers.version>
56+
57+
</properties>
58+
59+
<dependencyManagement>
60+
<dependencies>
61+
<dependency>
62+
<groupId>org.apache.flink</groupId>
63+
<artifactId>flink-connector-base</artifactId>
64+
<version>${flink.version}</version>
65+
</dependency>
66+
<dependency>
67+
<groupId>org.apache.flink</groupId>
68+
<artifactId>flink-streaming-java</artifactId>
69+
<version>${flink.version}</version>
70+
</dependency>
71+
<dependency>
72+
<groupId>org.apache.flink</groupId>
73+
<artifactId>flink-test-utils</artifactId>
74+
<version>${flink.version}</version>
75+
</dependency>
76+
<dependency>
77+
<groupId>org.junit.jupiter</groupId>
78+
<artifactId>junit-jupiter-api</artifactId>
79+
<version>5.8.2</version>
80+
<scope>test</scope>
81+
</dependency>
82+
<dependency>
83+
<groupId>org.junit.jupiter</groupId>
84+
<artifactId>junit-jupiter</artifactId>
85+
<version>5.8.2</version>
86+
<scope>test</scope>
87+
</dependency>
88+
<dependency>
89+
<groupId>org.testcontainers</groupId>
90+
<artifactId>testcontainers</artifactId>
91+
<version>${testcontainers.version}</version>
92+
<scope>test</scope>
93+
</dependency>
94+
<dependency>
95+
<groupId>org.testcontainers</groupId>
96+
<artifactId>junit-jupiter</artifactId>
97+
<version>${testcontainers.version}</version>
98+
<scope>test</scope>
99+
</dependency>
100+
<dependency>
101+
<groupId>redis.clients</groupId>
102+
<artifactId>jedis</artifactId>
103+
<version>${jedis.version}</version>
104+
</dependency>
105+
106+
</dependencies>
107+
</dependencyManagement>
108+
109+
<dependencies>
110+
<dependency>
111+
<groupId>org.apache.flink</groupId>
112+
<artifactId>flink-connector-base</artifactId>
113+
</dependency>
114+
<dependency>
115+
<groupId>org.apache.flink</groupId>
116+
<artifactId>flink-streaming-java</artifactId>
117+
</dependency>
118+
<dependency>
119+
<groupId>org.apache.flink</groupId>
120+
<artifactId>flink-test-utils</artifactId>
121+
</dependency>
122+
<dependency>
123+
<groupId>org.junit.jupiter</groupId>
124+
<artifactId>junit-jupiter-api</artifactId>
125+
</dependency>
126+
<dependency>
127+
<groupId>org.junit.jupiter</groupId>
128+
<artifactId>junit-jupiter</artifactId>
129+
</dependency>
130+
<dependency>
131+
<groupId>org.testcontainers</groupId>
132+
<artifactId>testcontainers</artifactId>
133+
</dependency>
134+
<dependency>
135+
<groupId>org.testcontainers</groupId>
136+
<artifactId>junit-jupiter</artifactId>
137+
</dependency>
138+
<dependency>
139+
<groupId>redis.clients</groupId>
140+
<artifactId>jedis</artifactId>
141+
</dependency>
142+
</dependencies>
143+
144+
</project>
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.redis.shared;
19+
20+
import org.apache.flink.connector.redis.sink2.RedisAction;
21+
import redis.clients.jedis.JedisCluster;
22+
import redis.clients.jedis.JedisPool;
23+
import redis.clients.jedis.JedisSentinelPool;
24+
import redis.clients.jedis.commands.JedisCommands;
25+
26+
public class JedisConnector implements AutoCloseable {
27+
28+
private transient JedisCluster jedisCluster;
29+
private transient JedisPool jedisPool;
30+
private transient JedisSentinelPool jedisSentinelPool;
31+
32+
public JedisConnector(JedisCluster jedisCluster) {
33+
this.jedisCluster = jedisCluster;
34+
}
35+
36+
public JedisConnector(JedisPool jedisPool) {
37+
this.jedisPool = jedisPool;
38+
}
39+
40+
public JedisConnector(JedisSentinelPool jedisSentinelPool) {
41+
this.jedisSentinelPool = jedisSentinelPool;
42+
}
43+
44+
public JedisCommands getJedisCommands() {
45+
if (jedisCluster != null) return jedisCluster;
46+
if (jedisPool != null) return jedisPool.getResource();
47+
if (jedisSentinelPool != null) return jedisSentinelPool.getResource();
48+
49+
throw new IllegalArgumentException("No redis connection found");
50+
51+
}
52+
53+
public void execute(RedisAction action) {
54+
switch (action.command) {
55+
case SET:
56+
getJedisCommands().set(action.key, action.value);
57+
break;
58+
default:
59+
throw new IllegalArgumentException("Cannot process such data type: " + action.command);
60+
}
61+
}
62+
63+
@Override
64+
public void close() {
65+
if (jedisCluster != null) jedisCluster.close();
66+
if (jedisPool != null) jedisPool.close();
67+
if (jedisSentinelPool != null) jedisSentinelPool.close();
68+
}
69+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.flink.connector.redis.shared;
18+
19+
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
20+
import org.apache.flink.connector.redis.shared.config.JedisClusterConfig;
21+
import org.apache.flink.connector.redis.shared.config.JedisConfig;
22+
import org.apache.flink.connector.redis.shared.config.JedisPoolConfig;
23+
import org.apache.flink.connector.redis.shared.config.JedisSentinelConfig;
24+
import redis.clients.jedis.JedisCluster;
25+
import redis.clients.jedis.JedisPool;
26+
import redis.clients.jedis.JedisSentinelPool;
27+
28+
import java.util.Objects;
29+
30+
/**
31+
* The builder for {@link JedisConnector}.
32+
*/
33+
public class JedisConnectorBuilder {
34+
35+
/**
36+
* Initialize the {@link JedisConnector} based on the instance type.
37+
* @param jedisConfig configuration base
38+
* @return @throws IllegalArgumentException if not valid configuration is provided
39+
*/
40+
public static JedisConnector build(JedisConfig jedisConfig){
41+
if(jedisConfig instanceof JedisPoolConfig){
42+
JedisPoolConfig jedisPoolConfig = (JedisPoolConfig) jedisConfig;
43+
return JedisConnectorBuilder.build(jedisPoolConfig);
44+
} else if (jedisConfig instanceof JedisClusterConfig) {
45+
JedisClusterConfig jedisClusterConfig = (JedisClusterConfig) jedisConfig;
46+
return JedisConnectorBuilder.build(jedisClusterConfig);
47+
} else if (jedisConfig instanceof JedisSentinelConfig) {
48+
JedisSentinelConfig jedisSentinelConfig = (JedisSentinelConfig) jedisConfig;
49+
return JedisConnectorBuilder.build(jedisSentinelConfig);
50+
} else {
51+
throw new IllegalArgumentException("Jedis configuration not found");
52+
}
53+
}
54+
55+
/**
56+
* Builds container for single Redis environment.
57+
*
58+
* @param jedisPoolConfig configuration for JedisPool
59+
* @return container for single Redis environment
60+
* @throws NullPointerException if jedisPoolConfig is null
61+
*/
62+
public static JedisConnector build(JedisPoolConfig jedisPoolConfig) {
63+
Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");
64+
65+
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
66+
67+
JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
68+
jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
69+
jedisPoolConfig.getDatabase());
70+
return new JedisConnector(jedisPool);
71+
}
72+
73+
/**
74+
* Builds container for Redis Cluster environment.
75+
*
76+
* @param jedisClusterConfig configuration for JedisCluster
77+
* @return container for Redis Cluster environment
78+
* @throws NullPointerException if jedisClusterConfig is null
79+
*/
80+
public static JedisConnector build(JedisClusterConfig jedisClusterConfig) {
81+
Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null");
82+
83+
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig);
84+
85+
JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
86+
jedisClusterConfig.getConnectionTimeout(),
87+
jedisClusterConfig.getConnectionTimeout(),
88+
jedisClusterConfig.getMaxRedirections(),
89+
jedisClusterConfig.getPassword(),
90+
genericObjectPoolConfig);
91+
return new JedisConnector(jedisCluster);
92+
}
93+
94+
/**
95+
* Builds container for Redis Sentinel environment.
96+
*
97+
* @param jedisSentinelConfig configuration for JedisSentinel
98+
* @return container for Redis sentinel environment
99+
* @throws NullPointerException if jedisSentinelConfig is null
100+
*/
101+
public static JedisConnector build(JedisSentinelConfig jedisSentinelConfig) {
102+
Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
103+
104+
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
105+
106+
JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
107+
jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
108+
jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
109+
jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
110+
return new JedisConnector(jedisSentinelPool);
111+
}
112+
113+
public static GenericObjectPoolConfig getGenericObjectPoolConfig(JedisConfig jedisConfig) {
114+
GenericObjectPoolConfig genericObjectPoolConfig = jedisConfig.getTestWhileIdle()
115+
? new redis.clients.jedis.JedisPoolConfig()
116+
: new GenericObjectPoolConfig<>();
117+
genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
118+
genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
119+
genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
120+
genericObjectPoolConfig.setTestOnBorrow(jedisConfig.getTestOnBorrow());
121+
genericObjectPoolConfig.setTestOnReturn(jedisConfig.getTestOnReturn());
122+
123+
return genericObjectPoolConfig;
124+
}
125+
}

0 commit comments

Comments
 (0)