Skip to content

Commit 1abd854

Browse files
committed
NIFI-15511: Added endpoints for purging all FlowFiles for a given Connector; added method to ConnectorClient to call these endpoints; updated system tests to use these endpoints when tearing down flows; some bug fixes
1 parent 9e21398 commit 1abd854

File tree

11 files changed

+397
-1
lines changed

11 files changed

+397
-1
lines changed

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1591,17 +1591,29 @@ private DropFlowFileStatus handleDropAllFlowFiles(String dropRequestId, Function
15911591
aggregateDropFlowFileStatus.setState(null);
15921592

15931593
AtomicBoolean processedAtLeastOne = new AtomicBoolean(false);
1594+
final List<CompletableFuture<Void>> completionFutures = new ArrayList<>();
15941595

15951596
connections.stream()
15961597
.map(Connection::getFlowFileQueue)
15971598
.map(function::apply)
15981599
.forEach(additionalDropFlowFileStatus -> {
15991600
aggregate(aggregateDropFlowFileStatus, additionalDropFlowFileStatus);
16001601
processedAtLeastOne.set(true);
1602+
completionFutures.add(additionalDropFlowFileStatus.getCompletionFuture());
16011603
});
16021604

16031605
if (processedAtLeastOne.get()) {
16041606
resultDropFlowFileStatus = aggregateDropFlowFileStatus;
1607+
1608+
// When all individual drop requests complete, mark the aggregate as complete
1609+
CompletableFuture.allOf(completionFutures.toArray(new CompletableFuture[0]))
1610+
.whenComplete((result, throwable) -> {
1611+
if (throwable != null) {
1612+
aggregateDropFlowFileStatus.setState(DropFlowFileState.FAILURE, throwable.getMessage());
1613+
} else {
1614+
aggregateDropFlowFileStatus.setState(DropFlowFileState.COMPLETE);
1615+
}
1616+
});
16051617
} else {
16061618
resultDropFlowFileStatus = null;
16071619
}

nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,12 @@ public interface ConnectorNode extends ComponentAuthorizable, VersionedComponent
211211
*/
212212
void verifyCancelDrainFlowFiles() throws IllegalStateException;
213213

214+
/**
215+
* Verifies that the Connector can have its FlowFiles purged.
216+
* @throws IllegalStateException if not in a state where FlowFiles can be purged
217+
*/
218+
void verifyCanPurgeFlowFiles() throws IllegalStateException;
219+
214220
/**
215221
* Purges all FlowFiles from the Connector, immediately dropping the data.
216222
*

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,19 @@ public void verifyCancelDrainFlowFiles() throws IllegalStateException {
494494
}
495495
}
496496

497+
@Override
498+
public void verifyCanPurgeFlowFiles() throws IllegalStateException {
499+
final ConnectorState desiredState = getDesiredState();
500+
if (desiredState != ConnectorState.STOPPED) {
501+
throw new IllegalStateException("Cannot purge FlowFiles for " + this + " because its desired state is currently " + desiredState + "; it must be STOPPED.");
502+
}
503+
504+
final ConnectorState currentState = getCurrentState();
505+
if (currentState != ConnectorState.STOPPED) {
506+
throw new IllegalStateException("Cannot purge FlowFiles for " + this + " because its current state is " + currentState + "; it must be STOPPED.");
507+
}
508+
}
509+
497510
@Override
498511
public Future<Void> purgeFlowFiles(final String requestor) {
499512
requireStopped("purge FlowFiles", ConnectorState.PURGING);
@@ -520,7 +533,7 @@ private void requireStopped(final String action, final ConnectorState newState)
520533
while (!stateUpdated) {
521534
final ConnectorState currentState = getCurrentState();
522535
if (currentState != ConnectorState.STOPPED) {
523-
throw new IllegalStateException("Cannot " + action + " for " + this + " because its current state is currently " + currentState + "; it must be STOPPED.");
536+
throw new IllegalStateException("Cannot " + action + " for " + this + " because its current state is " + currentState + "; it must be STOPPED.");
524537
}
525538

526539
stateUpdated = stateTransition.trySetCurrentState(ConnectorState.STOPPED, newState);

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,22 @@ Set<ControllerServiceEntity> getConnectorControllerServices(String connectorId,
251251

252252
Optional<Asset> getConnectorAsset(String assetId);
253253

254+
/**
255+
* Verifies that the connector is in a state where FlowFiles can be purged.
256+
*
257+
* @param connectorId the connector ID
258+
* @throws IllegalStateException if the connector is not in a state where FlowFiles can be purged
259+
*/
260+
void verifyPurgeConnectorFlowFiles(String connectorId);
261+
262+
/**
263+
* Purges all FlowFiles from the connector.
264+
*
265+
* @param connectorId the connector ID
266+
* @param requestor the identity of the user requesting the purge (used for provenance events)
267+
*/
268+
void purgeConnectorFlowFiles(String connectorId, String requestor);
269+
254270
// ----------------------------------------
255271
// Synchronization methods
256272
// ----------------------------------------

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3859,6 +3859,16 @@ public Optional<Asset> getConnectorAsset(final String assetId) {
38593859
return connectorDAO.getAsset(assetId);
38603860
}
38613861

3862+
@Override
3863+
public void verifyPurgeConnectorFlowFiles(final String connectorId) {
3864+
connectorDAO.verifyPurgeFlowFiles(connectorId);
3865+
}
3866+
3867+
@Override
3868+
public void purgeConnectorFlowFiles(final String connectorId, final String requestor) {
3869+
connectorDAO.purgeFlowFiles(connectorId, requestor);
3870+
}
3871+
38623872
@Override
38633873
public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
38643874
// get the component, ensure we have access to it, and perform the update request

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
7878
import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO;
7979
import org.apache.nifi.web.api.dto.ConnectorDTO;
80+
import org.apache.nifi.web.api.dto.DropRequestDTO;
8081
import org.apache.nifi.web.api.dto.VerifyConnectorConfigStepRequestDTO;
8182
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
8283
import org.apache.nifi.web.api.entity.AssetEntity;
@@ -88,6 +89,7 @@
8889
import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity;
8990
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
9091
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
92+
import org.apache.nifi.web.api.entity.DropRequestEntity;
9193
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
9294
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
9395
import org.apache.nifi.web.api.entity.SearchResultsEntity;
@@ -123,6 +125,7 @@ public class ConnectorResource extends ApplicationResource {
123125

124126
private static final Logger logger = LoggerFactory.getLogger(ConnectorResource.class);
125127
private static final String VERIFICATION_REQUEST_TYPE = "verification-request";
128+
private static final String PURGE_REQUEST_TYPE = "purge-request";
126129
private static final String FILENAME_HEADER = "Filename";
127130
private static final String CONTENT_TYPE_HEADER = "Content-Type";
128131
private static final String UPLOAD_CONTENT_TYPE = "application/octet-stream";
@@ -137,6 +140,8 @@ public class ConnectorResource extends ApplicationResource {
137140
private final RequestManager<VerifyConnectorConfigStepRequestEntity, List<ConfigVerificationResultDTO>> configVerificationRequestManager =
138141
new AsyncRequestManager<>(100, 1L, "Connector Configuration Step Verification");
139142

143+
private final RequestManager<ConnectorEntity, Void> purgeRequestManager = new AsyncRequestManager<>(100, 1L, "Connector FlowFile Purge");
144+
140145
@Context
141146
private ServletContext servletContext;
142147

@@ -792,6 +797,201 @@ public Response cancelDrain(
792797
);
793798
}
794799

800+
801+
@POST
802+
@Consumes(MediaType.WILDCARD)
803+
@Produces(MediaType.APPLICATION_JSON)
804+
@Path("/{id}/purge-requests")
805+
@Operation(
806+
summary = "Creates a request to purge the FlowFiles for this connector",
807+
responses = {
808+
@ApiResponse(
809+
responseCode = "202", description = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled.",
810+
content = @Content(schema = @Schema(implementation = DropRequestEntity.class))
811+
),
812+
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
813+
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
814+
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
815+
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
816+
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
817+
},
818+
description = "This will create a request to purge all FlowFiles from the connector. The connector must be in a STOPPED state before purging can begin. "
819+
+ "This is an asynchronous operation. The client should poll the returned URI to get the status of the purge request.",
820+
security = {
821+
@SecurityRequirement(name = "Write - /connectors/{uuid}")
822+
}
823+
)
824+
public Response createPurgeRequest(
825+
@Parameter(
826+
description = "The connector id.",
827+
required = true
828+
)
829+
@PathParam("id") final String id) {
830+
831+
if (isReplicateRequest()) {
832+
return replicate(HttpMethod.POST);
833+
}
834+
835+
final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
836+
requestConnectorEntity.setId(id);
837+
838+
return withWriteLock(
839+
serviceFacade,
840+
requestConnectorEntity,
841+
lookup -> {
842+
final Authorizable connector = lookup.getConnector(id);
843+
connector.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
844+
},
845+
() -> serviceFacade.verifyPurgeConnectorFlowFiles(id),
846+
(connectorEntity) -> performAsyncPurge(connectorEntity, id, NiFiUserUtils.getNiFiUser())
847+
);
848+
}
849+
850+
851+
@GET
852+
@Consumes(MediaType.WILDCARD)
853+
@Produces(MediaType.APPLICATION_JSON)
854+
@Path("/{id}/purge-requests/{purge-request-id}")
855+
@Operation(
856+
summary = "Gets the current status of a purge request for the specified connector",
857+
responses = {
858+
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = DropRequestEntity.class))),
859+
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
860+
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
861+
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
862+
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
863+
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
864+
},
865+
security = {
866+
@SecurityRequirement(name = "Only the user that submitted the request can get it")
867+
}
868+
)
869+
public Response getPurgeRequest(
870+
@Parameter(
871+
description = "The connector id.",
872+
required = true
873+
)
874+
@PathParam("id") final String connectorId,
875+
@Parameter(
876+
description = "The purge request id.",
877+
required = true
878+
)
879+
@PathParam("purge-request-id") final String purgeRequestId) {
880+
881+
if (isReplicateRequest()) {
882+
return replicate(HttpMethod.GET);
883+
}
884+
885+
final NiFiUser user = NiFiUserUtils.getNiFiUser();
886+
887+
final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest = purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
888+
final DropRequestEntity purgeRequestEntity = createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
889+
return generateOkResponse(purgeRequestEntity).build();
890+
}
891+
892+
893+
@DELETE
894+
@Consumes(MediaType.WILDCARD)
895+
@Produces(MediaType.APPLICATION_JSON)
896+
@Path("/{id}/purge-requests/{purge-request-id}")
897+
@Operation(
898+
summary = "Cancels and/or removes a request to purge the FlowFiles for this connector",
899+
responses = {
900+
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = DropRequestEntity.class))),
901+
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
902+
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
903+
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
904+
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
905+
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
906+
},
907+
security = {
908+
@SecurityRequirement(name = "Only the user that submitted the request can remove it")
909+
}
910+
)
911+
public Response removePurgeRequest(
912+
@Parameter(
913+
description = "The connector id.",
914+
required = true
915+
)
916+
@PathParam("id") final String connectorId,
917+
@Parameter(
918+
description = "The purge request id.",
919+
required = true
920+
)
921+
@PathParam("purge-request-id") final String purgeRequestId) {
922+
923+
if (isReplicateRequest()) {
924+
return replicate(HttpMethod.DELETE);
925+
}
926+
927+
final NiFiUser user = NiFiUserUtils.getNiFiUser();
928+
final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest);
929+
final boolean executionPhase = isExecutionPhase(httpServletRequest);
930+
931+
if (!twoPhaseRequest || executionPhase) {
932+
final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest = purgeRequestManager.removeRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
933+
934+
if (!asyncRequest.isComplete()) {
935+
asyncRequest.cancel();
936+
}
937+
938+
final DropRequestEntity purgeRequestEntity = createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
939+
return generateOkResponse(purgeRequestEntity).build();
940+
}
941+
942+
if (isValidationPhase(httpServletRequest)) {
943+
purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
944+
return generateContinueResponse().build();
945+
} else if (isCancellationPhase(httpServletRequest)) {
946+
return generateOkResponse().build();
947+
} else {
948+
throw new IllegalStateException("This request does not appear to be part of the two phase commit.");
949+
}
950+
}
951+
952+
private Response performAsyncPurge(final ConnectorEntity connectorEntity, final String connectorId, final NiFiUser user) {
953+
final String requestId = generateUuid();
954+
logger.debug("Generated Purge Request with ID {} for Connector {}", requestId, connectorId);
955+
956+
final List<UpdateStep> updateSteps = Collections.singletonList(new StandardUpdateStep("Purge FlowFiles"));
957+
958+
final AsynchronousWebRequest<ConnectorEntity, Void> request =
959+
new StandardAsynchronousWebRequest<>(requestId, connectorEntity, connectorId, user, updateSteps);
960+
961+
final Consumer<AsynchronousWebRequest<ConnectorEntity, Void>> updateTask = asyncRequest -> {
962+
try {
963+
serviceFacade.purgeConnectorFlowFiles(connectorId, user.getIdentity());
964+
asyncRequest.markStepComplete(null);
965+
} catch (final Exception e) {
966+
logger.error("Failed to purge FlowFiles for Connector {}", connectorId, e);
967+
asyncRequest.fail("Failed to purge FlowFiles due to " + e);
968+
}
969+
};
970+
971+
purgeRequestManager.submitRequest(PURGE_REQUEST_TYPE, requestId, request, updateTask);
972+
973+
final DropRequestEntity purgeRequestEntity = createPurgeRequestEntity(request, connectorId, requestId);
974+
final URI location = URI.create(purgeRequestEntity.getDropRequest().getUri());
975+
return Response.status(Response.Status.ACCEPTED).location(location).entity(purgeRequestEntity).build();
976+
}
977+
978+
private DropRequestEntity createPurgeRequestEntity(final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest,
979+
final String connectorId, final String requestId) {
980+
final DropRequestDTO dto = new DropRequestDTO();
981+
dto.setId(requestId);
982+
dto.setUri(generateResourceUri("connectors", connectorId, "purge-requests", requestId));
983+
dto.setSubmissionTime(asyncRequest.getLastUpdated());
984+
dto.setLastUpdated(asyncRequest.getLastUpdated());
985+
dto.setPercentCompleted(asyncRequest.getPercentComplete());
986+
dto.setFinished(asyncRequest.isComplete());
987+
dto.setFailureReason(asyncRequest.getFailureReason());
988+
dto.setState(asyncRequest.getState());
989+
990+
final DropRequestEntity entity = new DropRequestEntity();
991+
entity.setDropRequest(dto);
992+
return entity;
993+
}
994+
795995
/**
796996
* Gets the configuration step names for the specified connector.
797997
*

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ public interface ConnectorDAO {
5454

5555
void verifyCancelDrainFlowFile(String id);
5656

57+
void verifyPurgeFlowFiles(String id);
58+
59+
void purgeFlowFiles(String id, String requestor);
60+
5761
void updateConnectorConfigurationStep(String id, String configurationStepName, ConfigurationStepConfigurationDTO configurationStepConfiguration);
5862

5963
void applyConnectorUpdate(String id, ConnectorUpdateContext updateContext);

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.Map;
5454
import java.util.Optional;
5555
import java.util.Set;
56+
import java.util.concurrent.ExecutionException;
5657
import java.util.stream.Collectors;
5758

5859
@Repository
@@ -150,6 +151,25 @@ public void verifyCancelDrainFlowFile(final String id) {
150151
connector.verifyCancelDrainFlowFiles();
151152
}
152153

154+
@Override
155+
public void verifyPurgeFlowFiles(final String id) {
156+
final ConnectorNode connector = getConnector(id);
157+
connector.verifyCanPurgeFlowFiles();
158+
}
159+
160+
@Override
161+
public void purgeFlowFiles(final String id, final String requestor) {
162+
final ConnectorNode connector = getConnector(id);
163+
try {
164+
connector.purgeFlowFiles(requestor).get();
165+
} catch (final InterruptedException e) {
166+
Thread.currentThread().interrupt();
167+
throw new IllegalStateException("Thread was interrupted while purging FlowFiles for Connector " + id, e);
168+
} catch (final ExecutionException e) {
169+
throw new IllegalStateException("Failed to purge FlowFiles for Connector " + id, e.getCause());
170+
}
171+
}
172+
153173
@Override
154174
public void updateConnectorConfigurationStep(final String id, final String configurationStepName, final ConfigurationStepConfigurationDTO configurationStepDto) {
155175
final ConnectorNode connector = getConnector(id);

0 commit comments

Comments
 (0)