Skip to content

Commit a5086a9

Browse files
authored
NIFI-14067 Replaced anonymous classes with lambdas (#9571)
Signed-off-by: David Handermann <exceptionfactory@apache.org>
1 parent 5c5b7ff commit a5086a9

File tree

45 files changed

+1159
-1755
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1159
-1755
lines changed

nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727

2828
import java.util.ArrayList;
2929
import java.util.Collection;
30-
import java.util.Comparator;
3130
import java.util.Date;
3231
import java.util.HashMap;
3332
import java.util.HashSet;
@@ -204,16 +203,13 @@ private void computeLineage() {
204203

205204
Map<String, LineageNode> lastEventMap = new HashMap<>(); // maps FlowFile UUID to last event for that FlowFile
206205
final List<ProvenanceEventRecord> sortedRecords = new ArrayList<>(relevantRecords);
207-
sortedRecords.sort(new Comparator<ProvenanceEventRecord>() {
208-
@Override
209-
public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) {
210-
// Sort on Event Time, then Event ID.
211-
final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime());
212-
if (eventTimeComparison == 0) {
213-
return Long.compare(o1.getEventId(), o2.getEventId());
214-
} else {
215-
return eventTimeComparison;
216-
}
206+
sortedRecords.sort((o1, o2) -> {
207+
// Sort on Event Time, then Event ID.
208+
final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime());
209+
if (eventTimeComparison == 0) {
210+
return Long.compare(o1.getEventId(), o2.getEventId());
211+
} else {
212+
return eventTimeComparison;
217213
}
218214
});
219215

nifi-commons/nifi-security-identity/src/main/java/org/apache/nifi/authorization/util/IdentityMappingUtil.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
import java.util.ArrayList;
26-
import java.util.Collections;
2726
import java.util.Comparator;
2827
import java.util.List;
2928
import java.util.function.Supplier;
@@ -121,12 +120,7 @@ private static List<IdentityMapping> getMappings(final NiFiProperties properties
121120
}
122121

123122
// sort the list by the key so users can control the ordering in nifi.properties
124-
Collections.sort(mappings, new Comparator<IdentityMapping>() {
125-
@Override
126-
public int compare(IdentityMapping m1, IdentityMapping m2) {
127-
return m1.getKey().compareTo(m2.getKey());
128-
}
129-
});
123+
mappings.sort(Comparator.comparing(IdentityMapping::getKey));
130124

131125
return mappings;
132126
}

nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/RingBuffer.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,7 @@ public int removeSelectedElements(final Filter<T> filter) {
163163
}
164164

165165
public List<T> asList() {
166-
return getSelectedElements(new Filter<T>() {
167-
@Override
168-
public boolean select(final T value) {
169-
return true;
170-
}
171-
});
166+
return getSelectedElements(value -> true);
172167
}
173168

174169
public T getOldestElement() {
@@ -252,7 +247,7 @@ public void forEach(final ForEachEvaluator<T> evaluator, final IterationDirectio
252247
}
253248
}
254249

255-
public static interface Filter<S> {
250+
public interface Filter<S> {
256251

257252
boolean select(S value);
258253
}
@@ -262,7 +257,7 @@ public static interface Filter<S> {
262257
*
263258
* @param <S> the type to evaluate
264259
*/
265-
public static interface ForEachEvaluator<S> {
260+
public interface ForEachEvaluator<S> {
266261

267262
/**
268263
* Evaluates the given element and returns {@code true} if the next element should be evaluated, {@code false} otherwise
@@ -273,9 +268,9 @@ public static interface ForEachEvaluator<S> {
273268
boolean evaluate(S value);
274269
}
275270

276-
public static enum IterationDirection {
271+
public enum IterationDirection {
277272

278273
FORWARD,
279-
BACKWARD;
274+
BACKWARD
280275
}
281276
}

nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Arrays;
2929
import java.util.Collection;
3030
import java.util.Collections;
31-
import java.util.Comparator;
3231
import java.util.HashSet;
3332
import java.util.List;
3433
import java.util.Map;
@@ -167,14 +166,11 @@ public synchronized Collection<T> recoverRecords() throws IOException {
167166
}
168167

169168
final List<File> orderedJournalFiles = Arrays.asList(journalFiles);
170-
Collections.sort(orderedJournalFiles, new Comparator<File>() {
171-
@Override
172-
public int compare(final File o1, final File o2) {
173-
final long transactionId1 = getMinTransactionId(o1);
174-
final long transactionId2 = getMinTransactionId(o2);
169+
orderedJournalFiles.sort((o1, o2) -> {
170+
final long transactionId1 = getMinTransactionId(o1);
171+
final long transactionId2 = getMinTransactionId(o2);
175172

176-
return Long.compare(transactionId1, transactionId2);
177-
}
173+
return Long.compare(transactionId1, transactionId2);
178174
});
179175

180176
final long snapshotTransactionId = snapshotRecovery.getMaxTransactionId();

nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecordTest.java

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@
2929
import org.junit.jupiter.api.extension.ExtendWith;
3030
import org.mockito.ArgumentCaptor;
3131
import org.mockito.Mock;
32-
import org.mockito.invocation.InvocationOnMock;
3332
import org.mockito.junit.jupiter.MockitoExtension;
3433
import org.mockito.junit.jupiter.MockitoSettings;
3534
import org.mockito.quality.Strictness;
36-
import org.mockito.stubbing.Answer;
3735
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
3836
import software.amazon.awssdk.awscore.exception.AwsServiceException;
3937
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -46,7 +44,6 @@
4644

4745
import java.io.FileInputStream;
4846
import java.util.ArrayList;
49-
import java.util.Arrays;
5047
import java.util.Collection;
5148
import java.util.Collections;
5249
import java.util.HashMap;
@@ -144,7 +141,7 @@ public void testMultipleChunks() throws Exception {
144141
final List<BatchWriteItemRequest> results = captor.getAllValues();
145142
Assertions.assertEquals(2, results.size());
146143

147-
final BatchWriteItemRequest result1 = results.get(0);
144+
final BatchWriteItemRequest result1 = results.getFirst();
148145
assertTrue(result1.hasRequestItems());
149146
assertNotNull(result1.requestItems().get(TABLE_NAME));
150147
assertItemsConvertedProperly(result1.requestItems().get(TABLE_NAME), 25);
@@ -155,7 +152,7 @@ public void testMultipleChunks() throws Exception {
155152
assertItemsConvertedProperly(result2.requestItems().get(TABLE_NAME), 4);
156153

157154
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
158-
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).get(0);
155+
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).getFirst();
159156
Assertions.assertEquals("2", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
160157
}
161158

@@ -168,7 +165,7 @@ public void testThroughputIssue() throws Exception {
168165
runner.run();
169166

170167
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_UNPROCESSED, 1);
171-
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_UNPROCESSED).get(0);
168+
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_UNPROCESSED).getFirst();
172169
Assertions.assertEquals("1", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
173170
}
174171

@@ -183,7 +180,7 @@ public void testRetryAfterUnprocessed() throws Exception {
183180
Assertions.assertEquals(4, captor.getValue().requestItems().get(TABLE_NAME).size());
184181

185182
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
186-
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).get(0);
183+
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).getFirst();
187184
Assertions.assertEquals("2", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
188185
}
189186

@@ -196,7 +193,7 @@ public void testErrorDuringInsertion() throws Exception {
196193
runner.run();
197194

198195
runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_FAILURE, 1);
199-
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_FAILURE).get(0);
196+
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_FAILURE).getFirst();
200197
Assertions.assertEquals("0", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
201198
}
202199

@@ -212,7 +209,7 @@ public void testGeneratedPartitionKey() throws Exception {
212209
final BatchWriteItemRequest result = captor.getValue();
213210
Assertions.assertEquals(1, result.requestItems().get(TABLE_NAME).size());
214211

215-
final Map<String, AttributeValue> item = result.requestItems().get(TABLE_NAME).iterator().next().putRequest().item();
212+
final Map<String, AttributeValue> item = result.requestItems().get(TABLE_NAME).getFirst().putRequest().item();
216213
Assertions.assertEquals(4, item.size());
217214
Assertions.assertEquals(string("P0"), item.get("partition"));
218215
assertTrue(item.containsKey("generated"));
@@ -317,7 +314,7 @@ private void assertItemsConvertedProperly(final Collection<WriteRequest> writeRe
317314
private void setInsertionError() {
318315
final BatchWriteItemResponse outcome = mock(BatchWriteItemResponse.class);
319316
final Map<String, List<WriteRequest>> unprocessedItems = new HashMap<>();
320-
final List<WriteRequest> writeResults = Arrays.asList(mock(WriteRequest.class));
317+
final List<WriteRequest> writeResults = Collections.singletonList(mock(WriteRequest.class));
321318
unprocessedItems.put("test", writeResults);
322319
when(outcome.unprocessedItems()).thenReturn(unprocessedItems);
323320
when(outcome.hasUnprocessedItems()).thenReturn(true);
@@ -332,17 +329,14 @@ private void setServerError() {
332329
private void setExceedThroughputAtGivenChunk(final int chunkToFail) {
333330
final AtomicInteger numberOfCalls = new AtomicInteger(0);
334331

335-
when(client.batchWriteItem(captor.capture())).then(new Answer<Object>() {
336-
@Override
337-
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
338-
final int calls = numberOfCalls.incrementAndGet();
339-
340-
if (calls >= chunkToFail) {
341-
throw ProvisionedThroughputExceededException.builder().message("Throughput exceeded")
342-
.awsErrorDetails(AwsErrorDetails.builder().errorCode("error code").errorMessage("error message").build()).build();
343-
} else {
344-
return mock(BatchWriteItemResponse.class);
345-
}
332+
when(client.batchWriteItem(captor.capture())).then(invocationOnMock -> {
333+
final int calls = numberOfCalls.incrementAndGet();
334+
335+
if (calls >= chunkToFail) {
336+
throw ProvisionedThroughputExceededException.builder().message("Throughput exceeded")
337+
.awsErrorDetails(AwsErrorDetails.builder().errorCode("error code").errorMessage("error message").build()).build();
338+
} else {
339+
return mock(BatchWriteItemResponse.class);
346340
}
347341
});
348342
}

nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.apache.nifi.util.db;
1818

1919
import org.mockito.Mockito;
20-
import org.mockito.invocation.InvocationOnMock;
2120
import org.mockito.stubbing.Answer;
2221

2322
import java.io.ByteArrayInputStream;
@@ -38,12 +37,7 @@ static ResultSet resultSetReturningMetadata(ResultSetMetaData metadata) throws S
3837
when(rs.getMetaData()).thenReturn(metadata);
3938

4039
final AtomicInteger counter = new AtomicInteger(1);
41-
Mockito.doAnswer(new Answer<Boolean>() {
42-
@Override
43-
public Boolean answer(InvocationOnMock invocation) throws Throwable {
44-
return counter.getAndDecrement() > 0;
45-
}
46-
}).when(rs).next();
40+
Mockito.doAnswer((Answer<Boolean>) invocation -> counter.getAndDecrement() > 0).when(rs).next();
4741

4842
return rs;
4943
}

nifi-extension-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -379,12 +379,7 @@ public synchronized void setNumMessages(final long msgCount) {
379379
}
380380

381381
private void transferRanges(final List<Range> ranges, final Relationship relationship) {
382-
Collections.sort(ranges, new Comparator<Range>() {
383-
@Override
384-
public int compare(final Range o1, final Range o2) {
385-
return Long.compare(o1.getStart(), o2.getStart());
386-
}
387-
});
382+
ranges.sort(Comparator.comparingLong(Range::getStart));
388383

389384
for (int i = 0; i < ranges.size(); i++) {
390385
Range range = ranges.get(i);

nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/flow/resource/hadoop/HDFSExternalResourceProvider.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -262,12 +262,7 @@ private void checkHdfsUriForTimeout(final Configuration config) throws IOExcepti
262262

263263
private FileSystem getFileSystemAsUser(final Configuration config, final UserGroupInformation ugi) throws IOException {
264264
try {
265-
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
266-
@Override
267-
public FileSystem run() throws Exception {
268-
return FileSystem.get(config);
269-
}
270-
});
265+
return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
271266
} catch (final InterruptedException e) {
272267
throw new IOException("Unable to create file system: " + e.getMessage(), e);
273268
}

nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -91,56 +91,50 @@ private MessageConsumer createMessageConsumer(final Session session, final Strin
9191
*/
9292
public void consumeMessageSet(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriptionName, final String messageSelector,
9393
final String charset, final int batchSize, final Consumer<List<JMSResponse>> messageSetConsumer) {
94-
doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, new MessageReceiver() {
95-
@Override
96-
public void consume(Session session, MessageConsumer messageConsumer) throws JMSException {
97-
final List<JMSResponse> jmsResponses = new ArrayList<>();
98-
int batchCounter = 0;
99-
100-
JMSResponse response;
101-
while (batchCounter < batchSize && (response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null) {
102-
response.setBatchOrder(batchCounter);
103-
jmsResponses.add(response);
104-
batchCounter++;
105-
}
94+
doWithJmsTemplate(destinationName, durable, shared, subscriptionName, messageSelector, (session, messageConsumer) -> {
95+
final List<JMSResponse> jmsResponses = new ArrayList<>();
96+
int batchCounter = 0;
97+
98+
JMSResponse response;
99+
while (batchCounter < batchSize && (response = receiveMessage(session, messageConsumer, charset, errorQueueName)) != null) {
100+
response.setBatchOrder(batchCounter);
101+
jmsResponses.add(response);
102+
batchCounter++;
103+
}
106104

107-
if (!jmsResponses.isEmpty()) {
108-
// Provide the JMSResponse to the processor to handle. It is the responsibility of the
109-
// processor to handle acknowledgment of the message (if Client Acknowledge), and it is
110-
// the responsibility of the processor to handle closing the Message Consumer.
111-
// Both of these actions can be handled by calling the acknowledge() or reject() methods of
112-
// the JMSResponse.
113-
messageSetConsumer.accept(jmsResponses);
114-
}
105+
if (!jmsResponses.isEmpty()) {
106+
// Provide the JMSResponse to the processor to handle. It is the responsibility of the
107+
// processor to handle acknowledgment of the message (if Client Acknowledge), and it is
108+
// the responsibility of the processor to handle closing the Message Consumer.
109+
// Both of these actions can be handled by calling the acknowledge() or reject() methods of
110+
// the JMSResponse.
111+
messageSetConsumer.accept(jmsResponses);
115112
}
116113
});
117114
}
118115

119116
private void doWithJmsTemplate(String destinationName, boolean durable, boolean shared, String subscriptionName, String messageSelector, MessageReceiver messageReceiver) {
120-
this.jmsTemplate.execute(new SessionCallback<Void>() {
121-
@Override
122-
public Void doInJms(final Session session) throws JMSException {
117+
this.jmsTemplate.execute((SessionCallback<Void>) session -> {
123118

124-
final MessageConsumer messageConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector);
119+
final MessageConsumer messageConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriptionName, messageSelector);
120+
try {
121+
messageReceiver.consume(session, messageConsumer);
122+
} catch (Exception e) {
123+
// We need to call recover to ensure that in the event of
124+
// abrupt end or exception the current session will stop message
125+
// delivery and restart with the oldest unacknowledged message
125126
try {
126-
messageReceiver.consume(session, messageConsumer);
127-
} catch (Exception e) {
128-
// We need to call recover to ensure that in the event of
129-
// abrupt end or exception the current session will stop message
130-
// delivery and restart with the oldest unacknowledged message
131-
try {
132-
session.recover();
133-
} catch (Exception e1) {
134-
// likely the session is closed...need to catch this so that the root cause of failure is propagated
135-
processLog.debug("Failed to recover JMS session while handling initial error. The recover error is: ", e1);
136-
}
137-
138-
JmsUtils.closeMessageConsumer(messageConsumer);
139-
throw e;
127+
session.recover();
128+
} catch (Exception e1) {
129+
// likely the session is closed...need to catch this so that the root cause of failure is propagated
130+
processLog.debug("Failed to recover JMS session while handling initial error. The recover error is: ", e1);
140131
}
141132

142-
return null;
133+
JmsUtils.closeMessageConsumer(messageConsumer);
134+
throw e;
143135
}
136+
137+
return null;
144138
}, true);
145139
}
146140

0 commit comments

Comments
 (0)