3232import org .apache .kafka .connect .data .Schema ;
3333import org .apache .kafka .connect .data .SchemaBuilder ;
3434import org .apache .kafka .connect .data .Struct ;
35+ import org .apache .kafka .connect .data .Values ;
3536import org .apache .kafka .connect .errors .DataException ;
3637import org .apache .kafka .connect .transforms .Transformation ;
3738
@@ -98,8 +99,14 @@ public void configure(final Map<String, ?> settings) {
9899 }
99100
100101
101- private void validateUnextractableKeyFields (final R record ) {
102- // If the key fields have names, but the key value doesn't support names
102+ /**
103+ * Validation check to ensure that if any named fields exist, they can be extracted from the structured data in the
104+ * key.
105+ *
106+ * @param record The incoming record to be transformed.
107+ * @throws DataException if the key does not support extracting named fields.
108+ */
109+ private void validateNoUnextractableKeyFields (final R record ) {
103110 if (keyFieldsHasNamedFields && record .keySchema () != null && record .keySchema ().type () != Schema .Type .STRUCT ) {
104111 throw new DataException (
105112 String .format ("Named key fields %s cannot be copied from the key schema: %s" , valueFields .values (),
@@ -114,14 +121,24 @@ private void validateUnextractableKeyFields(final R record) {
114121 }
115122 }
116123
117- private void validateKeyFieldSchemaRequired (final R record ) {
124+ /**
125+ * Validation check to fail quickly when the entire key is copied into a structured value with a schema, but has a
126+ * schemaless class.
127+ *
128+ * @param record The incoming record to be transformed.
129+ * @throws DataException if the value class requires the key to have a schema but the key is a schemaless class.
130+ */
131+ private void validateKeySchemaRequirementsMet (final R record ) {
118132 if (keyFieldsHasWildcard && record .keySchema () == null && record .key () instanceof Map ) {
119133 if (record .valueSchema () != null && record .valueSchema ().type () == Schema .Type .STRUCT ) {
120134 throw new DataException ("The value requires a schema, but the key class is a schemaless Map" );
121135 }
122136 }
123137 }
124138
139+ /**
140+ * Validation check to ensure that the value can receive columns from the key, i.e. as either a Struct or Map.
141+ */
125142 private void validateStructuredValue (final R record ) {
126143 if (record .valueSchema () == null && !(record .value () instanceof Map )
127144 || record .valueSchema () != null && record .valueSchema ().type () != Schema .Type .STRUCT ) {
@@ -131,15 +148,20 @@ private void validateStructuredValue(final R record) {
131148
132149 @ Override
133150 public R apply (final R record ) {
134- validateUnextractableKeyFields (record );
135- validateKeyFieldSchemaRequired (record );
151+ validateNoUnextractableKeyFields (record );
152+ validateKeySchemaRequirementsMet (record );
136153 validateStructuredValue (record );
137154
138155 if (record .value () instanceof Struct ) {
139156 if (record .keySchema () != null ) {
140157 return applyToStruct (record , record .keySchema ());
141158 } else {
142- return applyToStruct (record , inferSchemaFromPrimitive (record .key ()));
159+ final Schema inferredSchema = Values .inferSchema (record .key ());
160+ if (inferredSchema == null ) {
161+ throw new DataException (
162+ "Cannot infer schema for unsupported key class: " + record .key ().getClass ().getName ());
163+ }
164+ return applyToStruct (record , inferredSchema );
143165 }
144166 } else {
145167 return applyToMap (record );
@@ -200,34 +222,6 @@ private R applyToStruct(final R record, final Schema keySchema) {
200222 newValue .schema (), newValue , record .timestamp ());
201223 }
202224
203- /**
204- * Infers the schema from a primitive key.
205- *
206- * @param key The key value.
207- * @return The inferred schema.
208- */
209- private Schema inferSchemaFromPrimitive (final Object key ) {
210- if (key instanceof String ) {
211- return Schema .OPTIONAL_STRING_SCHEMA ;
212- } else if (key instanceof Boolean ) {
213- return Schema .OPTIONAL_BOOLEAN_SCHEMA ;
214- } else if (key instanceof Byte ) {
215- return Schema .OPTIONAL_INT8_SCHEMA ;
216- } else if (key instanceof Short ) {
217- return Schema .OPTIONAL_INT16_SCHEMA ;
218- } else if (key instanceof Integer ) {
219- return Schema .OPTIONAL_INT32_SCHEMA ;
220- } else if (key instanceof Long ) {
221- return Schema .OPTIONAL_INT64_SCHEMA ;
222- } else if (key instanceof Float ) {
223- return Schema .OPTIONAL_FLOAT32_SCHEMA ;
224- } else if (key instanceof Double ) {
225- return Schema .OPTIONAL_FLOAT64_SCHEMA ;
226- } else {
227- throw new DataException ("Cannot infer schema for unsupported key class: " + key .getClass ().getName ());
228- }
229- }
230-
231225 /**
232226 * Merges the key and value schemas into a new schema, according to the configuration of keyFields to copy into the
233227 * new value schema, and how they are renamed via valueFields.
0 commit comments