Commit ca768ef

NIFI-15448 Add option for using Predefined Schemas in GenerateRecord (#10752)
Signed-off-by: David Handermann <exceptionfactory@apache.org>
1 parent d9f9147 commit ca768ef

3 files changed: +1005 -30 lines changed

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java

Lines changed: 120 additions & 30 deletions
@@ -31,6 +31,8 @@
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -42,6 +44,7 @@
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.faker.FakerUtils;
+import org.apache.nifi.processors.standard.faker.PredefinedRecordSchema;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -65,6 +68,7 @@
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -84,9 +88,12 @@
         @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
         @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile"),
 })
-@CapabilityDescription("This processor creates FlowFiles with records having random value for the specified fields. GenerateRecord is useful " +
-        "for testing, configuration, and simulation. It uses either user-defined properties to define a record schema or a provided schema and generates the specified number of records using " +
-        "random data for the fields in the schema.")
+@CapabilityDescription("""
+        This processor creates FlowFiles with records having random value for the specified fields. GenerateRecord is useful
+        for testing, configuration, and simulation. It uses one of three methods to define a record schema: (1) a provided Avro Schema Text,
+        (2) a Predefined Schema template such as Person, Order, Event, Sensor, Product, Stock Trade, or Complete Example covering all data types,
+        or (3) user-defined dynamic properties. The processor generates the specified number of records using random data for the fields in the schema.
+        """)
 @DynamicProperties({
         @DynamicProperty(
                 name = "Field name in generated record",
@@ -122,16 +129,21 @@ public class GenerateRecord extends AbstractProcessor {

     static final PropertyDescriptor NULLABLE_FIELDS = new PropertyDescriptor.Builder()
             .name("Nullable Fields")
-            .description("Whether the generated fields will be nullable. Note that this property is ignored if Schema Text is set. Also it only affects the schema of the generated data, " +
-                    "not whether any values will be null. If this property is true, see 'Null Value Percentage' to set the probability that any generated field will be null.")
+            .description("""
+                    Whether the generated fields will be nullable. Note that this property is ignored if Schema Text is set.
+                    Also it only affects the schema of the generated data, not whether any values will be null.
+                    If this property is true, see 'Null Value Percentage' to set the probability that any generated field will be null.
+                    """)
             .allowableValues("true", "false")
             .defaultValue("true")
             .required(true)
             .build();
     static final PropertyDescriptor NULL_PERCENTAGE = new PropertyDescriptor.Builder()
             .name("Null Value Percentage")
-            .description("The percent probability (0-100%) that a generated value for any nullable field will be null. Set this property to zero to have no null values, or 100 to have all " +
-                    "null values.")
+            .description("""
+                    The percent probability (0-100%) that a generated value for any nullable field will be null.
+                    Set this property to zero to have no null values, or 100 to have all null values.
+                    """)
             .addValidator(StandardValidators.createLongValidator(0L, 100L, true))
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .required(true)
@@ -141,18 +153,34 @@ public class GenerateRecord extends AbstractProcessor {

     static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
             .name("Schema Text")
-            .description("The text of an Avro-formatted Schema used to generate record data. If this property is set, any user-defined properties are ignored.")
+            .description("""
+                    The text of an Avro-formatted Schema used to generate record data.
+                    Only one of Schema Text, Predefined Schema, or user-defined dynamic properties should be configured.
+                    """)
             .addValidator(new AvroSchemaValidator())
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .required(false)
             .build();

+    static final PropertyDescriptor PREDEFINED_SCHEMA = new PropertyDescriptor.Builder()
+            .name("Predefined Schema")
+            .description("""
+                    Select a predefined schema template for quick record generation. Predefined schemas provide ready-to-use
+                    templates with multiple fields covering various data types including nested records, arrays, maps, dates, timestamps, etc.
+                    Only one of Schema Text, Predefined Schema, or user-defined dynamic properties should be configured.
+                    Note: This feature is intended for quick testing purposes only. Predefined schemas may change between NiFi versions.
+                    """)
+            .allowableValues(PredefinedRecordSchema.class)
+            .required(false)
+            .build();
+
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
             RECORD_WRITER,
             NUM_RECORDS,
             NULLABLE_FIELDS,
             NULL_PERCENTAGE,
-            SCHEMA_TEXT
+            SCHEMA_TEXT,
+            PREDEFINED_SCHEMA
     );

     static final Relationship REL_SUCCESS = new Relationship.Builder()
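
The new Predefined Schema property can be exercised with the NiFi mock framework. The following is a minimal sketch, not part of this commit: the test class name and the "Person" value are assumptions, while TestRunner, TestRunners, and MockRecordWriter come from the existing nifi-mock module.

// Minimal usage sketch (assumed test code, not part of this commit): configure GenerateRecord
// with the new Predefined Schema property and verify that a FlowFile of generated records is emitted.
package org.apache.nifi.processors.standard;

import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;

class GenerateRecordPredefinedSchemaSketch {

    @Test
    void generatesRecordsFromPredefinedSchema() throws Exception {
        final TestRunner runner = TestRunners.newTestRunner(GenerateRecord.class);

        // The Record Writer property is required, so register a mock writer first
        final MockRecordWriter writer = new MockRecordWriter(null, false);
        runner.addControllerService("writer", writer);
        runner.enableControllerService(writer);
        runner.setProperty(GenerateRecord.RECORD_WRITER, "writer");

        // Select one of the predefined templates ("Person" is an assumed allowable value name)
        runner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, "Person");
        runner.setProperty(GenerateRecord.NUM_RECORDS, "10");

        runner.run();
        runner.assertAllFlowFilesTransferred(GenerateRecord.REL_SUCCESS, 1);
    }
}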
@@ -188,6 +216,46 @@ public Set<Relationship> getRelationships() {
         return RELATIONSHIPS;
     }

+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final boolean hasSchemaText = validationContext.getProperty(SCHEMA_TEXT).isSet();
+        final boolean hasPredefinedSchema = validationContext.getProperty(PREDEFINED_SCHEMA).isSet();
+        final boolean hasDynamicProperties = validationContext.getProperties().keySet().stream()
+                .anyMatch(PropertyDescriptor::isDynamic);
+
+        int configuredCount = 0;
+        if (hasSchemaText) {
+            configuredCount++;
+        }
+        if (hasPredefinedSchema) {
+            configuredCount++;
+        }
+        if (hasDynamicProperties) {
+            configuredCount++;
+        }
+
+        if (configuredCount == 0) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Schema Configuration")
+                    .valid(false)
+                    .explanation("At least one schema configuration must be provided: Schema Text, Predefined Schema, or user-defined dynamic properties")
+                    .build());
+        } else if (configuredCount > 1) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Schema Configuration")
+                    .valid(false)
+                    .explanation("Only one schema configuration should be provided. Found multiple configurations: "
+                            + (hasSchemaText ? "Schema Text, " : "")
+                            + (hasPredefinedSchema ? "Predefined Schema, " : "")
+                            + (hasDynamicProperties ? "Dynamic Properties" : ""))
+                    .build());
+        }
+
+        return results;
+    }
+
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         // Force the en-US Locale for more predictable results
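
The mutual-exclusion rule enforced by customValidate above could be covered by a test along these lines; again a hedged sketch rather than committed code, written as an additional method in the sketch class from the previous example and reusing the assumed "Person" value.

    // Assumed test sketch (not part of this commit): configuring both Schema Text and
    // Predefined Schema at once should fail the customValidate check added above.
    @Test
    void rejectsMultipleSchemaSources() throws Exception {
        final TestRunner runner = TestRunners.newTestRunner(GenerateRecord.class);

        final MockRecordWriter writer = new MockRecordWriter(null, false);
        runner.addControllerService("writer", writer);
        runner.enableControllerService(writer);
        runner.setProperty(GenerateRecord.RECORD_WRITER, "writer");

        // Two schema sources configured at the same time
        runner.setProperty(GenerateRecord.SCHEMA_TEXT,
                "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
        runner.setProperty(GenerateRecord.PREDEFINED_SCHEMA, "Person"); // assumed value name

        runner.assertNotValid();
    }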
@@ -198,6 +266,8 @@ public void onScheduled(final ProcessContext context) {
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {

         final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions().getValue();
+        final String predefinedSchemaName = context.getProperty(PREDEFINED_SCHEMA).getValue();
+        final PredefinedRecordSchema predefinedSchema = PredefinedRecordSchema.fromName(predefinedSchemaName);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         final int numRecords = context.getProperty(NUM_RECORDS).evaluateAttributeExpressions().asInteger();
         final boolean nullable = context.getProperty(NULLABLE_FIELDS).asBoolean();
@@ -210,46 +280,57 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         try {
             flowFile = session.write(flowFile, out -> {
                 final RecordSchema recordSchema;
-                final boolean usingSchema;
+                final SchemaSource schemaSource;
                 if (StringUtils.isNotEmpty(schemaText)) {
+                    // Schema Text takes highest precedence
                     final Schema avroSchema = new Schema.Parser().parse(schemaText);
                     recordSchema = AvroTypeUtil.createSchema(avroSchema);
-                    usingSchema = true;
+                    schemaSource = SchemaSource.SCHEMA_TEXT;
+                } else if (predefinedSchema != null) {
+                    // Predefined schema takes second precedence
+                    recordSchema = predefinedSchema.getSchema(nullable);
+                    schemaSource = SchemaSource.PREDEFINED;
                 } else {
                     // Generate RecordSchema from user-defined properties
                     final Map<String, String> fields = getFields(context);
                     recordSchema = generateRecordSchema(fields, nullable);
-                    usingSchema = false;
+                    schemaSource = SchemaSource.DYNAMIC_PROPERTIES;
                 }
                 try {
                     final RecordSchema writeSchema = writerFactory.getSchema(attributes, recordSchema);
                     try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, attributes)) {
                         writer.beginRecordSet();

                         Record record;
-                        List<RecordField> writeFieldNames = writeSchema.getFields();
-                        Map<String, Object> recordEntries = new HashMap<>();
                         for (int i = 0; i < numRecords; i++) {
-                            for (RecordField writeRecordField : writeFieldNames) {
-                                final String writeFieldName = writeRecordField.getFieldName();
-                                final Object writeFieldValue;
-                                if (usingSchema) {
-                                    writeFieldValue = generateValueFromRecordField(writeRecordField, faker, nullPercentage);
-                                } else {
-                                    final boolean nullValue =
-                                            nullPercentage > 0 && faker.number().numberBetween(0, 100) <= nullPercentage;
-
-                                    if (nullValue) {
-                                        writeFieldValue = null;
+                            if (schemaSource == SchemaSource.PREDEFINED) {
+                                // Use the predefined schema's optimized value generation
+                                final Map<String, Object> recordEntries = predefinedSchema.generateValues(faker, recordSchema, nullPercentage);
+                                record = new MapRecord(recordSchema, recordEntries);
+                            } else {
+                                // Use original logic for Schema Text or dynamic properties
+                                List<RecordField> writeFieldNames = writeSchema.getFields();
+                                Map<String, Object> recordEntries = new HashMap<>();
+                                for (RecordField writeRecordField : writeFieldNames) {
+                                    final String writeFieldName = writeRecordField.getFieldName();
+                                    final Object writeFieldValue;
+                                    if (schemaSource == SchemaSource.SCHEMA_TEXT) {
+                                        writeFieldValue = generateValueFromRecordField(writeRecordField, faker, nullPercentage);
                                     } else {
-                                        final String propertyValue = context.getProperty(writeFieldName).getValue();
-                                        writeFieldValue = FakerUtils.getFakeData(propertyValue, faker);
+                                        final boolean nullValue =
+                                                nullPercentage > 0 && faker.number().numberBetween(0, 100) <= nullPercentage;
+
+                                        if (nullValue) {
+                                            writeFieldValue = null;
+                                        } else {
+                                            final String propertyValue = context.getProperty(writeFieldName).getValue();
+                                            writeFieldValue = FakerUtils.getFakeData(propertyValue, faker);
+                                        }
                                     }
+                                    recordEntries.put(writeFieldName, writeFieldValue);
                                 }
-
-                                recordEntries.put(writeFieldName, writeFieldValue);
+                                record = new MapRecord(recordSchema, recordEntries);
                             }
-                            record = new MapRecord(recordSchema, recordEntries);
                             writer.write(record);
                         }

@@ -403,4 +484,13 @@ protected RecordSchema generateRecordSchema(final Map<String, String> fields, fi
         }
         return new SimpleRecordSchema(recordFields);
     }
+
+    /**
+     * Enum to track which source is being used for the record schema.
+     */
+    private enum SchemaSource {
+        SCHEMA_TEXT,
+        PREDEFINED,
+        DYNAMIC_PROPERTIES
+    }
 }
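
The PredefinedRecordSchema enum referenced above is added in one of the other two changed files and is not shown in this diff. Inferring only from the calls made here (fromName, getSchema, generateValues) and from allowableValues(PredefinedRecordSchema.class), its outline is roughly as follows; the template constants, field names, and Faker calls below are illustrative assumptions rather than the committed implementation.

// Hypothetical outline of PredefinedRecordSchema, sketched from the usage in GenerateRecord above.
// Only one template constant is shown; the commit adds Person, Order, Event, Sensor, Product,
// Stock Trade, and Complete Example templates. Field names and Faker calls are assumptions.
package org.apache.nifi.processors.standard.faker;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import net.datafaker.Faker;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

public enum PredefinedRecordSchema implements DescribedValue {
    PERSON("Person", "Simple person record with name and email fields");

    private final String displayName;
    private final String description;

    PredefinedRecordSchema(final String displayName, final String description) {
        this.displayName = displayName;
        this.description = description;
    }

    @Override
    public String getValue() {
        return name();
    }

    @Override
    public String getDisplayName() {
        return displayName;
    }

    @Override
    public String getDescription() {
        return description;
    }

    // GenerateRecord resolves the configured property value back to an enum constant
    public static PredefinedRecordSchema fromName(final String name) {
        if (name == null) {
            return null;
        }
        for (final PredefinedRecordSchema schema : values()) {
            if (schema.name().equals(name) || schema.displayName.equals(name)) {
                return schema;
            }
        }
        return null;
    }

    // Builds the RecordSchema for this template, honoring the Nullable Fields property
    public RecordSchema getSchema(final boolean nullable) {
        return new SimpleRecordSchema(List.of(
                new RecordField("name", RecordFieldType.STRING.getDataType(), nullable),
                new RecordField("email", RecordFieldType.STRING.getDataType(), nullable)));
    }

    // Generates one record's worth of values; a real implementation would also honor nullPercentage
    public Map<String, Object> generateValues(final Faker faker, final RecordSchema schema, final int nullPercentage) {
        final Map<String, Object> values = new LinkedHashMap<>();
        values.put("name", faker.name().fullName());
        values.put("email", faker.internet().emailAddress());
        return values;
    }
}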
