Skip to content

Commit a339750

Browse files
authored
NIFI-15480 Added ability to drop FlowFiles selectively using a Predicate (#10848)
* NIFI-15480: Added ability to drop FlowFiles selectively using a Predicate Added system tests to verify selective dropping of flowfiles from a Connector. Also made significant updates so that diagnostic bundles are captured when a system test fails and diagnostic bundle also contains details of each Connection's FlowFileQueue. This was necessary in aiding the debugging of the new feature but is also something that has been missing for some time. * NIFI-15480: Ensure that when we perform a selective drop on a FlowFileQueue that we pass appropriate SWAP_FILE_DELETED / SWAP_FILE_RENAMED events to the FlowFile Repository
1 parent a65897d commit a339750

File tree

54 files changed

+2257
-125
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2257
-125
lines changed

nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,14 +175,18 @@ public void update(final Collection<T> records) {
175175

176176
switch (updateType) {
177177
case DELETE:
178-
recordMap.remove(recordId);
178+
if (recordId != null) {
179+
recordMap.remove(recordId);
180+
}
179181
break;
180182
case SWAP_OUT:
181183
final String location = serdeFactory.getLocation(record);
182184
if (location == null) {
183185
logger.error(logMessage, recordId, UpdateType.SWAP_OUT, "Swapped Out to", "lost");
184186
} else {
185-
recordMap.remove(recordId);
187+
if (recordId != null) {
188+
recordMap.remove(recordId);
189+
}
186190
this.swapLocations.add(location);
187191
}
188192
break;
@@ -193,10 +197,30 @@ public void update(final Collection<T> records) {
193197
} else {
194198
swapLocations.remove(swapLocation);
195199
}
196-
recordMap.put(recordId, record);
200+
if (recordId != null) {
201+
recordMap.put(recordId, record);
202+
}
203+
break;
204+
case SWAP_FILE_DELETED:
205+
final String deletedSwapLocation = serdeFactory.getLocation(record);
206+
if (deletedSwapLocation != null) {
207+
swapLocations.remove(deletedSwapLocation);
208+
}
209+
break;
210+
case SWAP_FILE_RENAMED:
211+
final String originalLocation = serdeFactory.getOriginalLocation(record);
212+
final String newLocation = serdeFactory.getLocation(record);
213+
if (originalLocation != null) {
214+
swapLocations.remove(originalLocation);
215+
}
216+
if (newLocation != null) {
217+
swapLocations.add(newLocation);
218+
}
197219
break;
198220
default:
199-
recordMap.put(recordId, record);
221+
if (recordId != null) {
222+
recordMap.put(recordId, record);
223+
}
200224
break;
201225
}
202226
}
@@ -209,6 +233,9 @@ public int getRecordCount() {
209233

210234
@Override
211235
public T lookup(final Object recordId) {
236+
if (recordId == null) {
237+
return null;
238+
}
212239
return recordMap.get(recordId);
213240
}
214241

nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,10 @@ public JournalRecovery recoverRecords(final Map<Object, T> recordMap, final Set<
477477

478478
switch (updateType) {
479479
case DELETE: {
480-
idsRemoved.add(recordId);
481-
transactionRecordMap.remove(recordId);
480+
if (recordId != null) {
481+
idsRemoved.add(recordId);
482+
transactionRecordMap.remove(recordId);
483+
}
482484
break;
483485
}
484486
case SWAP_IN: {
@@ -488,7 +490,9 @@ public JournalRecovery recoverRecords(final Map<Object, T> recordMap, final Set<
488490
} else {
489491
swapLocationsRemoved.add(location);
490492
swapLocationsAdded.remove(location);
491-
transactionRecordMap.put(recordId, record);
493+
if (recordId != null) {
494+
transactionRecordMap.put(recordId, record);
495+
}
492496
}
493497
break;
494498
}
@@ -499,15 +503,40 @@ public JournalRecovery recoverRecords(final Map<Object, T> recordMap, final Set<
499503
} else {
500504
swapLocationsRemoved.remove(location);
501505
swapLocationsAdded.add(location);
502-
idsRemoved.add(recordId);
503-
transactionRecordMap.remove(recordId);
506+
if (recordId != null) {
507+
idsRemoved.add(recordId);
508+
transactionRecordMap.remove(recordId);
509+
}
504510
}
505511

506512
break;
507513
}
514+
case SWAP_FILE_DELETED: {
515+
final String location = serde.getLocation(record);
516+
if (location != null) {
517+
swapLocationsRemoved.add(location);
518+
swapLocationsAdded.remove(location);
519+
}
520+
break;
521+
}
522+
case SWAP_FILE_RENAMED: {
523+
final String originalLocation = serde.getOriginalLocation(record);
524+
final String newLocation = serde.getLocation(record);
525+
if (originalLocation != null) {
526+
swapLocationsRemoved.add(originalLocation);
527+
swapLocationsAdded.remove(originalLocation);
528+
}
529+
if (newLocation != null) {
530+
swapLocationsAdded.add(newLocation);
531+
swapLocationsRemoved.remove(newLocation);
532+
}
533+
break;
534+
}
508535
default: {
509-
transactionRecordMap.put(recordId, record);
510-
idsRemoved.remove(recordId);
536+
if (recordId != null) {
537+
transactionRecordMap.put(recordId, record);
538+
idsRemoved.remove(recordId);
539+
}
511540
break;
512541
}
513542
}

nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,19 @@ default void readHeader(DataInputStream in) throws IOException {
136136
*/
137137
String getLocation(T record);
138138

139+
/**
140+
* Returns the original external location of the given record; this is used when a
141+
* swap file is renamed. For {@link UpdateType#SWAP_FILE_RENAMED} records, this
142+
* returns the original location before the rename, while {@link #getLocation(Object)}
143+
* returns the new location after the rename.
144+
*
145+
* @param record to get original location of
146+
* @return original location, or null if not applicable
147+
*/
148+
default String getOriginalLocation(T record) {
149+
return null;
150+
}
151+
139152
/**
140153
* Returns the version that this SerDe will use when writing. This used used
141154
* when serializing/deserializing the edit logs so that if the version

nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDeFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,17 @@ public interface SerDeFactory<T> {
5757
* @return location
5858
*/
5959
String getLocation(T record);
60+
61+
/**
62+
* Returns the original external location of the given record; this is used when a
63+
* swap file is renamed. For {@link UpdateType#SWAP_FILE_RENAMED} records, this
64+
* returns the original location before the rename, while {@link #getLocation(Object)}
65+
* returns the new location after the rename.
66+
*
67+
* @param record to get original location of
68+
* @return original location, or null if not applicable
69+
*/
70+
default String getOriginalLocation(T record) {
71+
return null;
72+
}
6073
}

nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/UpdateType.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,13 @@ public enum UpdateType {
4545
* Used to indicate that a Record that was previously Swapped Out is now
4646
* being Swapped In
4747
*/
48-
SWAP_IN;
48+
SWAP_IN,
49+
/**
50+
* Used to indicate that a Swap File has been deleted
51+
*/
52+
SWAP_FILE_DELETED,
53+
/**
54+
* Used to indicate that a Swap File has been renamed
55+
*/
56+
SWAP_FILE_RENAMED;
4957
}

nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
*/
1717
package org.apache.nifi.controller.queue;
1818

19+
import org.apache.nifi.components.connector.DropFlowFileSummary;
1920
import org.apache.nifi.controller.repository.FlowFileRecord;
2021
import org.apache.nifi.controller.repository.SwapSummary;
2122
import org.apache.nifi.controller.status.FlowFileAvailability;
23+
import org.apache.nifi.flowfile.FlowFile;
2224
import org.apache.nifi.flowfile.FlowFilePrioritizer;
2325
import org.apache.nifi.processor.FlowFileFilter;
2426

@@ -27,6 +29,7 @@
2729
import java.util.List;
2830
import java.util.Set;
2931
import java.util.concurrent.TimeUnit;
32+
import java.util.function.Predicate;
3033

3134
public interface FlowFileQueue {
3235

@@ -217,6 +220,18 @@ public interface FlowFileQueue {
217220
*/
218221
DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier);
219222

223+
/**
224+
* Synchronously drops all FlowFiles in this queue that match the given predicate. This method filters
225+
* FlowFiles in the active queue, swap queue, and any swapped-out swap files. The FlowFile Repository
226+
* and Provenance Repository are updated atomically after all matching FlowFiles have been identified.
227+
*
228+
* @param predicate the predicate used to determine which FlowFiles should be dropped; FlowFiles for which
229+
* the predicate returns <code>true</code> will be dropped
230+
* @return a summary of the FlowFiles that were dropped, including the count and total size in bytes
231+
* @throws IOException if an error occurs while reading or writing swap files
232+
*/
233+
DropFlowFileSummary dropFlowFiles(Predicate<FlowFile> predicate) throws IOException;
234+
220235
/**
221236
* <p>
222237
* Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a

nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LocalQueuePartitionDiagnostics.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,14 @@ public interface LocalQueuePartitionDiagnostics {
2929
boolean isAnyActiveFlowFilePenalized();
3030

3131
boolean isAllActiveFlowFilesPenalized();
32+
33+
/**
34+
* @return the QueueSize representing the penalized FlowFiles (count and total content size)
35+
*/
36+
QueueSize getPenalizedQueueSize();
37+
38+
/**
39+
* @return the total QueueSize across all swap files (count and total content size)
40+
*/
41+
QueueSize getTotalSwapFileQueueSize();
3242
}

nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,16 @@ public interface FlowFileSwapManager {
126126
*/
127127
void purge();
128128

129+
/**
130+
* Deletes the swap file at the given location without updating the FlowFile Repository.
131+
* This method is intended for use after the FlowFile Repository has already been updated
132+
* to reflect that the FlowFiles are no longer in this swap file.
133+
*
134+
* @param swapLocation the location of the swap file to delete
135+
* @throws IOException if unable to delete the swap file
136+
*/
137+
void deleteSwapFile(String swapLocation) throws IOException;
138+
129139
/**
130140
* Returns the ID of the queue that the given swap file belongs to
131141
* @param swapLocation the swap location

nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ public interface RepositoryRecord {
8282
*/
8383
String getSwapLocation();
8484

85+
/**
86+
* @return For SWAP_FILE_RENAMED records, provides the original swap location before the rename.
87+
* For other record types, returns <code>null</code>.
88+
*/
89+
default String getOriginalSwapLocation() {
90+
return null;
91+
}
92+
8593
/**
8694
* @return a List of Content Claims that are "transient," meaning that they existed only for the
8795
* life of the Process Session in which they were created and should not be persisted.

nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@
2121
*/
2222
public enum RepositoryRecordType {
2323

24-
UPDATE, CREATE, DELETE, CONTENTMISSING, SWAP_IN, SWAP_OUT, CLEANUP_TRANSIENT_CLAIMS;
24+
UPDATE, CREATE, DELETE, CONTENTMISSING, SWAP_IN, SWAP_OUT, CLEANUP_TRANSIENT_CLAIMS, SWAP_FILE_DELETED, SWAP_FILE_RENAMED;
2525
}

0 commit comments

Comments
 (0)