
Commit 6ca6a0a

tball-dev authored and Jerome Revillard committed
feat: added ability to manage schema registry
1 parent fe5a48f commit 6ca6a0a

27 files changed: 636 additions, 21 deletions

build.gradle

Lines changed: 6 additions & 0 deletions
@@ -19,6 +19,9 @@ sourceCompatibility = 1.8
 
 repositories {
     mavenCentral()
+    maven {
+        url "https://packages.confluent.io/maven/"
+    }
 }
 
 dependencies {
@@ -28,6 +31,9 @@ dependencies {
     compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.10.2"
     compile 'info.picocli:picocli:4.1.4'
 
+    implementation('io.confluent:kafka-schema-registry-client:6.1.1')
+    implementation('com.flipkart.zjsonpatch:zjsonpatch:0.4.11')
+
     compile 'org.slf4j:slf4j-api:1.7.30'
     compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
     compile group: 'ch.qos.logback', name: 'logback-core', version: '1.2.3'
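
For orientation (not part of the commit), the two new dependencies serve distinct purposes: kafka-schema-registry-client parses and validates Avro schemas and talks to a Schema Registry, while zjsonpatch computes JSON diffs, which is handy for showing how a subject's schema would change. A minimal, hypothetical sketch; the class and schema names are made up:

// Hypothetical example, not part of this commit: exercising the two new dependencies.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.flipkart.zjsonpatch.JsonDiff;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;

import java.util.Collections;
import java.util.Optional;

public class DependencySketch {
    public static void main(String[] args) throws Exception {
        // Parse and validate an Avro schema string locally, without a registry round trip.
        Optional<ParsedSchema> schema = new AvroSchemaProvider()
                .parseSchema("{\"type\":\"record\",\"name\":\"Example\",\"fields\":[]}", Collections.emptyList());
        System.out.println("Schema parsed: " + schema.isPresent());

        // Diff two JSON documents as an RFC 6902 patch.
        ObjectMapper mapper = new ObjectMapper();
        JsonNode before = mapper.readTree("{\"name\":\"Example\",\"fields\":[]}");
        JsonNode after = mapper.readTree("{\"name\":\"Example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
        System.out.println(JsonDiff.asJson(before, after));
    }
}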

src/main/java/com/devshawn/kafka/gitops/StateManager.java

Lines changed: 63 additions & 2 deletions
@@ -24,15 +24,25 @@
 import com.devshawn.kafka.gitops.service.KafkaService;
 import com.devshawn.kafka.gitops.service.ParserService;
 import com.devshawn.kafka.gitops.service.RoleService;
+import com.devshawn.kafka.gitops.config.SchemaRegistryConfig;
+import com.devshawn.kafka.gitops.config.SchemaRegistryConfigLoader;
+import com.devshawn.kafka.gitops.service.SchemaRegistryService;
 import com.devshawn.kafka.gitops.util.LogUtil;
 import com.devshawn.kafka.gitops.util.StateUtil;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.SchemaProvider;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
 import org.slf4j.LoggerFactory;
 
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -48,6 +58,7 @@ public class StateManager {
     private final ObjectMapper objectMapper;
     private final ParserService parserService;
     private final KafkaService kafkaService;
+    private final SchemaRegistryService schemaRegistryService;
     private final RoleService roleService;
     private final ConfluentCloudService confluentCloudService;
 
@@ -61,17 +72,19 @@ public StateManager(ManagerConfig managerConfig, ParserService parserService) {
         this.managerConfig = managerConfig;
         this.objectMapper = initializeObjectMapper();
         this.kafkaService = new KafkaService(KafkaGitopsConfigLoader.load());
+        this.schemaRegistryService = new SchemaRegistryService(SchemaRegistryConfigLoader.load());
         this.parserService = parserService;
         this.roleService = new RoleService();
         this.confluentCloudService = new ConfluentCloudService(objectMapper);
-        this.planManager = new PlanManager(managerConfig, kafkaService, objectMapper);
-        this.applyManager = new ApplyManager(managerConfig, kafkaService);
+        this.planManager = new PlanManager(managerConfig, kafkaService, schemaRegistryService, objectMapper);
+        this.applyManager = new ApplyManager(managerConfig, kafkaService, schemaRegistryService);
     }
 
     public DesiredStateFile getAndValidateStateFile() {
         DesiredStateFile desiredStateFile = parserService.parseStateFile();
         validateTopics(desiredStateFile);
         validateCustomAcls(desiredStateFile);
+        validateSchemas(desiredStateFile);
         this.describeAclEnabled = StateUtil.isDescribeTopicAclEnabled(desiredStateFile);
         return desiredStateFile;
     }
@@ -90,6 +103,7 @@ private DesiredPlan generatePlan() {
             planManager.planAcls(desiredState, desiredPlan);
         }
         planManager.planTopics(desiredState, desiredPlan);
+        planManager.planSchemas(desiredState, desiredPlan);
         return desiredPlan.build();
     }
 
@@ -105,6 +119,7 @@ public DesiredPlan apply() {
         if (!managerConfig.isSkipAclsDisabled()) {
             applyManager.applyAcls(desiredPlan);
         }
+        applyManager.applySchemas(desiredPlan);
 
         return desiredPlan;
     }
@@ -145,6 +160,7 @@ private DesiredState getDesiredState() {
                 .addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile));
 
         generateTopicsState(desiredState, desiredStateFile);
+        generateSchemasState(desiredState, desiredStateFile);
 
         if (isConfluentCloudEnabled(desiredStateFile)) {
             generateConfluentCloudServiceAcls(desiredState, desiredStateFile);
@@ -169,6 +185,10 @@ private void generateTopicsState(DesiredState.Builder desiredState, DesiredState
         }
     }
 
+    private void generateSchemasState(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
+        desiredState.putAllSchemas(desiredStateFile.getSchemas());
+    }
+
     private void generateConfluentCloudServiceAcls(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
         List<ServiceAccount> serviceAccounts = confluentCloudService.getServiceAccounts();
         desiredStateFile.getServices().forEach((name, service) -> {
@@ -321,6 +341,47 @@ private void validateTopics(DesiredStateFile desiredStateFile) {
         }
     }
 
+    private void validateSchemas(DesiredStateFile desiredStateFile) {
+        if (!desiredStateFile.getSchemas().isEmpty()) {
+            SchemaRegistryConfig schemaRegistryConfig = SchemaRegistryConfigLoader.load();
+            desiredStateFile.getSchemas().forEach((s, schemaDetails) -> {
+                if (!schemaDetails.getType().equalsIgnoreCase("Avro")) {
+                    throw new ValidationException(String.format("Schema type %s is currently not supported.", schemaDetails.getType()));
+                }
+                if (!Files.exists(Paths.get(schemaRegistryConfig.getConfig().get("SCHEMA_DIRECTORY") + "/" + schemaDetails.getFile()))) {
+                    throw new ValidationException(String.format("Schema file %s not found in schema directory at %s", schemaDetails.getFile(), schemaRegistryConfig.getConfig().get("SCHEMA_DIRECTORY")));
+                }
+                if (schemaDetails.getType().equalsIgnoreCase("Avro")) {
+                    AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
+                    if (schemaDetails.getReferences().isEmpty()) {
+                        Optional<ParsedSchema> parsedSchema = avroSchemaProvider.parseSchema(schemaRegistryService.loadSchemaFromDisk(schemaDetails.getFile()), Collections.emptyList());
+                        if (!parsedSchema.isPresent()) {
+                            throw new ValidationException(String.format("Avro schema %s could not be parsed.", schemaDetails.getFile()));
+                        }
+                    } else {
+                        List<SchemaReference> schemaReferences = new ArrayList<>();
+                        schemaDetails.getReferences().forEach(referenceDetails -> {
+                            SchemaReference schemaReference = new SchemaReference(referenceDetails.getName(), referenceDetails.getSubject(), referenceDetails.getVersion());
+                            schemaReferences.add(schemaReference);
+                        });
+                        // we need to pass a schema registry client as a config because the underlying code validates references against the current registry state
+                        avroSchemaProvider.configure(Collections.singletonMap(SchemaProvider.SCHEMA_VERSION_FETCHER_CONFIG, schemaRegistryService.createSchemaRegistryClient()));
+                        try {
+                            Optional<ParsedSchema> parsedSchema = avroSchemaProvider.parseSchema(schemaRegistryService.loadSchemaFromDisk(schemaDetails.getFile()), schemaReferences);
+                            if (!parsedSchema.isPresent()) {
+                                throw new ValidationException(String.format("Avro schema %s could not be parsed.", schemaDetails.getFile()));
+                            }
+                        } catch (IllegalStateException ex) {
+                            throw new ValidationException(String.format("Reference validation error: %s", ex.getMessage()));
+                        } catch (RuntimeException ex) {
+                            throw new ValidationException(String.format("Error thrown when attempting to validate schema with reference: %s", ex.getMessage()));
+                        }
+                    }
+                }
+            });
+        }
+    }
+
     private boolean isConfluentCloudEnabled(DesiredStateFile desiredStateFile) {
         if (desiredStateFile.getSettings().isPresent() && desiredStateFile.getSettings().get().getCcloud().isPresent()) {
             return desiredStateFile.getSettings().get().getCcloud().get().isEnabled();
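
The new validateSchemas method leans on the Confluent client's AvroSchemaProvider: standalone schemas are parsed with an empty reference list, while schemas with references need a schema-version fetcher so the referenced subjects can be resolved against the registry. SchemaRegistryService (its loadSchemaFromDisk and createSchemaRegistryClient helpers) is not shown in this excerpt, so the following is only a rough, self-contained sketch of that reference-validation path; the registry URL, subject name, and schema strings are made up.

// Not from the commit: a self-contained approximation of the reference validation done in validateSchemas().
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class ReferenceValidationSketch {
    public static void main(String[] args) {
        // A schema that uses a type registered under another subject.
        String schemaWithReference = "{\"type\":\"record\",\"name\":\"Order\",\"fields\":"
                + "[{\"name\":\"customer\",\"type\":\"com.example.Customer\"}]}";
        List<SchemaReference> references =
                Collections.singletonList(new SchemaReference("com.example.Customer", "customer-value", 1));

        // The provider needs a registry client so it can fetch the referenced subject versions.
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 10);
        AvroSchemaProvider provider = new AvroSchemaProvider();
        provider.configure(Collections.singletonMap(SchemaProvider.SCHEMA_VERSION_FETCHER_CONFIG, client));

        Optional<ParsedSchema> parsed = provider.parseSchema(schemaWithReference, references);
        System.out.println(parsed.isPresent() ? "Schema and references resolved" : "Schema could not be parsed");
    }
}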

src/main/java/com/devshawn/kafka/gitops/cli/ApplyCommand.java

Lines changed: 3 additions & 0 deletions
@@ -8,6 +8,7 @@
 import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
 import com.devshawn.kafka.gitops.exception.PlanIsUpToDateException;
 import com.devshawn.kafka.gitops.exception.ReadPlanInputException;
+import com.devshawn.kafka.gitops.exception.SchemaRegistryExecutionException;
 import com.devshawn.kafka.gitops.exception.ValidationException;
 import com.devshawn.kafka.gitops.service.ParserService;
 import com.devshawn.kafka.gitops.util.LogUtil;
@@ -45,6 +46,8 @@ public Integer call() {
             LogUtil.printValidationResult(ex.getMessage(), false);
         } catch (KafkaExecutionException ex) {
             LogUtil.printKafkaExecutionError(ex, true);
+        } catch (SchemaRegistryExecutionException ex) {
+            LogUtil.printSchemaRegistryExecutionError(ex, true);
         }
         return 2;
     }

src/main/java/com/devshawn/kafka/gitops/cli/PlanCommand.java

Lines changed: 3 additions & 0 deletions
@@ -7,6 +7,7 @@
 import com.devshawn.kafka.gitops.exception.KafkaExecutionException;
 import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
 import com.devshawn.kafka.gitops.exception.PlanIsUpToDateException;
+import com.devshawn.kafka.gitops.exception.SchemaRegistryExecutionException;
 import com.devshawn.kafka.gitops.exception.ValidationException;
 import com.devshawn.kafka.gitops.exception.WritePlanOutputException;
 import com.devshawn.kafka.gitops.service.ParserService;
@@ -49,6 +50,8 @@ public Integer call() {
             LogUtil.printKafkaExecutionError(ex);
         } catch (WritePlanOutputException ex) {
             LogUtil.printPlanOutputError(ex);
+        } catch (SchemaRegistryExecutionException ex) {
+            LogUtil.printSchemaRegistryExecutionError(ex);
         }
         return 2;
     }
src/main/java/com/devshawn/kafka/gitops/config/SchemaRegistryConfig.java

Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
+package com.devshawn.kafka.gitops.config;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+import java.util.Map;
+
+@FreeBuilder
+@JsonDeserialize(builder = SchemaRegistryConfig.Builder.class)
+public interface SchemaRegistryConfig {
+
+    Map<String, Object> getConfig();
+
+    class Builder extends SchemaRegistryConfig_Builder {
+
+    }
+
+}
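
As an aside (not part of the diff), FreeBuilder generates the SchemaRegistryConfig_Builder base class at compile time, so a config instance can be assembled roughly as below; the values are placeholders that mirror the keys the loader in the next file populates.

// Illustrative only: building a SchemaRegistryConfig by hand.
SchemaRegistryConfig config = new SchemaRegistryConfig.Builder()
        .putConfig("SCHEMA_REGISTRY_URL", "http://localhost:8081")
        .putConfig("SCHEMA_DIRECTORY", "/path/to/schemas")
        .build();
String url = (String) config.getConfig().get("SCHEMA_REGISTRY_URL");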
src/main/java/com/devshawn/kafka/gitops/config/SchemaRegistryConfigLoader.java

Lines changed: 82 additions & 0 deletions

@@ -0,0 +1,82 @@
+package com.devshawn.kafka.gitops.config;
+
+import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
+import com.devshawn.kafka.gitops.exception.MissingMultipleConfigurationException;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SchemaRegistryConfigLoader {
+
+    private static org.slf4j.Logger log = LoggerFactory.getLogger(SchemaRegistryConfigLoader.class);
+
+    public static SchemaRegistryConfig load() {
+        SchemaRegistryConfig.Builder builder = new SchemaRegistryConfig.Builder();
+        setConfig(builder);
+        return builder.build();
+    }
+
+    private static void setConfig(SchemaRegistryConfig.Builder builder) {
+        Map<String, Object> config = new HashMap<>();
+        AtomicReference<String> username = new AtomicReference<>();
+        AtomicReference<String> password = new AtomicReference<>();
+
+        Map<String, String> environment = System.getenv();
+
+        environment.forEach((key, value) -> {
+            if (key.equals("SCHEMA_REGISTRY_SASL_JAAS_USERNAME")) {
+                username.set(value);
+            } else if (key.equals("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD")) {
+                password.set(value);
+            } else if (key.equals("SCHEMA_REGISTRY_URL")) {
+                config.put("SCHEMA_REGISTRY_URL", value);
+            } else if (key.equals("SCHEMA_DIRECTORY")) {
+                config.put("SCHEMA_DIRECTORY", value);
+            }
+        });
+
+        handleDefaultConfig(config);
+        handleAuthentication(username, password, config);
+
+        log.info("Schema Registry Config: {}", config);
+
+        builder.putAllConfig(config);
+    }
+
+    private static void handleDefaultConfig(Map<String, Object> config) {
+        final String DEFAULT_URL = "http://localhost:8081";
+        final String CURRENT_WORKING_DIR = System.getProperty("user.dir");
+        if (!config.containsKey("SCHEMA_REGISTRY_URL")) {
+            log.info("SCHEMA_REGISTRY_URL not set. Using default value of {}", DEFAULT_URL);
+            config.put("SCHEMA_REGISTRY_URL", DEFAULT_URL);
+        }
+        if (!config.containsKey("SCHEMA_DIRECTORY")) {
+            log.info("SCHEMA_DIRECTORY not set. Defaulting to current working directory: {}", CURRENT_WORKING_DIR);
+            config.put("SCHEMA_DIRECTORY", CURRENT_WORKING_DIR);
+        }
+    }
+
+    private static void handleAuthentication(AtomicReference<String> username, AtomicReference<String> password, Map<String, Object> config) {
+        if (username.get() != null && password.get() != null) {
+            String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule";
+            String value = String.format("%s required username=\"%s\" password=\"%s\";",
+                    loginModule, escape(username.get()), escape(password.get()));
+            config.put("SCHEMA_REGISTRY_SASL_CONFIG", value);
+        } else if (username.get() != null) {
+            throw new MissingConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD");
+        } else if (password.get() != null) {
+            throw new MissingConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_USERNAME");
+        } else if (username.get() == null && password.get() == null) {
+            throw new MissingMultipleConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD", "SCHEMA_REGISTRY_SASL_JAAS_USERNAME");
+        }
+    }
+
+    private static String escape(String value) {
+        if (value != null) {
+            return value.replace("\"", "\\\"");
+        }
+        return null;
+    }
+}
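
SchemaRegistryService itself is not part of the lines shown here, so how this loaded map is consumed is not visible. As a hedged sketch only: one plausible way to turn the config into a Confluent CachedSchemaRegistryClient is the SASL_INHERIT basic-auth mode, which derives credentials from a sasl.jaas.config entry like the SCHEMA_REGISTRY_SASL_CONFIG string built above. This is an assumption about the wiring, not something the diff confirms.

// Assumption-laden sketch, not the commit's SchemaRegistryService implementation.
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

import java.util.HashMap;
import java.util.Map;

public class SchemaRegistryClientSketch {
    public static SchemaRegistryClient fromConfig(SchemaRegistryConfig config) {
        String url = (String) config.getConfig().get("SCHEMA_REGISTRY_URL");
        Object saslConfig = config.getConfig().get("SCHEMA_REGISTRY_SASL_CONFIG");

        Map<String, Object> clientConfigs = new HashMap<>();
        if (saslConfig != null) {
            // SASL_INHERIT tells the REST client to derive basic-auth credentials from sasl.jaas.config.
            clientConfigs.put("basic.auth.credentials.source", "SASL_INHERIT");
            clientConfigs.put("sasl.jaas.config", saslConfig);
        }
        return new CachedSchemaRegistryClient(url, 100, clientConfigs);
    }
}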

src/main/java/com/devshawn/kafka/gitops/domain/plan/DesiredPlan.java

Lines changed: 3 additions & 0 deletions
@@ -12,12 +12,15 @@ public interface DesiredPlan {
 
     List<TopicPlan> getTopicPlans();
 
+    List<SchemaPlan> getSchemaPlans();
+
     List<AclPlan> getAclPlans();
 
     default DesiredPlan toChangesOnlyPlan() {
         DesiredPlan.Builder builder = new DesiredPlan.Builder();
         getTopicPlans().stream().filter(it -> !it.getAction().equals(PlanAction.NO_CHANGE)).map(TopicPlan::toChangesOnlyPlan).forEach(builder::addTopicPlans);
         getAclPlans().stream().filter(it -> !it.getAction().equals(PlanAction.NO_CHANGE)).forEach(builder::addAclPlans);
+        getSchemaPlans().stream().filter(it -> !it.getAction().equals(PlanAction.NO_CHANGE)).forEach(builder::addSchemaPlans);
         return builder.build();
     }
 

src/main/java/com/devshawn/kafka/gitops/domain/plan/SchemaPlan.java

Lines changed: 22 additions & 0 deletions

@@ -0,0 +1,22 @@
+package com.devshawn.kafka.gitops.domain.plan;
+
+import com.devshawn.kafka.gitops.domain.state.SchemaDetails;
+import com.devshawn.kafka.gitops.enums.PlanAction;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.inferred.freebuilder.FreeBuilder;
+
+import java.util.Optional;
+
+@FreeBuilder
+@JsonDeserialize(builder = SchemaPlan.Builder.class)
+public interface SchemaPlan {
+
+    String getName();
+
+    PlanAction getAction();
+
+    Optional<SchemaDetails> getSchemaDetails();
+
+    class Builder extends SchemaPlan_Builder {
+    }
+}
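
The PlanManager and ApplyManager changes that create and execute these plans are not included in the excerpt above. As a rough illustration only (the subject name is a placeholder, and it assumes PlanAction exposes an ADD constant alongside the NO_CHANGE seen earlier), a plan entry for a new subject would be built through the FreeBuilder-generated builder:

// Illustrative only: a plan entry for registering a new subject.
SchemaPlan schemaPlan = new SchemaPlan.Builder()
        .setName("orders-value")
        .setAction(PlanAction.ADD)
        .build();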

src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredState.java

Lines changed: 2 additions & 0 deletions
@@ -14,6 +14,8 @@ public interface DesiredState {
 
     Map<String, AclDetails> getAcls();
 
+    Map<String, SchemaDetails> getSchemas();
+
     List<String> getPrefixedTopicsToIgnore();
 
     class Builder extends DesiredState_Builder {

src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredStateFile.java

Lines changed: 2 additions & 0 deletions
@@ -18,6 +18,8 @@ public interface DesiredStateFile {
 
     Map<String, TopicDetails> getTopics();
 
+    Map<String, SchemaDetails> getSchemas();
+
     Map<String, UserDetails> getUsers();
 
     Map<String, Map<String, CustomAclDetails>> getCustomServiceAcls();
