Skip to content

Commit 1e66b6b

Browse files
committed
Redis Sonar fixes
1 parent 12bc24d commit 1e66b6b

File tree

4 files changed

+54
-44
lines changed

4 files changed

+54
-44
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ public class SubscribableRedisChannel extends AbstractMessageChannel
7373
private volatile boolean initialized;
7474

7575
// defaults
76-
private volatile Executor taskExecutor = new SimpleAsyncTaskExecutor();
76+
private Executor taskExecutor = new SimpleAsyncTaskExecutor();
7777

78-
private volatile RedisSerializer<?> serializer = new StringRedisSerializer();
78+
private RedisSerializer<?> serializer = new StringRedisSerializer();
7979

80-
private volatile MessageConverter messageConverter = new SimpleMessageConverter();
80+
private MessageConverter messageConverter = new SimpleMessageConverter();
8181

8282
public SubscribableRedisChannel(RedisConnectionFactory connectionFactory, String topicName) {
8383
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
@@ -125,7 +125,8 @@ public boolean unsubscribe(MessageHandler handler) {
125125

126126
@Override
127127
protected boolean doSend(Message<?> message, long arg1) {
128-
this.redisTemplate.convertAndSend(this.topicName, this.messageConverter.fromMessage(message, Object.class));
128+
Object value = this.messageConverter.fromMessage(message, Object.class);
129+
this.redisTemplate.convertAndSend(this.topicName, value); // NOSONAR - null can be sent
129130
return true;
130131
}
131132

@@ -136,9 +137,8 @@ public void onInit() throws Exception {
136137
}
137138
super.onInit();
138139
if (this.maxSubscribers == null) {
139-
Integer maxSubscribers =
140-
getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class);
141-
this.setMaxSubscribers(maxSubscribers);
140+
setMaxSubscribers(
141+
getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class));
142142
}
143143
if (this.messageConverter == null) {
144144
this.messageConverter = new SimpleMessageConverter();
@@ -214,8 +214,10 @@ public void handleMessage(Object payload) {
214214
SubscribableRedisChannel.this.dispatcher.dispatch(siMessage);
215215
}
216216
catch (MessageDispatchingException e) {
217-
String topicName = SubscribableRedisChannel.this.topicName;
218-
topicName = StringUtils.hasText(topicName) ? topicName : "unknown";
217+
String topicName =
218+
StringUtils.hasText(SubscribableRedisChannel.this.topicName)
219+
? SubscribableRedisChannel.this.topicName
220+
: "unknown";
219221
throw new MessageDeliveryException(siMessage, e.getMessage()
220222
+ " for redis-channel '"
221223
+ topicName

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.redis.inbound;
1818

19+
import java.util.Optional;
1920
import java.util.concurrent.Executor;
2021
import java.util.concurrent.TimeUnit;
2122

@@ -187,30 +188,9 @@ public String getComponentType() {
187188

188189
@SuppressWarnings("unchecked")
189190
private void popMessageAndSend() {
190-
Message<Object> message = null;
191+
byte[] value = popForValue();
191192

192-
byte[] value = null;
193-
try {
194-
if (this.rightPop) {
195-
value = this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
196-
}
197-
else {
198-
value = this.boundListOperations.leftPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
199-
}
200-
}
201-
catch (Exception e) {
202-
this.listening = false;
203-
if (this.active) {
204-
logger.error("Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval
205-
+ " milliseconds.", e);
206-
this.publishException(e);
207-
this.sleepBeforeRecoveryAttempt();
208-
}
209-
else {
210-
logger.debug("Failed to execute listening task. " + e.getClass() + ": " + e.getMessage());
211-
}
212-
return;
213-
}
193+
Message<Object> message = null;
214194

215195
if (value != null) {
216196
if (this.expectMessage) {
@@ -226,7 +206,9 @@ private void popMessageAndSend() {
226206
if (this.serializer != null) {
227207
payload = this.serializer.deserialize(value);
228208
}
229-
message = this.getMessageBuilderFactory().withPayload(payload).build();
209+
if (payload != null) {
210+
message = getMessageBuilderFactory().withPayload(payload).build();
211+
}
230212
}
231213
}
232214

@@ -245,6 +227,31 @@ private void popMessageAndSend() {
245227
}
246228
}
247229

230+
private byte[] popForValue() {
231+
byte[] value = null;
232+
try {
233+
if (this.rightPop) {
234+
value = this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
235+
}
236+
else {
237+
value = this.boundListOperations.leftPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
238+
}
239+
}
240+
catch (Exception e) {
241+
this.listening = false;
242+
if (this.active) {
243+
logger.error("Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval
244+
+ " milliseconds.", e);
245+
publishException(e);
246+
sleepBeforeRecoveryAttempt();
247+
}
248+
else {
249+
logger.debug("Failed to execute listening task. " + e.getClass() + ": " + e.getMessage());
250+
}
251+
}
252+
return value;
253+
}
254+
248255
@Override
249256
protected void doStart() {
250257
if (!this.active) {
@@ -293,7 +300,8 @@ protected void doStop(Runnable callback) {
293300
@Override
294301
protected void doStop() {
295302
super.doStop();
296-
this.active = this.listening = false;
303+
this.active = false;
304+
this.listening = false;
297305
}
298306

299307
public boolean isListening() {
@@ -304,12 +312,12 @@ public boolean isListening() {
304312
* Returns the size of the Queue specified by {@link #boundListOperations}. The queue is
305313
* represented by a Redis list. If the queue does not exist <code>0</code>
306314
* is returned. See also http://redis.io/commands/llen
307-
*
308315
* @return Size of the queue. Never negative.
309316
*/
310317
@ManagedMetric
311318
public long getQueueSize() {
312-
return this.boundListOperations.size();
319+
return Optional.ofNullable(this.boundListOperations.size())
320+
.orElse(0L);
313321
}
314322

315323
/**

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisStoreMessageSource.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,10 @@ public RedisStoreMessageSource(RedisConnectionFactory connectionFactory,
8888
Assert.notNull(keyExpression, "'keyExpression' must not be null");
8989
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
9090

91-
StringRedisTemplate redisTemplate = new StringRedisTemplate();
92-
redisTemplate.setConnectionFactory(connectionFactory);
93-
redisTemplate.afterPropertiesSet();
91+
this.redisTemplate = new StringRedisTemplate();
92+
this.redisTemplate.setConnectionFactory(connectionFactory);
93+
this.redisTemplate.afterPropertiesSet();
9494

95-
this.redisTemplate = redisTemplate;
9695
this.keyExpression = keyExpression;
9796
}
9897

spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/RedisPublishingMessageHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2007-2016 the original author or authors.
2+
* Copyright 2007-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
3434
/**
3535
* @author Mark Fisher
3636
* @author Artem Bilan
37+
*
3738
* @since 2.1
3839
*/
3940
public class RedisPublishingMessageHandler extends AbstractMessageHandler {
@@ -86,7 +87,7 @@ public String getComponentType() {
8687
}
8788

8889
@Override
89-
protected void onInit() throws Exception {
90+
protected void onInit() {
9091
Assert.notNull(this.topicExpression, "'topicExpression' must not be null.");
9192
if (this.messageConverter instanceof BeanFactoryAware) {
9293
((BeanFactoryAware) this.messageConverter).setBeanFactory(getBeanFactory());
@@ -98,9 +99,9 @@ protected void onInit() throws Exception {
9899

99100
@Override
100101
@SuppressWarnings("unchecked")
101-
protected void handleMessageInternal(Message<?> message) throws Exception {
102+
protected void handleMessageInternal(Message<?> message) {
102103
String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class);
103-
Object value = this.messageConverter.fromMessage(message, null);
104+
Object value = this.messageConverter.fromMessage(message, Object.class);
104105

105106
if (value instanceof byte[]) {
106107
this.template.convertAndSend(topic, value);

0 commit comments

Comments
 (0)