Skip to content

Commit d393e57

Browse files
committed
NIFI-15440: Addressed issues that occurred after rebase
1 parent 208d6ff commit d393e57

File tree

3 files changed

+87
-21
lines changed

3 files changed

+87
-21
lines changed

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -872,8 +872,7 @@ public List<ConnectorAction> getAvailableActions() {
872872
final List<ConnectorAction> actions = new ArrayList<>();
873873
final ConnectorState currentState = getCurrentState();
874874
final boolean dataQueued = activeFlowContext.getManagedProcessGroup().isDataQueued();
875-
final boolean stopped = currentState == ConnectorState.STOPPED || currentState == ConnectorState.DISABLED
876-
|| currentState == ConnectorState.UPDATED || currentState == ConnectorState.UPDATE_FAILED;
875+
final boolean stopped = isStopped();
877876

878877
actions.add(createStartAction(currentState, stopped));
879878
actions.add(createStopAction(currentState));
@@ -887,16 +886,40 @@ public List<ConnectorAction> getAvailableActions() {
887886
return actions;
888887
}
889888

889+
private boolean isStopped() {
890+
final ConnectorState currentState = getCurrentState();
891+
if (currentState == ConnectorState.STOPPED) {
892+
return true;
893+
}
894+
if (currentState == ConnectorState.UPDATED || currentState == ConnectorState.UPDATE_FAILED) {
895+
return !isActiveThread(getActiveFlowContext().getManagedProcessGroup());
896+
}
897+
return false;
898+
}
899+
900+
private boolean isActiveThread(final ProcessGroup processGroup) {
901+
for (final ProcessorNode processor : processGroup.getProcessors()) {
902+
if (processor.getActiveThreadCount() > 0) {
903+
return true;
904+
}
905+
}
906+
907+
for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
908+
if (isActiveThread(childGroup)) {
909+
return true;
910+
}
911+
}
912+
913+
return false;
914+
}
915+
890916
private ConnectorAction createStartAction(final ConnectorState currentState, final boolean stopped) {
891917
final boolean allowed;
892918
final String reason;
893919

894-
if (currentState == ConnectorState.DISABLED) {
895-
allowed = false;
896-
reason = "Connector is disabled";
897-
} else if (!stopped) {
920+
if (!stopped) {
898921
allowed = false;
899-
reason = "Connector must be stopped";
922+
reason = "Connector is not stopped";
900923
} else {
901924
allowed = true;
902925
reason = null;

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.nifi.components.connector.secrets.ParameterProviderSecretsManager;
2828
import org.apache.nifi.components.connector.secrets.SecretsManager;
2929
import org.apache.nifi.components.connector.services.CounterService;
30+
import org.apache.nifi.components.connector.ConnectorValidationTrigger;
3031
import org.apache.nifi.components.state.StateManagerProvider;
3132
import org.apache.nifi.components.validation.ValidationState;
3233
import org.apache.nifi.components.validation.ValidationStatus;
@@ -159,6 +160,7 @@ public void setup() {
159160
when(flowController.getFlowFileEventRepository()).thenReturn(mock(FlowFileEventRepository.class));
160161
when(flowController.getConnectorRepository()).thenReturn(connectorRepository);
161162
when(flowController.getValidationTrigger()).thenReturn(mock(ValidationTrigger.class));
163+
when(flowController.getConnectorValidationTrigger()).thenReturn(mock(ConnectorValidationTrigger.class));
162164

163165
doAnswer(invocation -> {
164166
return createConnection(invocation.getArgument(0), invocation.getArgument(1), invocation.getArgument(2), invocation.getArgument(3), invocation.getArgument(4));

nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@
2121
import org.apache.nifi.components.connector.AbstractConnector;
2222
import org.apache.nifi.components.connector.ConfigurationStep;
2323
import org.apache.nifi.components.connector.components.FlowContext;
24+
import org.apache.nifi.flow.Bundle;
2425
import org.apache.nifi.flow.ConnectableComponent;
2526
import org.apache.nifi.flow.ConnectableComponentType;
27+
import org.apache.nifi.flow.Position;
2628
import org.apache.nifi.flow.ScheduledState;
2729
import org.apache.nifi.flow.VersionedConnection;
2830
import org.apache.nifi.flow.VersionedExternalFlow;
2931
import org.apache.nifi.flow.VersionedProcessGroup;
3032
import org.apache.nifi.flow.VersionedProcessor;
3133

34+
import java.util.Collections;
3235
import java.util.List;
3336
import java.util.Map;
3437
import java.util.Set;
@@ -41,19 +44,16 @@ protected void onStepConfigured(final String stepName, final FlowContext working
4144

4245
@Override
4346
public VersionedExternalFlow getInitialFlow() {
44-
final VersionedProcessor generate = new VersionedProcessor();
45-
generate.setName("GenerateFlowFile");
46-
generate.setType("org.apache.nifi.processors.tests.system.GenerateFlowFile");
47-
generate.setIdentifier("gen-1");
48-
generate.setGroupIdentifier("1234");
49-
generate.setProperties(Map.of("File Size", "1 KB"));
50-
51-
final VersionedProcessor terminate = new VersionedProcessor();
52-
terminate.setName("TerminateFlowFile");
53-
terminate.setType("org.apache.nifi.processors.tests.system.TerminateFlowFile");
54-
terminate.setIdentifier("term-1");
55-
terminate.setGroupIdentifier("1234");
56-
terminate.setScheduledState(ScheduledState.DISABLED);
47+
final Bundle bundle = new Bundle();
48+
bundle.setGroup("org.apache.nifi");
49+
bundle.setArtifact("nifi-system-test-extensions-nar");
50+
bundle.setVersion("2.7.0-SNAPSHOT");
51+
52+
final VersionedProcessor generate = createVersionedProcessor("gen-1", "1234", "GenerateFlowFile",
53+
"org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle, Map.of("File Size", "1 KB"), ScheduledState.RUNNING);
54+
55+
final VersionedProcessor terminate = createVersionedProcessor("term-1", "1234", "TerminateFlowFile",
56+
"org.apache.nifi.processors.tests.system.TerminateFlowFile", bundle, Collections.emptyMap(), ScheduledState.DISABLED);
5757

5858
final ConnectableComponent source = new ConnectableComponent();
5959
source.setId(generate.getIdentifier());
@@ -66,10 +66,18 @@ public VersionedExternalFlow getInitialFlow() {
6666
destination.setGroupId("1234");
6767

6868
final VersionedConnection connection = new VersionedConnection();
69+
connection.setIdentifier("generate-to-terminate-1");
6970
connection.setSource(source);
7071
connection.setDestination(destination);
7172
connection.setGroupIdentifier("1234");
72-
connection.setIdentifier("generate-to-terminate-1");
73+
connection.setSelectedRelationships(Set.of("success"));
74+
connection.setBackPressureDataSizeThreshold("1 GB");
75+
connection.setBackPressureObjectThreshold(10_000L);
76+
connection.setBends(Collections.emptyList());
77+
connection.setLabelIndex(1);
78+
connection.setFlowFileExpiration("0 sec");
79+
connection.setPrioritizers(Collections.emptyList());
80+
connection.setzIndex(1L);
7381

7482
final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
7583
rootGroup.setName("Data Queuing Connector");
@@ -79,6 +87,7 @@ public VersionedExternalFlow getInitialFlow() {
7987

8088
final VersionedExternalFlow flow = new VersionedExternalFlow();
8189
flow.setFlowContents(rootGroup);
90+
flow.setParameterContexts(Collections.emptyMap());
8291
return flow;
8392
}
8493

@@ -95,4 +104,36 @@ public List<ConfigurationStep> getConfigurationSteps() {
95104
@Override
96105
public void applyUpdate(final FlowContext workingFlowContext, final FlowContext activeFlowContext) {
97106
}
107+
108+
private VersionedProcessor createVersionedProcessor(final String identifier, final String groupIdentifier, final String name,
109+
final String type, final Bundle bundle, final Map<String, String> properties,
110+
final ScheduledState scheduledState) {
111+
final VersionedProcessor processor = new VersionedProcessor();
112+
processor.setIdentifier(identifier);
113+
processor.setGroupIdentifier(groupIdentifier);
114+
processor.setName(name);
115+
processor.setType(type);
116+
processor.setBundle(bundle);
117+
processor.setProperties(properties);
118+
processor.setPropertyDescriptors(Collections.emptyMap());
119+
processor.setScheduledState(scheduledState);
120+
121+
processor.setBulletinLevel("WARN");
122+
processor.setSchedulingStrategy("TIMER_DRIVEN");
123+
processor.setSchedulingPeriod("0 sec");
124+
processor.setExecutionNode("ALL");
125+
processor.setConcurrentlySchedulableTaskCount(1);
126+
processor.setPenaltyDuration("30 sec");
127+
processor.setYieldDuration("1 sec");
128+
processor.setRunDurationMillis(0L);
129+
processor.setPosition(new Position(0, 0));
130+
131+
processor.setAutoTerminatedRelationships(Collections.emptySet());
132+
processor.setRetryCount(10);
133+
processor.setRetriedRelationships(Collections.emptySet());
134+
processor.setBackoffMechanism("PENALIZE_FLOWFILE");
135+
processor.setMaxBackoffPeriod("10 mins");
136+
137+
return processor;
138+
}
98139
}

0 commit comments

Comments
 (0)