Skip to content

Commit 6a2851c

Browse files
author
Jerome Revillard
committed
Add JSON and PROTOBUF schema registry message types
1 parent 5f6fda4 commit 6a2851c

File tree

11 files changed

+136
-57
lines changed

11 files changed

+136
-57
lines changed

build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ repositories {
2222
maven {
2323
url "https://packages.confluent.io/maven/"
2424
}
25+
maven {
26+
url "https://jitpack.io"
27+
}
2528
}
2629

2730
dependencies {
@@ -32,6 +35,8 @@ dependencies {
3235
compile 'info.picocli:picocli:4.1.4'
3336

3437
implementation ('io.confluent:kafka-schema-registry-client:6.1.1')
38+
implementation ('io.confluent:kafka-json-schema-provider:6.1.1')
39+
implementation ('io.confluent:kafka-protobuf-serializer:6.1.1')
3540
implementation('com.flipkart.zjsonpatch:zjsonpatch:0.4.11')
3641

3742
compile 'org.slf4j:slf4j-api:1.7.30'

docker/config/registry_jaas.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
KafkaClient {
2+
org.apache.kafka.common.security.plain.PlainLoginModule required
3+
username="test"
4+
password="test-secret";
5+
};

docker/docker-compose.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,24 @@ services:
9292
depends_on:
9393
- zoo1
9494

95+
schema-registry:
96+
image: confluentinc/cp-schema-registry:6.1.1
97+
hostname: schema-registry
98+
ports:
99+
- "8081:8081"
100+
environment:
101+
SCHEMA_REGISTRY_HOST_NAME: schema-registry
102+
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka1:19092,kafka2:19092,kafka3:19092"
103+
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SASL_PLAINTEXT
104+
SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: PLAIN
105+
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
106+
SCHEMA_REGISTRY_GROUP_ID: "schema-registry-test"
107+
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/registry_jaas.conf"
108+
SCHEMA_REGISTRY_OPTS: "-Djava.security.auth.login.config=/etc/kafka/registry_jaas.conf"
109+
volumes:
110+
- ./config/registry_jaas.conf:/etc/kafka/registry_jaas.conf
111+
depends_on:
112+
- kafka1
113+
- kafka2
114+
- kafka3
115+

src/main/java/com/devshawn/kafka/gitops/StateManager.java

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.devshawn.kafka.gitops.domain.state.DesiredStateFile;
1414
import com.devshawn.kafka.gitops.domain.state.TopicDetails;
1515
import com.devshawn.kafka.gitops.domain.state.service.KafkaStreamsService;
16+
import com.devshawn.kafka.gitops.enums.SchemaType;
1617
import com.devshawn.kafka.gitops.exception.ConfluentCloudException;
1718
import com.devshawn.kafka.gitops.exception.InvalidAclDefinitionException;
1819
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
@@ -33,16 +34,11 @@
3334
import com.fasterxml.jackson.databind.DeserializationFeature;
3435
import com.fasterxml.jackson.databind.ObjectMapper;
3536
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
36-
import io.confluent.kafka.schemaregistry.ParsedSchema;
37-
import io.confluent.kafka.schemaregistry.SchemaProvider;
38-
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
39-
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
4037
import org.slf4j.LoggerFactory;
4138

4239
import java.nio.file.Files;
4340
import java.nio.file.Paths;
4441
import java.util.ArrayList;
45-
import java.util.Collections;
4642
import java.util.List;
4743
import java.util.Map;
4844
import java.util.NoSuchElementException;
@@ -52,8 +48,6 @@
5248

5349
public class StateManager {
5450

55-
private static org.slf4j.Logger log = LoggerFactory.getLogger(StateManager.class);
56-
5751
private final ManagerConfig managerConfig;
5852
private final ObjectMapper objectMapper;
5953
private final ParserService parserService;
@@ -345,39 +339,15 @@ private void validateSchemas(DesiredStateFile desiredStateFile) {
345339
if (!desiredStateFile.getSchemas().isEmpty()) {
346340
SchemaRegistryConfig schemaRegistryConfig = SchemaRegistryConfigLoader.load();
347341
desiredStateFile.getSchemas().forEach((s, schemaDetails) -> {
348-
if (!schemaDetails.getType().equalsIgnoreCase("Avro")) {
342+
if (!schemaDetails.getType().equalsIgnoreCase(SchemaType.AVRO.toString()) ||
343+
!schemaDetails.getType().equalsIgnoreCase(SchemaType.JSON.toString()) ||
344+
!schemaDetails.getType().equalsIgnoreCase(SchemaType.PROTOBUF.toString())) {
349345
throw new ValidationException(String.format("Schema type %s is currently not supported.", schemaDetails.getType()));
350346
}
351347
if (!Files.exists(Paths.get(schemaRegistryConfig.getConfig().get("SCHEMA_DIRECTORY") + "/" + schemaDetails.getFile()))) {
352348
throw new ValidationException(String.format("Schema file %s not found in schema directory at %s", schemaDetails.getFile(), schemaRegistryConfig.getConfig().get("SCHEMA_DIRECTORY")));
353349
}
354-
if (schemaDetails.getType().equalsIgnoreCase("Avro")) {
355-
AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
356-
if (schemaDetails.getReferences().isEmpty() && schemaDetails.getType().equalsIgnoreCase("Avro")) {
357-
Optional<ParsedSchema> parsedSchema = avroSchemaProvider.parseSchema(schemaRegistryService.loadSchemaFromDisk(schemaDetails.getFile()), Collections.emptyList());
358-
if (!parsedSchema.isPresent()) {
359-
throw new ValidationException(String.format("Avro schema %s could not be parsed.", schemaDetails.getFile()));
360-
}
361-
} else {
362-
List<SchemaReference> schemaReferences = new ArrayList<>();
363-
schemaDetails.getReferences().forEach(referenceDetails -> {
364-
SchemaReference schemaReference = new SchemaReference(referenceDetails.getName(), referenceDetails.getSubject(), referenceDetails.getVersion());
365-
schemaReferences.add(schemaReference);
366-
});
367-
// we need to pass a schema registry client as a config because the underlying code validates against the current state
368-
avroSchemaProvider.configure(Collections.singletonMap(SchemaProvider.SCHEMA_VERSION_FETCHER_CONFIG, schemaRegistryService.createSchemaRegistryClient()));
369-
try {
370-
Optional<ParsedSchema> parsedSchema = avroSchemaProvider.parseSchema(schemaRegistryService.loadSchemaFromDisk(schemaDetails.getFile()), schemaReferences);
371-
if (!parsedSchema.isPresent()) {
372-
throw new ValidationException(String.format("Avro schema %s could not be parsed.", schemaDetails.getFile()));
373-
}
374-
} catch (IllegalStateException ex) {
375-
throw new ValidationException(String.format("Reference validation error: %s", ex.getMessage()));
376-
} catch (RuntimeException ex) {
377-
throw new ValidationException(String.format("Error thrown when attempting to validate schema with reference", ex.getMessage()));
378-
}
379-
}
380-
}
350+
schemaRegistryService.validateSchema(schemaDetails);
381351
});
382352
}
383353
}

src/main/java/com/devshawn/kafka/gitops/config/SchemaRegistryConfigLoader.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ public class SchemaRegistryConfigLoader {
1212

1313
private static org.slf4j.Logger log = LoggerFactory.getLogger(SchemaRegistryConfigLoader.class);
1414

15+
private SchemaRegistryConfigLoader() {}
16+
1517
public static SchemaRegistryConfig load() {
1618
SchemaRegistryConfig.Builder builder = new SchemaRegistryConfig.Builder();
1719
setConfig(builder);
@@ -68,7 +70,7 @@ private static void handleAuthentication(AtomicReference<String> username, Atomi
6870
throw new MissingConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_USERNAME");
6971
} else if (password.get() != null) {
7072
throw new MissingConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD");
71-
} else if (username.get() == null & password.get() == null) {
73+
} else if (username.get() == null && password.get() == null) {
7274
throw new MissingMultipleConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD", "SCHEMA_REGISTRY_SASL_JAAS_USERNAME");
7375
}
7476
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.devshawn.kafka.gitops.enums;
2+
3+
public enum SchemaType {
4+
AVRO, JSON, PROTOBUF;
5+
}

src/main/java/com/devshawn/kafka/gitops/manager/ApplyManager.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,8 @@ public void applySchemas(DesiredPlan desiredPlan) {
9999
LogUtil.printSchemaPreApply(schemaPlan);
100100
schemaRegistryService.register(schemaPlan);
101101
LogUtil.printPostApply();
102-
} else if (schemaPlan.getAction() == PlanAction.UPDATE) {
103-
LogUtil.printSchemaPreApply(schemaPlan);
104-
schemaRegistryService.register(schemaPlan);
105-
LogUtil.printPostApply();
106-
} else if (schemaPlan.getAction() == PlanAction.REMOVE && !managerConfig.isDeleteDisabled()) {
102+
} else if (schemaPlan.getAction() == PlanAction.UPDATE ||
103+
(schemaPlan.getAction() == PlanAction.REMOVE && !managerConfig.isDeleteDisabled())) {
107104
LogUtil.printSchemaPreApply(schemaPlan);
108105
schemaRegistryService.deleteSubject(schemaPlan.getName(), true);
109106
LogUtil.printPostApply();

src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,15 +239,15 @@ public void planSchemas(DesiredState desiredState, DesiredPlan.Builder desiredPl
239239
.setSchemaDetails(schemaDetails);
240240

241241
if (!currentSubjectSchemasMap.containsKey(subject)) {
242-
log.info("[PLAN] Schema Subject {} does not exist; it will be created.", subject);
242+
log.info("[PLAN] Schema Subject '{}' does not exist; it will be created.", subject);
243243
schemaPlan.setAction(PlanAction.ADD);
244244
} else {
245245
String diff = schemaRegistryService.compareSchemasAndReturnDiff(schemaRegistryService.loadSchemaFromDisk(schemaDetails.getFile()), currentSubjectSchemasMap.get(subject).getSchema());
246246
if (diff == null) {
247-
log.info("[PLAN] Schema Subject {} exists and has not changed; it will not be created.", subject);
247+
log.info("[PLAN] Schema Subject '{}' exists and has not changed; it will not be created.", subject);
248248
schemaPlan.setAction(PlanAction.NO_CHANGE);
249249
} else {
250-
log.info("[PLAN] Schema Subject {} exists and has changed; it will be updated.", subject);
250+
log.info("[PLAN] Schema Subject '{}' exists and has changed; it will be updated.", subject);
251251
schemaPlan.setAction(PlanAction.UPDATE);
252252
// TODO: Set diff string for logging?
253253
}

src/main/java/com/devshawn/kafka/gitops/service/SchemaRegistryService.java

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,38 @@
11
package com.devshawn.kafka.gitops.service;
22

3-
3+
import java.io.IOException;
4+
import java.nio.charset.StandardCharsets;
5+
import java.nio.file.Files;
6+
import java.nio.file.Paths;
7+
import java.util.ArrayList;
8+
import java.util.Collections;
9+
import java.util.HashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.Optional;
13+
import org.apache.kafka.common.config.SaslConfigs;
414
import com.devshawn.kafka.gitops.config.SchemaRegistryConfig;
515
import com.devshawn.kafka.gitops.domain.plan.SchemaPlan;
16+
import com.devshawn.kafka.gitops.domain.state.SchemaDetails;
17+
import com.devshawn.kafka.gitops.enums.SchemaType;
618
import com.devshawn.kafka.gitops.exception.SchemaRegistryExecutionException;
19+
import com.devshawn.kafka.gitops.exception.ValidationException;
720
import com.fasterxml.jackson.core.JsonProcessingException;
821
import com.fasterxml.jackson.databind.JsonNode;
922
import com.fasterxml.jackson.databind.ObjectMapper;
1023
import com.flipkart.zjsonpatch.JsonDiff;
24+
import io.confluent.kafka.schemaregistry.AbstractSchemaProvider;
1125
import io.confluent.kafka.schemaregistry.ParsedSchema;
26+
import io.confluent.kafka.schemaregistry.SchemaProvider;
1227
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
1328
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
1429
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
1530
import io.confluent.kafka.schemaregistry.client.rest.RestService;
31+
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
1632
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
1733
import io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider;
18-
import org.apache.kafka.common.config.SaslConfigs;
19-
20-
import java.io.IOException;
21-
import java.nio.charset.StandardCharsets;
22-
import java.nio.file.Files;
23-
import java.nio.file.Paths;
24-
import java.util.*;
34+
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
35+
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
2536

2637
public class SchemaRegistryService {
2738

@@ -55,15 +66,71 @@ public void deleteSubject(String subject, boolean isPermanent) {
5566

5667
public int register(SchemaPlan schemaPlan) {
5768
final CachedSchemaRegistryClient cachedSchemaRegistryClient = createSchemaRegistryClient();
58-
AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
59-
Optional<ParsedSchema> parsedSchema = avroSchemaProvider.parseSchema(loadSchemaFromDisk(schemaPlan.getSchemaDetails().get().getFile()), Collections.emptyList());
69+
ParsedSchema parsedSchema;
70+
if(SchemaType.AVRO.toString().equalsIgnoreCase(schemaPlan.getSchemaDetails().get().getType())) {
71+
AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
72+
parsedSchema = avroSchemaProvider.parseSchema(
73+
loadSchemaFromDisk(schemaPlan.getSchemaDetails().get().getFile()), Collections.emptyList()).get();
74+
} else if (SchemaType.JSON.toString().equalsIgnoreCase(schemaPlan.getSchemaDetails().get().getType())) {
75+
JsonSchemaProvider jsonSchemaProvider = new JsonSchemaProvider();
76+
parsedSchema = jsonSchemaProvider.parseSchema(
77+
loadSchemaFromDisk(schemaPlan.getSchemaDetails().get().getFile()), Collections.emptyList()).get();
78+
} else if (SchemaType.PROTOBUF.toString().equalsIgnoreCase(schemaPlan.getSchemaDetails().get().getType())) {
79+
ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
80+
parsedSchema = protobufSchemaProvider.parseSchema(loadSchemaFromDisk(
81+
schemaPlan.getSchemaDetails().get().getFile()), Collections.emptyList()).get();
82+
} else {
83+
throw new ValidationException("Unknown schema type: " + schemaPlan.getSchemaDetails().get().getType());
84+
}
6085
try {
61-
return cachedSchemaRegistryClient.register(schemaPlan.getName(), parsedSchema.get());
86+
return cachedSchemaRegistryClient.register(schemaPlan.getName(), parsedSchema);
6287
} catch (IOException | RestClientException ex) {
6388
throw new SchemaRegistryExecutionException("Error thrown when attempting to register subject with schema registry", ex.getMessage());
6489
}
6590
}
6691

92+
public void validateSchema(SchemaDetails schemaDetails) {
93+
if (schemaDetails.getType().equalsIgnoreCase(SchemaType.AVRO.toString())) {
94+
AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
95+
validateSchema(schemaDetails, avroSchemaProvider);
96+
} else if (schemaDetails.getType().equalsIgnoreCase(SchemaType.JSON.toString())) {
97+
JsonSchemaProvider jsonSchemaProvider = new JsonSchemaProvider();
98+
validateSchema(schemaDetails, jsonSchemaProvider);
99+
} else if (schemaDetails.getType().equalsIgnoreCase(SchemaType.PROTOBUF.toString())) {
100+
ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
101+
validateSchema(schemaDetails, protobufSchemaProvider);
102+
} else {
103+
throw new ValidationException("Unknown schema type: " + schemaDetails.getType());
104+
}
105+
}
106+
107+
public void validateSchema(SchemaDetails schemaDetails, AbstractSchemaProvider schemaProvider) {
108+
if (schemaDetails.getReferences().isEmpty()) {
109+
Optional<ParsedSchema> parsedSchema = schemaProvider.parseSchema(loadSchemaFromDisk(schemaDetails.getFile()), Collections.emptyList());
110+
if (!parsedSchema.isPresent()) {
111+
throw new ValidationException(String.format("%s schema %s could not be parsed.", schemaProvider.schemaType(), schemaDetails.getFile()));
112+
}
113+
} else {
114+
List<SchemaReference> schemaReferences = new ArrayList<>();
115+
schemaDetails.getReferences().forEach(referenceDetails -> {
116+
SchemaReference schemaReference = new SchemaReference(referenceDetails.getName(), referenceDetails.getSubject(), referenceDetails.getVersion());
117+
schemaReferences.add(schemaReference);
118+
});
119+
// we need to pass a schema registry client as a config because the underlying code validates against the current state
120+
schemaProvider.configure(Collections.singletonMap(SchemaProvider.SCHEMA_VERSION_FETCHER_CONFIG, createSchemaRegistryClient()));
121+
try {
122+
Optional<ParsedSchema> parsedSchema = schemaProvider.parseSchema(loadSchemaFromDisk(schemaDetails.getFile()), schemaReferences);
123+
if (!parsedSchema.isPresent()) {
124+
throw new ValidationException(String.format("%s schema %s could not be parsed.", schemaProvider.schemaType(), schemaDetails.getFile()));
125+
}
126+
} catch (IllegalStateException ex) {
127+
throw new ValidationException(String.format("Reference validation error: %s", ex.getMessage()));
128+
} catch (RuntimeException ex) {
129+
throw new ValidationException(String.format("Error thrown when attempting to validate %s schema with reference: %s", schemaProvider.schemaType(), ex.getMessage()));
130+
}
131+
}
132+
}
133+
67134
public SchemaMetadata getLatestSchemaMetadata(String subject) {
68135
final CachedSchemaRegistryClient cachedSchemaRegistryClient = createSchemaRegistryClient();
69136
try {

src/main/java/com/devshawn/kafka/gitops/util/LogUtil.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ private static void printTopicPlan(TopicPlan topicPlan) {
6464
System.out.println(red(String.format("- [TOPIC] %s", topicPlan.getName())));
6565
System.out.println("\n");
6666
break;
67+
case NO_CHANGE:
68+
break;
6769
}
6870
}
6971

@@ -117,7 +119,7 @@ private static void printTopicConfigPlan(TopicConfigPlan topicConfigPlan) {
117119
System.out.println(red(String.format("\t\t- %s (%s)", topicConfigPlan.getKey(), topicConfigPlan.getPreviousValue().get())));
118120
break;
119121
case NO_CHANGE:
120-
break;
122+
break;
121123
}
122124
}
123125

@@ -147,6 +149,9 @@ private static void printAclPlan(AclPlan aclPlan) {
147149
System.out.println(red(String.format("\t - permission: %s", aclDetails.getPermission())));
148150
System.out.println("\n");
149151
break;
152+
case UPDATE:
153+
case NO_CHANGE:
154+
break;
150155
}
151156
}
152157

@@ -174,6 +179,8 @@ private static void printSchemaPlan(SchemaPlan schemaPlan) {
174179
System.out.println(red(String.format("- [SCHEMA] %s", schemaPlan.getName())));
175180
System.out.println("\n");
176181
break;
182+
case NO_CHANGE:
183+
break;
177184
}
178185
}
179186

0 commit comments

Comments
 (0)