Skip to content

Commit c4c7bb9

Browse files
kimwanyoungartembilan
authored andcommitted
GH-10083: Apply Nullability to redis module
Related to: 10083 Apply maintainer feedback: - Use `@SuppressWarnings("NullAway.Init")` for `onInit()`-initialized fields - Remove unnecessary null checks and `@SuppressWarnings("NullAway")` - Move `@Nullable` to return type position - Make `connectionFactory` `final` with proper null assertions - Clean up imports and apply diamond operator - Remove unnecessary `@Nullable` annotations and related null-safety code in `SubscribableRedisChannel` - Format `@Nullable` annotations inline in code - Remove unused imports - Add comments explaining NullAway suppression reasons - Complete JSpecify nullability migration for spring-integration-redis module - Added `@Nullable` annotations to optional fields (`taskExecutor`, `serializer`, `stopCallback`, etc.) - Applied `@SuppressWarnings("NullAway.Init")` to dependency-injected fields - Enhanced null safety checks for `@Nullable` field usage - Fixed NullAway compilation errors in inbound package - Remove `@SuppressWarnings("NullAway")` and add null-safety checks in `ReactiveRedisStreamMessageProducer` * Replaced all usages of `org.springframework.lang.Nullable` with `org.jspecify.annotations.Nullable` in the `spring-integration-redis` module. * Added `@NullMarked` to package-info.java files across all `spring-integration-redis` subpackages. * Ensured compilation passes with JSpecify nullability annotations applied. Signed-off-by: Kim Wanyoung <[email protected]>
1 parent ebec0da commit c4c7bb9

26 files changed

+167
-86
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.util.concurrent.Executor;
2020

21+
import org.jspecify.annotations.Nullable;
22+
2123
import org.springframework.beans.factory.BeanFactory;
2224
import org.springframework.beans.factory.BeanFactoryAware;
2325
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@@ -67,15 +69,16 @@ public class SubscribableRedisChannel extends AbstractMessageChannel
6769

6870
private final String topicName;
6971

70-
private Executor taskExecutor;
72+
private @Nullable Executor taskExecutor;
7173

74+
@SuppressWarnings("NullAway.Init")
7275
private final BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(true);
7376

7477
private RedisSerializer<?> serializer = new StringRedisSerializer();
7578

7679
private MessageConverter messageConverter = new SimpleMessageConverter();
7780

78-
private volatile Integer maxSubscribers;
81+
private volatile @Nullable Integer maxSubscribers;
7982

8083
private volatile boolean initialized;
8184

@@ -218,6 +221,12 @@ private class MessageListenerDelegate {
218221
@SuppressWarnings({"unused"})
219222
public void handleMessage(Object payload) {
220223
Message<?> siMessage = SubscribableRedisChannel.this.messageConverter.toMessage(payload, null);
224+
if (siMessage == null) {
225+
if (logger.isDebugEnabled()) {
226+
logger.debug("MessageConverter returned null for payload: " + payload);
227+
}
228+
return;
229+
}
221230
try {
222231
SubscribableRedisChannel.this.dispatcher.dispatch(siMessage);
223232
}
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides classes related to Redis-backed channels.
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.integration.redis.channel;
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides classes for configuration - parsers, namespace handlers.
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.integration.redis.config;
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Events generated by the redis module
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.integration.redis.event;

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducer.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import org.springframework.integration.endpoint.MessageProducerSupport;
4040
import org.springframework.integration.redis.support.RedisHeaders;
4141
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
42-
import org.springframework.lang.Nullable;
42+
import org.jspecify.annotations.Nullable;
4343
import org.springframework.messaging.Message;
4444
import org.springframework.messaging.MessagingException;
4545
import org.springframework.messaging.converter.MessageConversionException;
@@ -59,6 +59,7 @@
5959
*
6060
* @since 5.4
6161
*/
62+
6263
public class ReactiveRedisStreamMessageProducer extends MessageProducerSupport {
6364

6465
private final ReactiveRedisConnectionFactory reactiveConnectionFactory;
@@ -71,10 +72,12 @@ public class ReactiveRedisStreamMessageProducer extends MessageProducerSupport {
7172
.pollTimeout(Duration.ZERO)
7273
.onErrorResume(this::handleReceiverError);
7374

75+
@SuppressWarnings("NullAway.Init")
7476
private ReactiveStreamOperations<String, ?, ?> reactiveStreamOperations;
7577

76-
private StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions;
78+
private StreamReceiver.@Nullable StreamReceiverOptions<String, ?> streamReceiverOptions;
7779

80+
@SuppressWarnings("NullAway.Init")
7881
private StreamReceiver<String, ?> streamReceiver;
7982

8083
private ReadOffset readOffset = ReadOffset.latest();
@@ -83,11 +86,10 @@ public class ReactiveRedisStreamMessageProducer extends MessageProducerSupport {
8386

8487
private boolean autoAck = true;
8588

86-
@Nullable
89+
@SuppressWarnings("NullAway.Init")
8790
private String consumerGroup;
8891

89-
@Nullable
90-
private String consumerName;
92+
private @Nullable String consumerName;
9193

9294
private boolean createConsumerGroup;
9395

@@ -165,7 +167,7 @@ public void setCreateConsumerGroup(boolean createConsumerGroup) {
165167
* @param streamReceiverOptions the desired receiver options
166168
* */
167169
public void setStreamReceiverOptions(
168-
@Nullable StreamReceiver.StreamReceiverOptions<String, ?> streamReceiverOptions) {
170+
StreamReceiver.@Nullable StreamReceiverOptions<String, ?> streamReceiverOptions) {
169171

170172
Assert.isTrue(!this.receiverBuilderOptionSet,
171173
"The 'streamReceiverOptions' is mutually exclusive with 'pollTimeout', 'batchSize', " +
@@ -294,23 +296,21 @@ protected void doStart() {
294296
if (this.createConsumerGroup) {
295297
consumerGroupMono =
296298
this.reactiveStreamOperations.createGroup(this.streamKey, this.consumerGroup) // NOSONAR
297-
.onErrorReturn(this.consumerGroup);
299+
.onErrorReturn(this.consumerGroup);
298300
}
299301

300-
Consumer consumer = Consumer.from(this.consumerGroup, this.consumerName); // NOSONAR
302+
Consumer consumer = Consumer.from(this.consumerGroup, this.consumerName);
301303

302304
if (offset.getOffset().equals(ReadOffset.latest())) {
303305
// for consumer group offset id should be equal to '>'
304306
offset = StreamOffset.create(this.streamKey, ReadOffset.lastConsumed());
305307
}
306308

307-
events =
308-
this.autoAck
309-
? this.streamReceiver.receiveAutoAck(consumer, offset)
310-
: this.streamReceiver.receive(consumer, offset);
309+
events = this.autoAck
310+
? this.streamReceiver.receiveAutoAck(consumer, offset)
311+
: this.streamReceiver.receive(consumer, offset);
311312

312313
events = consumerGroupMono.thenMany(events);
313-
314314
}
315315

316316
Flux<? extends Message<?>> messageFlux =
@@ -347,9 +347,9 @@ private <T> Publisher<T> handleReceiverError(Throwable error) {
347347
failedMessage = buildMessageFromRecord(record, false);
348348
}
349349
}
350-
MessagingException conversionException =
351-
new MessageConversionException(failedMessage, // NOSONAR
352-
"Cannot deserialize Redis Stream Record", error);
350+
MessagingException conversionException = (failedMessage != null)
351+
? new MessageConversionException(failedMessage, "Cannot deserialize Redis Stream Record", error)
352+
: new MessageConversionException("Cannot deserialize Redis Stream Record", error);
353353
if (!sendErrorMessageIfNecessary(null, conversionException)) {
354354
logger.getLog().error(conversionException);
355355
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisInboundChannelAdapter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.List;
2323
import java.util.concurrent.Executor;
2424

25+
import org.jspecify.annotations.Nullable;
26+
2527
import org.springframework.beans.factory.BeanFactoryAware;
2628
import org.springframework.data.redis.connection.RedisConnectionFactory;
2729
import org.springframework.data.redis.listener.ChannelTopic;
@@ -55,8 +57,10 @@ public class RedisInboundChannelAdapter extends MessageProducerSupport {
5557

5658
private volatile MessageConverter messageConverter = new SimpleMessageConverter();
5759

60+
@SuppressWarnings("NullAway.Init")
5861
private volatile String[] topics;
5962

63+
@SuppressWarnings("NullAway.Init")
6064
private volatile String[] topicPatterns;
6165

6266
private volatile RedisSerializer<?> serializer = new StringRedisSerializer();
@@ -150,7 +154,7 @@ protected void doStop() {
150154
this.container.stop();
151155
}
152156

153-
private Message<?> convertMessage(Object object, String source) {
157+
private @Nullable Message<?> convertMessage(Object object, String source) {
154158
MessageHeaders messageHeaders = null;
155159
if (StringUtils.hasText(source)) {
156160
messageHeaders = new MessageHeaders(Collections.singletonMap(RedisHeaders.MESSAGE_SOURCE, source));

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.springframework.jmx.export.annotation.ManagedMetric;
3838
import org.springframework.jmx.export.annotation.ManagedOperation;
3939
import org.springframework.jmx.export.annotation.ManagedResource;
40-
import org.springframework.lang.Nullable;
40+
import org.jspecify.annotations.Nullable;
4141
import org.springframework.messaging.Message;
4242
import org.springframework.messaging.MessagingException;
4343
import org.springframework.scheduling.SchedulingAwareRunnable;
@@ -66,13 +66,15 @@ public class RedisQueueInboundGateway extends MessagingGatewaySupport
6666

6767
private final BoundListOperations<String, byte[]> boundListOperations;
6868

69+
@SuppressWarnings("NullAway.Init")
6970
private ApplicationEventPublisher applicationEventPublisher;
7071

7172
private boolean serializerExplicitlySet;
7273

74+
@SuppressWarnings("NullAway.Init")
7375
private Executor taskExecutor;
7476

75-
private RedisSerializer<?> serializer;
77+
private @Nullable RedisSerializer<?> serializer;
7678

7779
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
7880

@@ -82,7 +84,7 @@ public class RedisQueueInboundGateway extends MessagingGatewaySupport
8284

8385
private volatile boolean listening;
8486

85-
private volatile Runnable stopCallback;
87+
private volatile @Nullable Runnable stopCallback;
8688

8789
/**
8890
* @param queueName Must not be an empty String
@@ -155,11 +157,14 @@ protected void onInit() {
155157
this.taskExecutor = new SimpleAsyncTaskExecutor((beanName == null ? "" : beanName + "-")
156158
+ getComponentType());
157159
}
158-
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && getBeanFactory() != null) {
160+
Executor executor = this.taskExecutor;
161+
if (!(executor instanceof ErrorHandlingTaskExecutor) && getBeanFactory() != null) {
159162
MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler();
160163
errorHandler.setBeanFactory(getBeanFactory());
161-
errorHandler.setDefaultErrorChannel(getErrorChannel());
162-
this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler);
164+
if (getErrorChannel() != null) {
165+
errorHandler.setDefaultErrorChannel(getErrorChannel());
166+
}
167+
this.taskExecutor = new ErrorHandlingTaskExecutor(executor, errorHandler);
163168
}
164169
}
165170

@@ -230,7 +235,7 @@ private void getRequestSendAndProduceReply(byte[] value, String uuid) {
230235
if (requestMessage != null) {
231236
Message<?> replyMessage = sendAndReceiveMessage(requestMessage);
232237
if (replyMessage != null) {
233-
byte[] replyPayload = null;
238+
@Nullable byte[] replyPayload = null;
234239
if (this.extractPayload) {
235240
replyPayload = extractReplyPayload(replyMessage);
236241
}
@@ -249,7 +254,7 @@ private void getRequestSendAndProduceReply(byte[] value, String uuid) {
249254
@Nullable
250255
@SuppressWarnings("unchecked")
251256
private Message<Object> prepareRequestMessage(byte[] value) {
252-
Message<Object> requestMessage;
257+
Message<Object> requestMessage = null;
253258
if (this.extractPayload) {
254259
Object payload = value;
255260
if (this.serializer != null) {
@@ -262,9 +267,11 @@ private Message<Object> prepareRequestMessage(byte[] value) {
262267
}
263268
else {
264269
try {
265-
requestMessage = (Message<Object>) this.serializer.deserialize(value);
266-
if (requestMessage == null) {
267-
return null;
270+
if (this.serializer != null) {
271+
requestMessage = (Message<Object>) this.serializer.deserialize(value);
272+
if (requestMessage == null) {
273+
return null;
274+
}
268275
}
269276
}
270277
catch (Exception e) {
@@ -275,14 +282,17 @@ private Message<Object> prepareRequestMessage(byte[] value) {
275282
}
276283

277284
@SuppressWarnings("unchecked")
278-
private byte[] extractReplyPayload(Message<?> replyMessage) {
279-
byte[] value;
285+
private @Nullable byte[] extractReplyPayload(Message<?> replyMessage) {
286+
byte[] value = null;
280287
if (!(replyMessage.getPayload() instanceof byte[])) {
281288
if (replyMessage.getPayload() instanceof String && !this.serializerExplicitlySet) {
282289
value = StringRedisSerializer.UTF_8.serialize((String) replyMessage.getPayload());
283290
}
284291
else {
285-
value = ((RedisSerializer<Object>) this.serializer).serialize(replyMessage.getPayload());
292+
RedisSerializer<?> serializer = this.serializer;
293+
if (serializer != null) {
294+
value = ((RedisSerializer<Object>) serializer).serialize(replyMessage.getPayload());
295+
}
286296
}
287297
}
288298
else {
@@ -384,9 +394,12 @@ public void run() {
384394
if (isActive()) {
385395
restart();
386396
}
387-
else if (RedisQueueInboundGateway.this.stopCallback != null) {
388-
RedisQueueInboundGateway.this.stopCallback.run();
389-
RedisQueueInboundGateway.this.stopCallback = null;
397+
else {
398+
Runnable callback = RedisQueueInboundGateway.this.stopCallback;
399+
if (callback != null) {
400+
callback.run();
401+
RedisQueueInboundGateway.this.stopCallback = null;
402+
}
390403
}
391404
}
392405
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.Executor;
2121
import java.util.concurrent.TimeUnit;
2222

23+
import org.jspecify.annotations.Nullable;
2324
import org.springframework.beans.factory.BeanClassLoaderAware;
2425
import org.springframework.beans.factory.BeanFactory;
2526
import org.springframework.context.ApplicationEventPublisher;
@@ -66,11 +67,13 @@ public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport
6667

6768
private final BoundListOperations<String, byte[]> boundListOperations;
6869

70+
@SuppressWarnings("NullAway.Init")
6971
private ApplicationEventPublisher applicationEventPublisher;
7072

73+
@SuppressWarnings("NullAway.Init")
7174
private Executor taskExecutor;
7275

73-
private RedisSerializer<?> serializer;
76+
private @Nullable RedisSerializer<?> serializer;
7477

7578
private boolean serializerExplicitlySet;
7679

@@ -84,7 +87,7 @@ public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport
8487

8588
private volatile boolean listening;
8689

87-
private volatile Runnable stopCallback;
90+
private volatile @Nullable Runnable stopCallback;
8891

8992
/**
9093
* @param queueName Must not be an empty String
@@ -175,10 +178,12 @@ protected void onInit() {
175178
+ getComponentType());
176179
}
177180
BeanFactory beanFactory = getBeanFactory();
178-
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && beanFactory != null) {
181+
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) {
179182
MessagePublishingErrorHandler errorHandler =
180183
new MessagePublishingErrorHandler(ChannelResolverUtils.getChannelResolver(beanFactory));
181-
errorHandler.setDefaultErrorChannel(getErrorChannel());
184+
if (getErrorChannel() != null) {
185+
errorHandler.setDefaultErrorChannel(getErrorChannel());
186+
}
182187
this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler);
183188
}
184189
}
@@ -190,14 +195,16 @@ public String getComponentType() {
190195

191196
@SuppressWarnings("unchecked")
192197
private void popMessageAndSend() {
193-
byte[] value = popForValue();
198+
@Nullable byte[] value = popForValue();
194199

195200
Message<Object> message = null;
196201

197202
if (value != null) {
198203
if (this.expectMessage) {
199204
try {
200-
message = (Message<Object>) this.serializer.deserialize(value);
205+
if (this.serializer != null) {
206+
message = (Message<Object>) serializer.deserialize(value);
207+
}
201208
}
202209
catch (Exception e) {
203210
throw new MessagingException("Deserialization of Message failed.", e);
@@ -229,8 +236,8 @@ private void popMessageAndSend() {
229236
}
230237
}
231238

232-
private byte[] popForValue() {
233-
byte[] value = null;
239+
private @Nullable byte[] popForValue() {
240+
@Nullable byte[] value = null;
234241
try {
235242
if (this.rightPop) {
236243
value = this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
@@ -347,9 +354,12 @@ public void run() {
347354
if (isActive()) {
348355
restart();
349356
}
350-
else if (RedisQueueMessageDrivenEndpoint.this.stopCallback != null) {
351-
RedisQueueMessageDrivenEndpoint.this.stopCallback.run();
352-
RedisQueueMessageDrivenEndpoint.this.stopCallback = null;
357+
else {
358+
Runnable callback = RedisQueueMessageDrivenEndpoint.this.stopCallback;
359+
if (callback != null) {
360+
callback.run();
361+
RedisQueueMessageDrivenEndpoint.this.stopCallback = null;
362+
}
353363
}
354364
}
355365
}

0 commit comments

Comments
 (0)