Skip to content

Commit 1ddd501

Browse files
committed
[FLINK-15571] Change Redis Sink to AsyncSink implementation
1 parent 9934c61 commit 1ddd501

File tree

9 files changed

+267
-86
lines changed

9 files changed

+267
-86
lines changed
Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,22 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.flink.connector.redis.streams.sink.command;
17+
package org.apache.flink.connector.redis.streams.sink;
1818

1919

2020
import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector;
2121
import redis.clients.jedis.StreamEntryID;
2222

23+
import java.io.Serializable;
2324
import java.util.Map;
2425

25-
public class StreamRedisCommand implements RedisCommand {
26+
public class RedisStreamsCommand implements Serializable {
27+
28+
// private transient StreamEntryID streamId = null;
2629
public final String key;
2730
public final Map<String, String> value;
2831

29-
private StreamRedisCommand(String key, Map<String, String> value) {
32+
private RedisStreamsCommand(String key, Map<String, String> value) {
3033
this.key = key;
3134
this.value = value;
3235
}
@@ -35,9 +38,27 @@ public static Builder builder() {
3538
return new Builder();
3639
}
3740

38-
@Override
3941
public void send(JedisConnector connector) {
40-
connector.getJedisCommands().xadd(key, StreamEntryID.NEW_ENTRY, value);
42+
// this.streamId = connector.getJedisCommands()
43+
// .xadd(key, (this.streamId != null) ? this.streamId : StreamEntryID.NEW_ENTRY, value);
44+
connector.getJedisCommands()
45+
.xadd(key, StreamEntryID.NEW_ENTRY, value);
46+
}
47+
48+
public boolean sendCorrectly() {
49+
// return streamId != null;
50+
return true;
51+
}
52+
53+
public boolean sendIncorrectly() {
54+
return !sendCorrectly();
55+
}
56+
57+
public long getMessageSize() {
58+
return value.entrySet().stream()
59+
.map(k -> k.getKey().length()+k.getValue().length())
60+
.reduce(Integer::sum)
61+
.orElse(0);
4162
}
4263

4364
public static class Builder {
@@ -54,8 +75,8 @@ public Builder withValue(Map<String, String> value) {
5475
return this;
5576
}
5677

57-
public StreamRedisCommand build() {
58-
return new StreamRedisCommand(key, value);
78+
public RedisStreamsCommand build() {
79+
return new RedisStreamsCommand(key, value);
5980
}
6081
}
6182
}

flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisSerializer.java renamed to flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommandSerializer.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,13 @@
1616
*/
1717
package org.apache.flink.connector.redis.streams.sink;
1818

19-
import org.apache.flink.api.common.functions.Function;
20-
import org.apache.flink.connector.redis.streams.sink.command.RedisCommand;
21-
22-
import java.io.Serializable;
19+
import org.apache.flink.connector.base.sink.writer.ElementConverter;
2320

2421
/**
2522
* Function that creates the description how the input data should be mapped to redis type.
2623
*
2724
* @param <T> The type of the element handled by this {@code RedisSerializer}
2825
*/
29-
public interface RedisSerializer<T> extends Function, Serializable {
26+
public interface RedisStreamsCommandSerializer<T> extends ElementConverter<T, RedisStreamsCommand> {
3027

31-
RedisCommand getMessage(T input);
3228
}

flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,61 @@
1616
*/
1717
package org.apache.flink.connector.redis.streams.sink;
1818

19-
import org.apache.flink.api.connector.sink2.Sink;
20-
import org.apache.flink.api.connector.sink2.SinkWriter;
19+
import org.apache.flink.connector.base.sink.AsyncSinkBase;
20+
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
21+
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
2122
import org.apache.flink.connector.redis.streams.sink.config.JedisConfig;
2223
import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector;
2324
import org.apache.flink.connector.redis.streams.sink.connection.JedisConnectorBuilder;
25+
import org.apache.flink.core.io.SimpleVersionedSerializer;
2426

2527
import java.io.IOException;
28+
import java.util.Collection;
29+
import java.util.Collections;
2630

27-
public class RedisStreamsSink<T> implements Sink<T> {
31+
public class RedisStreamsSink<T> extends AsyncSinkBase<T, RedisStreamsCommand> {
2832

2933
private final JedisConfig jedisConfig;
30-
private final RedisSerializer<T> serializer;
3134

32-
public RedisStreamsSink(JedisConfig jedisConfig, RedisSerializer<T> serializer) {
35+
public RedisStreamsSink(JedisConfig jedisConfig,
36+
RedisStreamsCommandSerializer<T> converter,
37+
AsyncSinkWriterConfiguration asyncConfig) {
38+
super(converter,
39+
asyncConfig.getMaxBatchSize(),
40+
asyncConfig.getMaxInFlightRequests(),
41+
asyncConfig.getMaxBufferedRequests(),
42+
asyncConfig.getMaxBatchSizeInBytes(),
43+
asyncConfig.getMaxTimeInBufferMS(),
44+
asyncConfig.getMaxRecordSizeInBytes());
3345
this.jedisConfig = jedisConfig;
34-
this.serializer = serializer;
3546
}
3647

3748
@Override
38-
public SinkWriter<T> createWriter(InitContext initContext) throws IOException {
49+
public RedisStreamsWriter<T> createWriter(InitContext initContext) throws IOException {
50+
return restoreWriter(initContext, Collections.emptyList());
51+
}
52+
53+
@Override
54+
public RedisStreamsWriter<T> restoreWriter(InitContext initContext, Collection<BufferedRequestState<RedisStreamsCommand>> recoveredState) throws IOException {
55+
AsyncSinkWriterConfiguration asyncConfig = AsyncSinkWriterConfiguration.builder()
56+
.setMaxBatchSize(getMaxBatchSize())
57+
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
58+
.setMaxInFlightRequests(getMaxInFlightRequests())
59+
.setMaxBufferedRequests(getMaxBufferedRequests())
60+
.setMaxTimeInBufferMS(getMaxTimeInBufferMS())
61+
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
62+
.build();
3963
JedisConnector connection = JedisConnectorBuilder.build(jedisConfig);
40-
return new RedisStreamsWriter<>(connection, this.serializer);
64+
return new RedisStreamsWriter<>(connection,
65+
getElementConverter(),
66+
asyncConfig,
67+
initContext,
68+
recoveredState);
69+
}
70+
71+
@Override
72+
public SimpleVersionedSerializer<BufferedRequestState<RedisStreamsCommand>> getWriterStateSerializer() {
73+
return new RedisStreamsStateSerializer();
4174
}
4275

4376
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package org.apache.flink.connector.redis.streams.sink;
2+
3+
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
4+
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
5+
import org.apache.flink.core.io.SimpleVersionedSerializer;
6+
7+
import java.io.*;
8+
import java.util.*;
9+
10+
public class RedisStreamsStateSerializer implements SimpleVersionedSerializer<BufferedRequestState<RedisStreamsCommand>> {
11+
12+
@Override
13+
public int getVersion() {
14+
return 1;
15+
}
16+
17+
@Override
18+
public byte[] serialize(BufferedRequestState<RedisStreamsCommand> obj) throws IOException {
19+
Collection<RequestEntryWrapper<RedisStreamsCommand>> bufferState =
20+
obj.getBufferedRequestEntries();
21+
22+
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
23+
final DataOutputStream out = new DataOutputStream(baos)) {
24+
25+
out.writeInt(getVersion());
26+
out.writeInt(bufferState.size());
27+
28+
for (RequestEntryWrapper<RedisStreamsCommand> wrapper : bufferState) {
29+
RedisStreamsCommand command = wrapper.getRequestEntry();
30+
writeString(out, command.key);
31+
out.writeInt(command.value.size());
32+
for (Map.Entry<String,String> entry : command.value.entrySet()) {
33+
writeString(out, entry.getKey());
34+
writeString(out, entry.getValue());
35+
}
36+
}
37+
38+
out.flush();
39+
return baos.toByteArray();
40+
}
41+
}
42+
43+
@Override
44+
public BufferedRequestState<RedisStreamsCommand> deserialize(int version, byte[] serialized) throws IOException {
45+
try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
46+
final DataInputStream in = new DataInputStream(bais)) {
47+
48+
int byteVersion = in.readInt();
49+
50+
int bufferSize = in.readInt();
51+
List<RequestEntryWrapper<RedisStreamsCommand>> state = new ArrayList<>();
52+
for (int bs = 0; bs < bufferSize; bs++) {
53+
String key = readString(in);
54+
55+
int valueSize = in.readInt();
56+
Map<String,String> values = new HashMap<>();
57+
for (int i = 0; i < valueSize; i++) {
58+
String eKey = readString(in);
59+
String eValue = readString(in);
60+
values.put(eKey, eValue);
61+
}
62+
63+
RedisStreamsCommand command = RedisStreamsCommand.builder()
64+
.withKey(key)
65+
.withValue(values)
66+
.build();
67+
68+
state.add(new RequestEntryWrapper<>(command, command.getMessageSize()));
69+
}
70+
return new BufferedRequestState<>(state);
71+
}
72+
}
73+
74+
private void writeString(final DataOutputStream out, String value) throws IOException {
75+
out.writeInt(value.length());
76+
out.writeBytes(value);
77+
}
78+
private String readString(final DataInputStream in) throws IOException {
79+
int sizeToRead = in.readInt();
80+
byte[] bytesRead = new byte[sizeToRead];
81+
int sizeRead = in.read(bytesRead);
82+
if (sizeToRead == sizeRead) return new String(bytesRead);
83+
else throw new IOException(String.format("Expected to read %s but read %s", sizeToRead, sizeRead));
84+
}
85+
86+
}

flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,46 +16,44 @@
1616
*/
1717
package org.apache.flink.connector.redis.streams.sink;
1818

19-
import org.apache.flink.api.connector.sink2.SinkWriter;
20-
import org.apache.flink.connector.redis.streams.sink.command.RedisCommand;
19+
import org.apache.flink.api.connector.sink2.Sink;
20+
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
21+
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
22+
import org.apache.flink.connector.base.sink.writer.ElementConverter;
23+
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
2124
import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector;
2225

23-
import java.io.IOException;
24-
import java.util.ArrayDeque;
25-
import java.util.Queue;
26+
import java.util.Collection;
27+
import java.util.List;
28+
import java.util.function.Consumer;
29+
import java.util.stream.Collectors;
2630

27-
public class RedisStreamsWriter<T> implements SinkWriter<T> {
31+
class RedisStreamsWriter<T> extends AsyncSinkWriter<T, RedisStreamsCommand> {
2832

2933
private final JedisConnector jedisConnector;
30-
private final RedisSerializer<T> serializer;
31-
private final Queue<RedisCommand> queue = new ArrayDeque<>();
3234

33-
public RedisStreamsWriter(JedisConnector jedisConnector, RedisSerializer<T> serializer) {
35+
public RedisStreamsWriter(JedisConnector jedisConnector,
36+
ElementConverter<T, RedisStreamsCommand> elementConverter,
37+
AsyncSinkWriterConfiguration asyncConfig,
38+
Sink.InitContext initContext,
39+
Collection<BufferedRequestState<RedisStreamsCommand>> recoveredState) {
40+
super(elementConverter, initContext, asyncConfig, recoveredState);
3441
this.jedisConnector = jedisConnector;
35-
this.serializer = serializer;
36-
}
37-
38-
39-
@Override
40-
public void write(T input, Context context) throws IOException, InterruptedException {
41-
RedisCommand message = serializer.getMessage(input);
42-
queue.add(message);
4342
}
4443

4544
@Override
46-
public void flush(boolean endOfInput) throws IOException, InterruptedException {
47-
flush();
48-
}
45+
protected void submitRequestEntries(List<RedisStreamsCommand> requestEntries, Consumer<List<RedisStreamsCommand>> requestResult) {
46+
List<RedisStreamsCommand> errors = requestEntries.stream()
47+
.peek(command -> command.send(this.jedisConnector))
48+
.filter(RedisStreamsCommand::sendIncorrectly)
49+
.collect(Collectors.toList());
4950

50-
private void flush() {
51-
while(!this.queue.isEmpty()) {
52-
RedisCommand element = this.queue.remove();
53-
element.send(this.jedisConnector);
54-
}
51+
requestResult.accept(errors);
5552
}
5653

5754
@Override
58-
public void close() throws Exception {
59-
jedisConnector.close();
55+
protected long getSizeInBytes(RedisStreamsCommand command) {
56+
return command.getMessageSize();
6057
}
58+
6159
}

flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/command/RedisCommand.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnector.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import redis.clients.jedis.JedisSentinelPool;
2323
import redis.clients.jedis.commands.JedisCommands;
2424

25-
public class JedisConnector implements AutoCloseable {
25+
import java.io.Serializable;
26+
27+
public class JedisConnector implements AutoCloseable, Serializable {
2628

2729
private transient JedisCluster jedisCluster;
2830
private transient JedisPool jedisPool;

0 commit comments

Comments
 (0)