Skip to content

Commit 5f8d402

Browse files
author
Jerome Revillard
committed
Tests
1 parent f72614e commit 5f8d402

File tree

148 files changed

+4421
-2699
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

148 files changed

+4421
-2699
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ plan.json
1414
test.py
1515
/generated/
1616
/.apt_generated/
17+
/.apt_generated_tests/

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ dependencies {
3737
implementation ('io.confluent:kafka-schema-registry-client:6.1.1')
3838
implementation ('io.confluent:kafka-json-schema-provider:6.1.1')
3939
implementation ('io.confluent:kafka-protobuf-serializer:6.1.1')
40-
implementation('com.flipkart.zjsonpatch:zjsonpatch:0.4.11')
4140

4241
compile 'org.slf4j:slf4j-api:1.7.30'
4342
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'

docker/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,13 @@ services:
9696
image: confluentinc/cp-schema-registry:6.1.1
9797
hostname: schema-registry
9898
ports:
99-
- "8081:8081"
99+
- "8082:8082"
100100
environment:
101101
SCHEMA_REGISTRY_HOST_NAME: schema-registry
102102
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka1:19092,kafka2:19092,kafka3:19092"
103103
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SASL_PLAINTEXT
104104
SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: PLAIN
105-
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
105+
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8082"
106106
SCHEMA_REGISTRY_GROUP_ID: "schema-registry-test"
107107
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/registry_jaas.conf"
108108
SCHEMA_REGISTRY_OPTS: "-Djava.security.auth.login.config=/etc/kafka/registry_jaas.conf"

src/main/java/com/devshawn/kafka/gitops/StateManager.java

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
package com.devshawn.kafka.gitops;
22

3-
import ch.qos.logback.classic.Level;
4-
import ch.qos.logback.classic.Logger;
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.NoSuchElementException;
7+
import java.util.Optional;
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
import org.slf4j.LoggerFactory;
511
import com.devshawn.kafka.gitops.config.KafkaGitopsConfigLoader;
612
import com.devshawn.kafka.gitops.config.ManagerConfig;
13+
import com.devshawn.kafka.gitops.config.SchemaRegistryConfigLoader;
714
import com.devshawn.kafka.gitops.domain.confluent.ServiceAccount;
815
import com.devshawn.kafka.gitops.domain.options.GetAclOptions;
916
import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
1017
import com.devshawn.kafka.gitops.domain.state.AclDetails;
1118
import com.devshawn.kafka.gitops.domain.state.CustomAclDetails;
1219
import com.devshawn.kafka.gitops.domain.state.DesiredState;
1320
import com.devshawn.kafka.gitops.domain.state.DesiredStateFile;
21+
import com.devshawn.kafka.gitops.domain.state.SchemaDetails;
1422
import com.devshawn.kafka.gitops.domain.state.TopicDetails;
1523
import com.devshawn.kafka.gitops.domain.state.service.KafkaStreamsService;
16-
import com.devshawn.kafka.gitops.enums.SchemaType;
24+
import com.devshawn.kafka.gitops.enums.SchemaCompatibility;
1725
import com.devshawn.kafka.gitops.exception.ConfluentCloudException;
1826
import com.devshawn.kafka.gitops.exception.InvalidAclDefinitionException;
1927
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
@@ -25,26 +33,18 @@
2533
import com.devshawn.kafka.gitops.service.KafkaService;
2634
import com.devshawn.kafka.gitops.service.ParserService;
2735
import com.devshawn.kafka.gitops.service.RoleService;
28-
import com.devshawn.kafka.gitops.config.SchemaRegistryConfig;
29-
import com.devshawn.kafka.gitops.config.SchemaRegistryConfigLoader;
3036
import com.devshawn.kafka.gitops.service.SchemaRegistryService;
3137
import com.devshawn.kafka.gitops.util.LogUtil;
3238
import com.devshawn.kafka.gitops.util.StateUtil;
3339
import com.fasterxml.jackson.core.JsonParser;
40+
import com.fasterxml.jackson.core.util.DefaultIndenter;
41+
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
3442
import com.fasterxml.jackson.databind.DeserializationFeature;
3543
import com.fasterxml.jackson.databind.ObjectMapper;
44+
import com.fasterxml.jackson.databind.SerializationFeature;
3645
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
37-
import org.slf4j.LoggerFactory;
38-
39-
import java.nio.file.Files;
40-
import java.nio.file.Paths;
41-
import java.util.ArrayList;
42-
import java.util.List;
43-
import java.util.Map;
44-
import java.util.NoSuchElementException;
45-
import java.util.Optional;
46-
import java.util.concurrent.atomic.AtomicInteger;
47-
import java.util.concurrent.atomic.AtomicReference;
46+
import ch.qos.logback.classic.Level;
47+
import ch.qos.logback.classic.Logger;
4848

4949
public class StateManager {
5050

@@ -168,7 +168,7 @@ private DesiredState getDesiredState() {
168168
}
169169

170170
private void generateTopicsState(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
171-
Optional<Integer> defaultReplication = StateUtil.fetchReplication(desiredStateFile);
171+
Optional<Integer> defaultReplication = StateUtil.fetchDefaultTopicsReplication(desiredStateFile);
172172
if (defaultReplication.isPresent()) {
173173
desiredStateFile.getTopics().forEach((name, details) -> {
174174
Integer replication = details.getReplication().isPresent() ? details.getReplication().get() : defaultReplication.get();
@@ -180,7 +180,15 @@ private void generateTopicsState(DesiredState.Builder desiredState, DesiredState
180180
}
181181

182182
private void generateSchemasState(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
183-
desiredState.putAllSchemas(desiredStateFile.getSchemas());
183+
Optional<SchemaCompatibility> defaultSchemaCompatibility = StateUtil.fetchDefaultSchemasCompatibility(desiredStateFile);
184+
if (defaultSchemaCompatibility.isPresent()) {
185+
desiredStateFile.getSchemas().forEach((s, details) -> {
186+
SchemaCompatibility compatibility = details.getCompatibility().isPresent() ? details.getCompatibility().get() : defaultSchemaCompatibility.get();
187+
desiredState.putSchemas(s, new SchemaDetails.Builder().mergeFrom(details).setCompatibility(compatibility).build());
188+
});
189+
} else {
190+
desiredState.putAllSchemas(desiredStateFile.getSchemas());
191+
}
184192
}
185193

186194
private void generateConfluentCloudServiceAcls(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
@@ -321,7 +329,7 @@ private void validateCustomAcls(DesiredStateFile desiredStateFile) {
321329
}
322330

323331
private void validateTopics(DesiredStateFile desiredStateFile) {
324-
Optional<Integer> defaultReplication = StateUtil.fetchReplication(desiredStateFile);
332+
Optional<Integer> defaultReplication = StateUtil.fetchDefaultTopicsReplication(desiredStateFile);
325333
if (!defaultReplication.isPresent()) {
326334
desiredStateFile.getTopics().forEach((name, details) -> {
327335
if (!details.getReplication().isPresent()) {
@@ -336,18 +344,13 @@ private void validateTopics(DesiredStateFile desiredStateFile) {
336344
}
337345

338346
private void validateSchemas(DesiredStateFile desiredStateFile) {
339-
if (!desiredStateFile.getSchemas().isEmpty()) {
340-
SchemaRegistryConfig schemaRegistryConfig = SchemaRegistryConfigLoader.load();
341-
desiredStateFile.getSchemas().forEach((s, schemaDetails) -> {
342-
if (!schemaDetails.getType().equalsIgnoreCase(SchemaType.AVRO.toString()) ||
343-
!schemaDetails.getType().equalsIgnoreCase(SchemaType.JSON.toString()) ||
344-
!schemaDetails.getType().equalsIgnoreCase(SchemaType.PROTOBUF.toString())) {
345-
throw new ValidationException(String.format("Schema type %s is currently not supported.", schemaDetails.getType()));
346-
}
347-
if (!Files.exists(Paths.get(schemaRegistryConfig.getConfig().get("SCHEMA_DIRECTORY") + "/" + schemaDetails.getFile()))) {
348-
throw new ValidationException(String.format("Schema file %s not found in schema directory at %s", schemaDetails.getFile(), schemaRegistryConfig.getConfig().get("SCHEMA_DIRECTORY")));
347+
Optional<SchemaCompatibility> defaultSchemaCompatibility = StateUtil.fetchDefaultSchemasCompatibility(desiredStateFile);
348+
if (!defaultSchemaCompatibility.isPresent()) {
349+
desiredStateFile.getSchemas().forEach((subject, details) -> {
350+
if (!details.getCompatibility().isPresent()) {
351+
throw new ValidationException(String.format("Not set: [compatibility] in state file definition: schema -> %s", subject));
349352
}
350-
schemaRegistryService.validateSchema(schemaDetails);
353+
schemaRegistryService.validateSchema(subject, details);
351354
});
352355
}
353356
}
@@ -360,11 +363,17 @@ private boolean isConfluentCloudEnabled(DesiredStateFile desiredStateFile) {
360363
}
361364

362365
private ObjectMapper initializeObjectMapper() {
363-
ObjectMapper objectMapper = new ObjectMapper();
364-
objectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);
365-
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
366-
objectMapper.registerModule(new Jdk8Module());
367-
return objectMapper;
366+
ObjectMapper gitopsObjectMapper = new ObjectMapper();
367+
gitopsObjectMapper.enable(SerializationFeature.INDENT_OUTPUT);
368+
gitopsObjectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);
369+
gitopsObjectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
370+
gitopsObjectMapper.registerModule(new Jdk8Module());
371+
DefaultIndenter defaultIndenter = new DefaultIndenter(" ", DefaultIndenter.SYS_LF);
372+
DefaultPrettyPrinter printer = new DefaultPrettyPrinter()
373+
.withObjectIndenter(defaultIndenter)
374+
.withArrayIndenter(defaultIndenter);
375+
gitopsObjectMapper.setDefaultPrettyPrinter(printer);
376+
return gitopsObjectMapper;
368377
}
369378

370379
private void initializeLogger(boolean verbose) {

src/main/java/com/devshawn/kafka/gitops/config/SchemaRegistryConfigLoader.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.devshawn.kafka.gitops.config;
22

3-
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
4-
import com.devshawn.kafka.gitops.exception.MissingMultipleConfigurationException;
53
import org.slf4j.LoggerFactory;
64

75
import java.util.HashMap;
@@ -12,6 +10,12 @@ public class SchemaRegistryConfigLoader {
1210

1311
private static org.slf4j.Logger log = LoggerFactory.getLogger(SchemaRegistryConfigLoader.class);
1412

13+
public static final String SCHEMA_REGISTRY_URL_KEY = "SCHEMA_REGISTRY_URL";
14+
public static final String SCHEMA_DIRECTORY_KEY = "SCHEMA_DIRECTORY";
15+
public static final String SCHEMA_REGISTRY_SASL_JAAS_USERNAME_KEY = "SCHEMA_REGISTRY_SASL_JAAS_USERNAME";
16+
public static final String SCHEMA_REGISTRY_SASL_JAAS_PASSWORD_KEY = "SCHEMA_REGISTRY_SASL_JAAS_PASSWORD";
17+
public static final String SCHEMA_REGISTRY_SASL_CONFIG_KEY = "SCHEMA_REGISTRY_SASL_CONFIG";
18+
1519
private SchemaRegistryConfigLoader() {}
1620

1721
public static SchemaRegistryConfig load() {
@@ -28,14 +32,14 @@ private static void setConfig(SchemaRegistryConfig.Builder builder) {
2832
Map<String, String> environment = System.getenv();
2933

3034
environment.forEach((key, value) -> {
31-
if (key.equals("SCHEMA_REGISTRY_SASL_JAAS_USERNAME")) {
35+
if (key.equals(SCHEMA_REGISTRY_SASL_JAAS_USERNAME_KEY)) {
3236
username.set(value);
33-
} else if (key.equals("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD")) {
37+
} else if (key.equals(SCHEMA_REGISTRY_SASL_JAAS_PASSWORD_KEY)) {
3438
password.set(value);
35-
} else if (key.equals("SCHEMA_REGISTRY_URL")) {
36-
config.put("SCHEMA_REGISTRY_URL", value);
37-
} else if (key.equals("SCHEMA_DIRECTORY")) {
38-
config.put("SCHEMA_DIRECTORY", value);
39+
} else if (key.equals(SCHEMA_REGISTRY_URL_KEY)) {
40+
config.put(SCHEMA_REGISTRY_URL_KEY, value);
41+
} else if (key.equals(SCHEMA_DIRECTORY_KEY)) {
42+
config.put(SCHEMA_DIRECTORY_KEY, value);
3943
}
4044
});
4145

@@ -50,13 +54,13 @@ private static void setConfig(SchemaRegistryConfig.Builder builder) {
5054
private static void handleDefaultConfig(Map<String, Object> config) {
5155
final String DEFAULT_URL = "http://localhost:8081";
5256
final String CURRENT_WORKING_DIR = System.getProperty("user.dir");
53-
if (!config.containsKey("SCHEMA_REGISTRY_URL")) {
54-
log.info("SCHEMA_REGISTRY_URL not set. Using default value of {}", DEFAULT_URL);
55-
config.put("SCHEMA_REGISTRY_URL", DEFAULT_URL);
57+
if (!config.containsKey(SCHEMA_REGISTRY_URL_KEY)) {
58+
log.info("{} not set. Using default value of {}", SCHEMA_REGISTRY_URL_KEY, DEFAULT_URL);
59+
config.put(SCHEMA_REGISTRY_URL_KEY, DEFAULT_URL);
5660
}
57-
if (!config.containsKey("SCHEMA_DIRECTORY")) {
58-
log.info("SCHEMA_DIRECTORY not set. Defaulting to current working directory: {}", CURRENT_WORKING_DIR);
59-
config.put("SCHEMA_DIRECTORY", CURRENT_WORKING_DIR);
61+
if (!config.containsKey(SCHEMA_DIRECTORY_KEY)) {
62+
log.info("{} not set. Defaulting to current working directory: {}", SCHEMA_DIRECTORY_KEY, CURRENT_WORKING_DIR);
63+
config.put(SCHEMA_DIRECTORY_KEY, CURRENT_WORKING_DIR);
6064
}
6165
}
6266

@@ -65,13 +69,12 @@ private static void handleAuthentication(AtomicReference<String> username, Atomi
6569
String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule";
6670
String value = String.format("%s required username=\"%s\" password=\"%s\";",
6771
loginModule, escape(username.get()), escape(password.get()));
68-
config.put("SCHEMA_REGISTRY_SASL_CONFIG", value);
69-
} else if (username.get() != null) {
70-
throw new MissingConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_USERNAME");
71-
} else if (password.get() != null) {
72-
throw new MissingConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD");
73-
} else if (username.get() == null && password.get() == null) {
74-
throw new MissingMultipleConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD", "SCHEMA_REGISTRY_SASL_JAAS_USERNAME");
72+
config.put(SCHEMA_REGISTRY_SASL_CONFIG_KEY, value);
73+
} else {
74+
if(config.get(SCHEMA_REGISTRY_SASL_CONFIG_KEY) == null) {
75+
log.info("{} or {} not set. No authentication configured for the Schema Registry",
76+
SCHEMA_REGISTRY_SASL_JAAS_USERNAME_KEY, SCHEMA_REGISTRY_SASL_JAAS_PASSWORD_KEY);
77+
}
7578
}
7679
}
7780

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,79 @@
11
package com.devshawn.kafka.gitops.domain.state;
22

3-
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4-
import org.inferred.freebuilder.FreeBuilder;
5-
3+
import java.io.IOException;
4+
import java.nio.charset.StandardCharsets;
5+
import java.nio.file.Files;
6+
import java.nio.file.Paths;
7+
import java.util.Collections;
68
import java.util.List;
9+
import java.util.Map;
10+
import java.util.Optional;
11+
import org.inferred.freebuilder.FreeBuilder;
12+
import com.devshawn.kafka.gitops.config.SchemaRegistryConfigLoader;
13+
import com.devshawn.kafka.gitops.enums.SchemaCompatibility;
14+
import com.devshawn.kafka.gitops.enums.SchemaType;
15+
import com.devshawn.kafka.gitops.exception.SchemaRegistryExecutionException;
16+
import com.devshawn.kafka.gitops.exception.ValidationException;
17+
import com.devshawn.kafka.gitops.service.SchemaRegistryService;
18+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
19+
import io.confluent.kafka.schemaregistry.AbstractSchemaProvider;
20+
import io.confluent.kafka.schemaregistry.ParsedSchema;
721

822
@FreeBuilder
9-
@JsonDeserialize(builder = SchemaDetails.Builder.class)
23+
@JsonDeserialize(builder = SchemaDetails.Builder.class)
1024
public interface SchemaDetails {
1125

12-
String getType();
26+
SchemaType getType();
27+
28+
String getSchema();
1329

14-
String getFile();
30+
Optional<String> getFile();
1531

16-
List<String> getSubjects();
32+
Optional<SchemaCompatibility> getCompatibility();
1733

1834
List<ReferenceDetails> getReferences();
1935

2036
class Builder extends SchemaDetails_Builder {
37+
@Override
38+
public SchemaDetails build() {
39+
AbstractSchemaProvider schemaProvider = SchemaRegistryService.schemaProviderFromType(super.getType());
40+
ParsedSchema parsedSchema;
41+
if(super.getFile().isPresent()) {
42+
boolean schema = true;
43+
try {
44+
super.getSchema();
45+
}catch (IllegalStateException e) {
46+
schema = false;
47+
}
48+
if ( schema ) {
49+
throw new IllegalStateException("schema and file fields cannot be both set at the same time");
50+
}
51+
parsedSchema = schemaProvider.parseSchema(loadSchemaFromDisk(super.getFile().get()), Collections.emptyList()).get();
52+
super.setFile(Optional.empty());
53+
} else {
54+
String schema;
55+
try {
56+
schema = super.getSchema();
57+
}catch (IllegalStateException e) {
58+
throw new IllegalStateException("schema or file field must be provided");
59+
}
60+
parsedSchema = schemaProvider.parseSchema(schema, Collections.emptyList()).get();
61+
}
62+
super.setSchema(parsedSchema.canonicalString());
63+
return super.build();
64+
}
65+
66+
private String loadSchemaFromDisk(String fileName) {
67+
Map<String, Object> config = SchemaRegistryConfigLoader.load().getConfig();
68+
final String SCHEMA_DIRECTORY = config.get(SchemaRegistryConfigLoader.SCHEMA_DIRECTORY_KEY).toString();
69+
if (!Files.exists(Paths.get(SCHEMA_DIRECTORY + "/" + fileName))) {
70+
throw new ValidationException(String.format("Schema file %s not found in schema directory at %s", getFile(), config.get("SCHEMA_DIRECTORY")));
71+
}
72+
try {
73+
return new String(Files.readAllBytes(Paths.get(SCHEMA_DIRECTORY + "/" + fileName)), StandardCharsets.UTF_8);
74+
} catch (IOException ex) {
75+
throw new SchemaRegistryExecutionException("Error thrown when attempting to load a schema from schema directory", ex.getMessage());
76+
}
77+
}
2178
}
2279
}

src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsFiles.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public interface SettingsFiles {
1414
Optional<String> getTopics();
1515

1616
Optional<String> getUsers();
17+
18+
Optional<String> getSchemas();
1719

1820
class Builder extends SettingsFiles_Builder {
1921
}

src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsSchema.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.devshawn.kafka.gitops.domain.state.settings;
22

3+
import com.devshawn.kafka.gitops.enums.SchemaCompatibility;
34
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
45
import org.inferred.freebuilder.FreeBuilder;
56

@@ -13,6 +14,8 @@ public interface SettingsSchema {
1314

1415
Optional<SettingsDirectory> getDirectory();
1516

17+
Optional<SettingsSchemasDefaults> getDefaults();
18+
1619
class Builder extends SettingsSchema_Builder {
1720
}
1821
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.devshawn.kafka.gitops.domain.state.settings;
2+
3+
import java.util.Optional;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
import com.devshawn.kafka.gitops.enums.SchemaCompatibility;
6+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = SettingsSchemasDefaults.Builder.class)
10+
public interface SettingsSchemasDefaults {
11+
12+
Optional<SchemaCompatibility> getCompatibility();
13+
14+
class Builder extends SettingsSchemasDefaults_Builder {
15+
}
16+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.devshawn.kafka.gitops.enums;
2+
3+
public enum SchemaCompatibility {
4+
NONE,
5+
BACKWARD,
6+
FORWARD,
7+
FULL,
8+
BACKWARD_TRANSITIVE,
9+
FORWARD_TRANSITIVE,
10+
FULL_TRANSITIVE;
11+
}

0 commit comments

Comments
 (0)