Skip to content

Commit 7dc9cf2

Browse files
markap14bobpaulin
andauthored
NIFI-15514: Ensure that Parameter Contexts are assigned to all Proces… (#10815)
* NIFI-15514: Ensure that Parameter Contexts are assigned to all Process Groups in a Connector and not just the top-level group. Ensure that all components are started when Connector starts instead of just Processors and Controller Services - When Working Context is recreated, ensure that we appropriately apply Parameter Context to newly created Process Group(s) - Ensure that when we cleanup unused assets for Connectors that we consider any assets that are referenced in either the Working or Active context instead of just the Active context - Ensure that when we stop Process Group we call all tasks in background threads instead of calling .thenRun which could potentially run in the foreground thread * NIFI-15514: Update parameter context assignment to occur during sync. * NIFI-15514: Fixes around ensuring that processors/controller services are properly configured and notified of any configuration changes when parameters change - Removed the updateParameterContexts from ProcessGroup.updateFlow, which was added in a previous commit as we went a different direction for the fix --------- Co-authored-by: Bob Paulin <bob@bobpaulin.com>
1 parent 4e3292b commit 7dc9cf2

File tree

23 files changed

+1213
-109
lines changed

23 files changed

+1213
-109
lines changed

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1661,10 +1661,10 @@ private void initiateStart(final ScheduledExecutorService taskScheduler, final l
16611661
if (validationStatus != ValidationStatus.VALID) {
16621662
LOG.debug("Cannot start {} because Processor is currently not valid; will try again after 5 seconds", StandardProcessorNode.this);
16631663

1664-
startupAttemptCount.incrementAndGet();
1665-
if (startupAttemptCount.get() == 240 || startupAttemptCount.get() % 7200 == 0) {
1664+
final long attempt = startupAttemptCount.getAndIncrement();
1665+
if (attempt % 7200 == 0) {
16661666
final ValidationState validationState = getValidationState();
1667-
procLog.error("Encountering difficulty starting. (Validation State is {}: {}). Will continue trying to start.",
1667+
procLog.warn("Encountering difficulty starting. (Validation State is {}: {}). Will continue trying to start.",
16681668
validationState, validationState.getValidationErrors());
16691669
}
16701670

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -648,21 +648,24 @@ public ParameterContext createParameterContext(final String id, final String nam
648648
final Map<String, Parameter> parameters, final List<String> inheritedContextIds,
649649
final ParameterProviderConfiguration parameterProviderConfiguration) {
650650

651-
return createParameterContext(id, name, description, parameters, inheritedContextIds, parameterProviderConfiguration, true);
651+
final ParameterReferenceManager referenceManager = new StandardParameterReferenceManager(this::getRootGroup);
652+
return createParameterContext(id, name, description, parameters, inheritedContextIds, parameterProviderConfiguration, referenceManager, true);
652653
}
653654

654655
protected ParameterContext createParameterContext(final String id, final String name, final String description,
655656
final Map<String, Parameter> parameters, final List<String> inheritedContextIds,
656-
final ParameterProviderConfiguration parameterProviderConfiguration, final boolean register) {
657+
final ParameterProviderConfiguration parameterProviderConfiguration, final ParameterReferenceManager referenceManager,
658+
final boolean register) {
657659

658-
final boolean namingConflict = parameterContextManager.getParameterContexts().stream()
660+
if (register) {
661+
final boolean namingConflict = parameterContextManager.getParameterContexts().stream()
659662
.anyMatch(paramContext -> paramContext.getName().equals(name));
660663

661-
if (namingConflict) {
662-
throw new IllegalStateException("Cannot create Parameter Context with name '" + name + "' because a Parameter Context already exists with that name");
664+
if (namingConflict) {
665+
throw new IllegalStateException("Cannot create Parameter Context with name '" + name + "' because a Parameter Context already exists with that name");
666+
}
663667
}
664668

665-
final ParameterReferenceManager referenceManager = new StandardParameterReferenceManager(this::getRootGroup);
666669
final ParameterContext parameterContext = new StandardParameterContext.Builder()
667670
.id(id)
668671
.name(name)
@@ -693,19 +696,13 @@ protected ParameterContext createParameterContext(final String id, final String
693696
}
694697

695698
@Override
696-
public ParameterContext duplicateParameterContext(final String id, final ParameterContext source) {
699+
public ParameterContext createEmptyParameterContext(final String id, final String name, final String description, final ProcessGroup rootGroup) {
697700
final Map<String, Parameter> parameterMap = new HashMap<>();
698-
for (final Parameter parameter : source.getParameters().values()) {
699-
parameterMap.put(parameter.getDescriptor().getName(), parameter);
700-
}
701-
702701
final List<String> inheritedContextIds = new ArrayList<>();
703-
for (final ParameterContext inherited : source.getInheritedParameterContexts()) {
704-
inheritedContextIds.add(inherited.getIdentifier());
705-
}
706702

707-
return createParameterContext(id, source.getName(), source.getDescription(),
708-
parameterMap, inheritedContextIds, source.getParameterProviderConfiguration(), false);
703+
final ParameterReferenceManager parameterReferenceManager = new StandardParameterReferenceManager(() -> rootGroup);
704+
return createParameterContext(id, name, description,
705+
parameterMap, inheritedContextIds, null, parameterReferenceManager, false);
709706
}
710707

711708
@Override

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,6 +1385,10 @@ private ProcessGroup addProcessGroup(final ProcessGroup destination, final Versi
13851385

13861386
destination.addProcessGroup(group);
13871387

1388+
// Connectors will have a single parameter context so if we are creating a group set the context of the parent process group.
1389+
if (connectorId != null) {
1390+
group.setParameterContext(destination.getParameterContext());
1391+
}
13881392
synchronize(group, proposed, versionedParameterContexts, parameterProviderReferences, topLevelGroup, true);
13891393

13901394
return group;
@@ -2143,35 +2147,40 @@ private void verifyNotInherited(final String parameterContextId) {
21432147

21442148
private void updateParameterContext(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
21452149
final Map<String, ParameterProviderReference> parameterProviderReferences, final ComponentIdGenerator componentIdGenerator) {
2146-
// Update the Parameter Context
2150+
2151+
// If proposed parameter context is null, set group's parameter context to null and we're done.
21472152
final ParameterContext currentParamContext = group.getParameterContext();
21482153
final String proposedParameterContextName = proposed.getParameterContextName();
2149-
if (proposedParameterContextName == null && currentParamContext != null) {
2154+
if (proposedParameterContextName == null) {
21502155
group.setParameterContext(null);
2151-
} else if (proposedParameterContextName != null) {
2152-
final VersionedParameterContext versionedParameterContext = versionedParameterContexts.get(proposedParameterContextName);
2153-
if (versionedParameterContext != null) {
2154-
createMissingParameterProvider(versionedParameterContext, versionedParameterContext.getParameterProvider(), parameterProviderReferences, componentIdGenerator);
2155-
if (currentParamContext == null) {
2156-
// Create a new Parameter Context based on the parameters provided
2157-
final ParameterContext contextByName = getParameterContextByName(versionedParameterContext.getName());
2158-
final ParameterContext selectedParameterContext;
2159-
if (contextByName == null) {
2160-
final String parameterContextId = componentIdGenerator.generateUuid(versionedParameterContext.getName(),
2161-
versionedParameterContext.getName(), versionedParameterContext.getName());
2162-
selectedParameterContext = createParameterContext(versionedParameterContext, parameterContextId, versionedParameterContexts,
2163-
parameterProviderReferences, componentIdGenerator);
2164-
} else {
2165-
selectedParameterContext = contextByName;
2166-
addMissingConfiguration(versionedParameterContext, selectedParameterContext, versionedParameterContexts, parameterProviderReferences, componentIdGenerator);
2167-
}
2156+
return;
2157+
}
21682158

2169-
group.setParameterContext(selectedParameterContext);
2170-
} else {
2171-
// Update the current Parameter Context so that it has any Parameters included in the proposed context
2172-
addMissingConfiguration(versionedParameterContext, currentParamContext, versionedParameterContexts, parameterProviderReferences, componentIdGenerator);
2173-
}
2159+
// No versioned parameter context with a matching name. Nothing to do.
2160+
final VersionedParameterContext versionedParameterContext = versionedParameterContexts.get(proposedParameterContextName);
2161+
if (versionedParameterContext == null) {
2162+
return;
2163+
}
2164+
2165+
createMissingParameterProvider(versionedParameterContext, versionedParameterContext.getParameterProvider(), parameterProviderReferences, componentIdGenerator);
2166+
if (currentParamContext == null) {
2167+
// Create a new Parameter Context based on the parameters provided
2168+
final ParameterContext contextByName = getParameterContextByName(versionedParameterContext.getName());
2169+
final ParameterContext selectedParameterContext;
2170+
if (contextByName == null) {
2171+
final String parameterContextId = componentIdGenerator.generateUuid(versionedParameterContext.getName(),
2172+
versionedParameterContext.getName(), versionedParameterContext.getName());
2173+
selectedParameterContext = createParameterContext(versionedParameterContext, parameterContextId, versionedParameterContexts,
2174+
parameterProviderReferences, componentIdGenerator);
2175+
} else {
2176+
selectedParameterContext = contextByName;
2177+
addMissingConfiguration(versionedParameterContext, selectedParameterContext, versionedParameterContexts, parameterProviderReferences, componentIdGenerator);
21742178
}
2179+
2180+
group.setParameterContext(selectedParameterContext);
2181+
} else {
2182+
// Update the current Parameter Context so that it has any Parameters included in the proposed context
2183+
addMissingConfiguration(versionedParameterContext, currentParamContext, versionedParameterContexts, parameterProviderReferences, componentIdGenerator);
21752184
}
21762185
}
21772186

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterContext.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.Map;
4747
import java.util.Objects;
4848
import java.util.Optional;
49+
import java.util.Set;
4950
import java.util.Stack;
5051
import java.util.concurrent.atomic.AtomicLong;
5152
import java.util.concurrent.locks.Lock;
@@ -147,6 +148,7 @@ public void setParameters(final Map<String, Parameter> updatedParameters) {
147148
} finally {
148149
writeLock.unlock();
149150
}
151+
150152
alertReferencingComponents(parameterUpdates);
151153
}
152154

@@ -172,21 +174,25 @@ private Map<ParameterDescriptor, Parameter> getProposedParameters(final Map<Stri
172174
* @param parameterUpdates A map from parameter name to ParameterUpdate (empty if none are applicable)
173175
*/
174176
private void alertReferencingComponents(final Map<String, ParameterUpdate> parameterUpdates) {
175-
if (!parameterUpdates.isEmpty()) {
176-
logger.debug("Parameter Context {} was updated. {} parameters changed ({}). Notifying all affected components.", this, parameterUpdates.size(), parameterUpdates);
177-
178-
for (final ProcessGroup processGroup : parameterReferenceManager.getProcessGroupsBound(this)) {
179-
try {
180-
processGroup.onParameterContextUpdated(parameterUpdates);
181-
} catch (final Exception e) {
182-
logger.error("Failed to notify {} that Parameter Context was updated", processGroup, e);
183-
}
177+
if (parameterUpdates.isEmpty()) {
178+
logger.debug("{} updated. No parameters changed so no existing components are affected.", this);
179+
return;
180+
}
181+
182+
logger.debug("Parameter Context {} was updated. {} parameters changed ({}). Notifying all affected components.", this, parameterUpdates.size(), parameterUpdates);
183+
for (final ProcessGroup processGroup : getBoundProcessGroups()) {
184+
try {
185+
processGroup.onParameterContextUpdated(parameterUpdates);
186+
} catch (final Exception e) {
187+
logger.error("Failed to notify {} that Parameter Context was updated", processGroup, e);
184188
}
185-
} else {
186-
logger.debug("Parameter Context {} was updated. {} parameters changed ({}). No existing components are affected.", this, parameterUpdates.size(), parameterUpdates);
187189
}
188190
}
189191

192+
protected Set<ProcessGroup> getBoundProcessGroups() {
193+
return parameterReferenceManager.getProcessGroupsBound(this);
194+
}
195+
190196
/**
191197
* Returns a map from parameter name to ParameterUpdate for any actual updates to parameters.
192198
* @param currentParameters The current parameters

nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,9 @@ private void setProperty(final PropertyDescriptor descriptor, final PropertyConf
608608
if (!propertyConfiguration.equals(propertyModComparisonValue)) {
609609
try {
610610
final String oldValue = propertyModComparisonValue == null ? null : propertyModComparisonValue.getEffectiveValue(getParameterContext());
611-
onPropertyModified(descriptor, oldValue, resolvedValue);
611+
if (!Objects.equals(oldValue, resolvedValue)) {
612+
onPropertyModified(descriptor, oldValue, resolvedValue);
613+
}
612614
} catch (final Exception e) {
613615
// nothing really to do here...
614616
logger.error("Failed to notify {} that property {} changed", this, descriptor, e);

nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -388,13 +388,16 @@ ParameterContext createParameterContext(String id, String name, String descripti
388388
List<String> inheritedContextIds, ParameterProviderConfiguration parameterProviderConfiguration);
389389

390390
/**
391-
* Creates a duplicate of the given ParameterContext with the provided id. This does not register the Parameter Context
392-
* with the ParameterContextManager.
391+
* Creates an empty Parameter Context with the given ID for the provided root process group. This Parameter Context is not
392+
* registered with the ParameterContextManager.
393+
*
393394
* @param id the id of the new ParameterContext
394-
* @param source the ParameterContext to duplicate
395+
* @param name the name of the new ParameterContext
396+
* @param description the description of the new ParameterContext
397+
* @param rootGroup the root process group
395398
* @return the duplicated ParameterContext
396399
*/
397-
ParameterContext duplicateParameterContext(String id, ParameterContext source);
400+
ParameterContext createEmptyParameterContext(String id, String name, String description, ProcessGroup rootGroup);
398401

399402
/**
400403
* Performs the given ParameterContext-related action, and then resolves all inherited ParameterContext references.

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/ConnectorParameterLookup.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,23 @@ private Parameter createParameter(final ParameterValue parameterValue) {
6161
.build();
6262
}
6363

64+
public VersionedParameterContext createVersionedParameterContext(final String name) {
65+
final VersionedParameterContext context = new VersionedParameterContext();
66+
context.setName(name);
67+
68+
final Set<VersionedParameter> versionedParameters = new HashSet<>();
69+
for (final ParameterValue parameterValue : parameterValues) {
70+
final VersionedParameter versionedParameter = new VersionedParameter();
71+
versionedParameter.setName(parameterValue.getName());
72+
versionedParameter.setValue(parameterValue.getValue());
73+
versionedParameter.setSensitive(parameterValue.isSensitive());
74+
versionedParameters.add(versionedParameter);
75+
}
76+
77+
context.setParameters(versionedParameters);
78+
return context;
79+
}
80+
6481
@Override
6582
public Optional<Parameter> getParameter(final String parameterName) {
6683
return Optional.ofNullable(parameters.get(parameterName));

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -224,22 +224,9 @@ private void waitForState(final ConnectorNode connector, final Set<ConnectorStat
224224
}
225225

226226
private void cleanUpAssets(final ConnectorNode connector) {
227-
final FrameworkFlowContext activeFlowContext = connector.getActiveFlowContext();
228-
final ConnectorConfiguration activeConfiguration = activeFlowContext.getConfigurationContext().toConnectorConfiguration();
229-
230227
final Set<String> referencedAssetIds = new HashSet<>();
231-
for (final NamedStepConfiguration namedStepConfiguration : activeConfiguration.getNamedStepConfigurations()) {
232-
final StepConfiguration stepConfiguration = namedStepConfiguration.configuration();
233-
final Map<String, ConnectorValueReference> stepPropertyValues = stepConfiguration.getPropertyValues();
234-
if (stepPropertyValues == null) {
235-
continue;
236-
}
237-
for (final ConnectorValueReference valueReference : stepPropertyValues.values()) {
238-
if (valueReference instanceof AssetReference assetReference) {
239-
referencedAssetIds.addAll(assetReference.getAssetIdentifiers());
240-
}
241-
}
242-
}
228+
collectReferencedAssetIds(connector.getActiveFlowContext(), referencedAssetIds);
229+
collectReferencedAssetIds(connector.getWorkingFlowContext(), referencedAssetIds);
243230

244231
logger.debug("Found {} assets referenced for Connector [{}]", referencedAssetIds.size(), connector.getIdentifier());
245232

@@ -258,6 +245,26 @@ private void cleanUpAssets(final ConnectorNode connector) {
258245
}
259246
}
260247

248+
private void collectReferencedAssetIds(final FrameworkFlowContext flowContext, final Set<String> referencedAssetIds) {
249+
if (flowContext == null) {
250+
return;
251+
}
252+
253+
final ConnectorConfiguration configuration = flowContext.getConfigurationContext().toConnectorConfiguration();
254+
for (final NamedStepConfiguration namedStepConfiguration : configuration.getNamedStepConfigurations()) {
255+
final StepConfiguration stepConfiguration = namedStepConfiguration.configuration();
256+
final Map<String, ConnectorValueReference> stepPropertyValues = stepConfiguration.getPropertyValues();
257+
if (stepPropertyValues == null) {
258+
continue;
259+
}
260+
for (final ConnectorValueReference valueReference : stepPropertyValues.values()) {
261+
if (valueReference instanceof AssetReference assetReference) {
262+
referencedAssetIds.addAll(assetReference.getAssetIdentifiers());
263+
}
264+
}
265+
}
266+
}
267+
261268
@Override
262269
public void configureConnector(final ConnectorNode connector, final String stepName, final StepConfiguration configuration) throws FlowUpdateException {
263270
connector.setConfiguration(stepName, configuration);

0 commit comments

Comments
 (0)