
Commit a979cc5

Jerome Revillard (jrevillard) authored and committed
Implement replication and partitions synchronisation
1 parent 43933f3 commit a979cc5

61 files changed: +1003 additions, −266 deletions


docker/docker-compose.yml

Lines changed: 55 additions & 2 deletions

@@ -15,7 +15,7 @@ services:
       - ./data/zoo1/datalog:/datalog
 
   kafka1:
-    image: confluentinc/cp-kafka:5.3.1
+    image: confluentinc/cp-kafka:5.5.3
     hostname: kafka1
     ports:
       - "9092:9092"
@@ -38,4 +38,57 @@ services:
       - ./data/kafka1/data:/var/lib/kafka/data
       - ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
     depends_on:
-      - zoo1
+      - zoo1
+
+  kafka2:
+    image: confluentinc/cp-kafka:5.5.3
+    hostname: kafka2
+    ports:
+      - "9093:9093"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:SASL_PLAINTEXT,LISTENER_DOCKER_EXTERNAL:SASL_PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+      KAFKA_BROKER_ID: 2
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
+      ZOOKEEPER_SASL_ENABLED: "false"
+      KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer"
+      KAFKA_SUPER_USERS: "User:test;User:kafka"
+      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
+    volumes:
+      - ./data/kafka2/data:/var/lib/kafka/data
+      - ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
+    depends_on:
+      - zoo1
+
+  kafka3:
+    image: confluentinc/cp-kafka:5.5.3
+    hostname: kafka3
+    ports:
+      - "9094:9094"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:SASL_PLAINTEXT,LISTENER_DOCKER_EXTERNAL:SASL_PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+      KAFKA_BROKER_ID: 3
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
+      ZOOKEEPER_SASL_ENABLED: "false"
+      KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer"
+      KAFKA_SUPER_USERS: "User:test;User:kafka"
+      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
+    volumes:
+      - ./data/kafka3/data:/var/lib/kafka/data
+      - ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
+    depends_on:
+      - zoo1
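The two new broker services turn the local compose stack into a three-node cluster behind a single ZooKeeper instance, which is what makes replication factors greater than 1 exercisable locally alongside the image bump to cp-kafka 5.5.3. As a quick smoke test, a sketch along these lines (not part of this commit) should be able to create a replicated topic against the external listeners; the SASL username and password below are placeholders standing in for whatever docker/config/kafka_server_jaas.conf defines, since that file is not shown in this diff.

// Minimal sketch (not part of this commit): create a replicated topic against the
// three-broker compose stack. The SASL credentials are placeholders; the real values
// live in docker/config/kafka_server_jaas.conf, which is not part of this diff.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class ReplicatedTopicSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"test\" password=\"test-secret\";"); // placeholder credentials
        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions, replication factor 3 — only possible now that three brokers run.
            admin.createTopics(Collections.singletonList(new NewTopic("replication-smoke-test", 6, (short) 3)))
                 .all().get();
        }
    }
}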
src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicDetailsPlan.java (new file)

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+package com.devshawn.kafka.gitops.domain.plan;
+
+import java.util.Optional;
+import org.inferred.freebuilder.FreeBuilder;
+import com.devshawn.kafka.gitops.enums.PlanAction;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+@FreeBuilder
+@JsonDeserialize(builder = TopicDetailsPlan.Builder.class)
+public interface TopicDetailsPlan {
+    Optional<Integer> getPartitions();
+    Optional<Integer> getPreviousPartitions();
+    PlanAction getPartitionsAction();
+
+    Optional<Integer> getReplication();
+    Optional<Integer> getPreviousReplication();
+    PlanAction getReplicationAction();
+
+    public static TopicDetailsPlan toChangesOnlyPlan(TopicDetailsPlan topicDetailsPlan) {
+        TopicDetailsPlan.Builder builder = new TopicDetailsPlan.Builder();
+        builder.setReplicationAction(topicDetailsPlan.getReplicationAction());
+        builder.setPartitionsAction(topicDetailsPlan.getPartitionsAction());
+        if (topicDetailsPlan.getReplicationAction() != null && !topicDetailsPlan.getReplicationAction().equals(PlanAction.NO_CHANGE)) {
+            builder.setReplication(topicDetailsPlan.getReplication());
+            builder.setPreviousReplication(topicDetailsPlan.getPreviousReplication());
+        }
+        if (topicDetailsPlan.getPartitionsAction() != null && !topicDetailsPlan.getPartitionsAction().equals(PlanAction.NO_CHANGE)) {
+            builder.setPartitions(topicDetailsPlan.getPartitions());
+            builder.setPreviousPartitions(topicDetailsPlan.getPreviousPartitions());
+        }
+        return builder.build();
+    }
+
+    class Builder extends TopicDetailsPlan_Builder {
+    }
+}
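TopicDetailsPlan isolates the partition-count and replication-factor portion of a topic plan, carrying the before/after values and a PlanAction for each. toChangesOnlyPlan keeps both actions but only copies values whose action is not NO_CHANGE, so the changes-only plan output stays compact. A hypothetical usage sketch, assuming the FreeBuilder-generated TopicDetailsPlan_Builder is on the classpath:

// Hypothetical usage sketch (not in this commit) showing how toChangesOnlyPlan filters values.
import com.devshawn.kafka.gitops.domain.plan.TopicDetailsPlan;
import com.devshawn.kafka.gitops.enums.PlanAction;

public class TopicDetailsPlanExample {
    public static void main(String[] args) {
        TopicDetailsPlan full = new TopicDetailsPlan.Builder()
                .setPartitionsAction(PlanAction.UPDATE)
                .setPartitions(6)
                .setPreviousPartitions(3)
                .setReplicationAction(PlanAction.NO_CHANGE)
                .setReplication(3)
                .setPreviousReplication(3)
                .build();

        TopicDetailsPlan changesOnly = TopicDetailsPlan.toChangesOnlyPlan(full);
        // The partition change (3 -> 6) survives; the replication values are dropped
        // because their action is NO_CHANGE.
        System.out.println(changesOnly.getPartitions());   // Optional[6]
        System.out.println(changesOnly.getReplication());  // Optional.empty
    }
}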

src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicPlan.java

Lines changed: 6 additions & 3 deletions

@@ -1,6 +1,5 @@
 package com.devshawn.kafka.gitops.domain.plan;
 
-import com.devshawn.kafka.gitops.domain.state.TopicDetails;
 import com.devshawn.kafka.gitops.enums.PlanAction;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.inferred.freebuilder.FreeBuilder;
@@ -16,12 +15,16 @@ public interface TopicPlan {
 
     PlanAction getAction();
 
-    Optional<TopicDetails> getTopicDetails();
+    Optional<TopicDetailsPlan> getTopicDetailsPlan();
 
     List<TopicConfigPlan> getTopicConfigPlans();
 
     default TopicPlan toChangesOnlyPlan() {
-        TopicPlan.Builder builder = new TopicPlan.Builder().setName(getName()).setAction(getAction()).setTopicDetails(getTopicDetails());
+        TopicPlan.Builder builder = new TopicPlan.Builder().setName(getName()).setAction(getAction());
+
+        if (getTopicDetailsPlan().isPresent()) {
+            builder.setTopicDetailsPlan(TopicDetailsPlan.toChangesOnlyPlan(getTopicDetailsPlan().get()));
+        }
         getTopicConfigPlans().stream().filter(it -> !it.getAction().equals(PlanAction.NO_CHANGE)).forEach(builder::addTopicConfigPlans);
         return builder.build();
     }

src/main/java/com/devshawn/kafka/gitops/domain/state/AclDetails.java

Lines changed: 0 additions & 5 deletions

@@ -1,6 +1,5 @@
 package com.devshawn.kafka.gitops.domain.state;
 
-import com.devshawn.kafka.gitops.exception.ValidationException;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
@@ -11,10 +10,6 @@
 import org.apache.kafka.common.resource.ResourceType;
 import org.inferred.freebuilder.FreeBuilder;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
 @FreeBuilder
 @JsonDeserialize(builder = AclDetails.Builder.class)
 public abstract class AclDetails {

src/main/java/com/devshawn/kafka/gitops/manager/ApplyManager.java

Lines changed: 15 additions & 1 deletion

@@ -3,12 +3,14 @@
 import com.devshawn.kafka.gitops.config.ManagerConfig;
 import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
 import com.devshawn.kafka.gitops.domain.plan.TopicConfigPlan;
+import com.devshawn.kafka.gitops.domain.plan.TopicDetailsPlan;
 import com.devshawn.kafka.gitops.domain.plan.TopicPlan;
 import com.devshawn.kafka.gitops.enums.PlanAction;
 import com.devshawn.kafka.gitops.service.KafkaService;
 import com.devshawn.kafka.gitops.util.LogUtil;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigResource;
 
 import java.util.*;
@@ -24,13 +26,25 @@ public ApplyManager(ManagerConfig managerConfig, KafkaService kafkaService) {
     }
 
     public void applyTopics(DesiredPlan desiredPlan) {
+        Collection<Node> clusterNodes = kafkaService.describeClusterNodes();
         desiredPlan.getTopicPlans().forEach(topicPlan -> {
             if (topicPlan.getAction() == PlanAction.ADD) {
                 LogUtil.printTopicPreApply(topicPlan);
-                kafkaService.createTopic(topicPlan.getName(), topicPlan.getTopicDetails().get());
+                kafkaService.createTopic(topicPlan.getName(), topicPlan.getTopicDetailsPlan().get(), topicPlan.getTopicConfigPlans());
                 LogUtil.printPostApply();
             } else if (topicPlan.getAction() == PlanAction.UPDATE) {
                 LogUtil.printTopicPreApply(topicPlan);
+
+                if (topicPlan.getTopicDetailsPlan().isPresent()) {
+                    // Update replication factor and partition count
+                    TopicDetailsPlan topicDetailsPlan = topicPlan.getTopicDetailsPlan().get();
+                    if (topicDetailsPlan.getPartitionsAction() == PlanAction.UPDATE) {
+                        kafkaService.addTopicPartition(topicPlan.getName(), topicDetailsPlan.getPartitions().get());
+                    }
+                    if (topicDetailsPlan.getReplicationAction() == PlanAction.UPDATE) {
+                        kafkaService.updateTopicReplication(clusterNodes, topicPlan.getName(), topicDetailsPlan.getReplication().get());
+                    }
+                }
                 topicPlan.getTopicConfigPlans().forEach(topicConfigPlan -> applyTopicConfiguration(topicPlan, topicConfigPlan));
                 LogUtil.printPostApply();
             } else if (topicPlan.getAction() == PlanAction.REMOVE && !managerConfig.isDeleteDisabled()) {
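For UPDATE plans, applyTopics now delegates partition growth and replication changes to KafkaService.addTopicPartition and KafkaService.updateTopicReplication, and fetches the cluster nodes up front so replication changes know which brokers are available. Those KafkaService methods are not part of this excerpt; the sketch below is only one plausible shape for them on top of the Kafka AdminClient (createPartitions with NewPartitions.increaseTo for partition growth, alterPartitionReassignments with a naive replica-placement strategy for replication), not the commit's actual implementation.

// Hedged sketch only — the real KafkaService methods are elsewhere in this commit.
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

class TopicResizeSketch {

    // Partition count can only grow; AdminClient.createPartitions handles that directly.
    static void addTopicPartition(AdminClient adminClient, String topic, int newPartitionCount) throws Exception {
        adminClient.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(newPartitionCount)))
                .all().get();
    }

    // Changing the replication factor means assigning each partition a new replica list,
    // chosen from the available brokers, and submitting a partition reassignment.
    static void updateTopicReplication(AdminClient adminClient, Collection<Node> clusterNodes,
                                       String topic, int replicationFactor) throws Exception {
        TopicDescription description =
                adminClient.describeTopics(Collections.singletonList(topic)).all().get().get(topic);
        List<Integer> brokerIds = new ArrayList<>();
        clusterNodes.forEach(node -> brokerIds.add(node.id()));

        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
        description.partitions().forEach(partition -> {
            // Naive placement: take N brokers, rotated by partition index to spread leaders.
            // Assumes the cluster has at least replicationFactor brokers.
            List<Integer> replicas = new ArrayList<>();
            for (int i = 0; i < replicationFactor; i++) {
                replicas.add(brokerIds.get((partition.partition() + i) % brokerIds.size()));
            }
            reassignments.put(new TopicPartition(topic, partition.partition()),
                    Optional.of(new NewPartitionReassignment(replicas)));
        });
        adminClient.alterPartitionReassignments(reassignments).all().get();
    }
}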

src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java

Lines changed: 56 additions & 14 deletions

@@ -5,20 +5,22 @@
 import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
 import com.devshawn.kafka.gitops.domain.plan.PlanOverview;
 import com.devshawn.kafka.gitops.domain.plan.TopicConfigPlan;
+import com.devshawn.kafka.gitops.domain.plan.TopicDetailsPlan;
 import com.devshawn.kafka.gitops.domain.plan.TopicPlan;
 import com.devshawn.kafka.gitops.domain.state.AclDetails;
 import com.devshawn.kafka.gitops.domain.state.DesiredState;
 import com.devshawn.kafka.gitops.domain.state.TopicDetails;
 import com.devshawn.kafka.gitops.enums.PlanAction;
 import com.devshawn.kafka.gitops.exception.PlanIsUpToDateException;
 import com.devshawn.kafka.gitops.exception.ReadPlanInputException;
+import com.devshawn.kafka.gitops.exception.ValidationException;
 import com.devshawn.kafka.gitops.exception.WritePlanOutputException;
 import com.devshawn.kafka.gitops.service.KafkaService;
 import com.devshawn.kafka.gitops.util.PlanUtil;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.config.ConfigResource;
 import org.slf4j.LoggerFactory;
@@ -47,37 +49,71 @@ public PlanManager(ManagerConfig managerConfig, KafkaService kafkaService, Objec
     }
 
     public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPlan) {
-        List<TopicListing> topics = kafkaService.getTopics();
-        List<String> topicNames = topics.stream().map(TopicListing::name).collect(Collectors.toList());
+        Map<String, TopicDescription> topics = kafkaService.getTopics();
+        List<String> topicNames = topics.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
         Map<String, List<ConfigEntry>> topicConfigs = fetchTopicConfigurations(topicNames);
 
         desiredState.getTopics().forEach((key, value) -> {
+            TopicDetailsPlan.Builder topicDetailsPlan = new TopicDetailsPlan.Builder();
+            topicDetailsPlan.setPartitionsAction(PlanAction.NO_CHANGE)
+                    .setReplicationAction(PlanAction.NO_CHANGE);
+
             TopicPlan.Builder topicPlan = new TopicPlan.Builder()
-                    .setName(key)
-                    .setTopicDetails(value);
+                    .setName(key);
 
             if (!topicNames.contains(key)) {
                 log.info("[PLAN] Topic {} does not exist; it will be created.", key);
                 topicPlan.setAction(PlanAction.ADD);
+                topicDetailsPlan.setPartitionsAction(PlanAction.ADD)
+                        .setPartitions(value.getPartitions())
+                        .setReplicationAction(PlanAction.ADD)
+                        .setReplication(value.getReplication().get());
+                planTopicConfigurations(key, value, topicConfigs.get(key), topicPlan);
             } else {
                 log.info("[PLAN] Topic {} exists, it will not be created.", key);
                 topicPlan.setAction(PlanAction.NO_CHANGE);
+
+                TopicDescription topicDescription = topics.get(key);
+                boolean topicDetailsUpdated = false;
+                if (value.getPartitions().intValue() != topicDescription.partitions().size()) {
+                    if (value.getPartitions().intValue() < topicDescription.partitions().size()) {
+                        throw new ValidationException("Removing the partition number is not supported by Apache Kafka "
+                                + "(topic: " + key + " (" + topicDescription.partitions().size() + " -> " + value.getPartitions().intValue() + "))");
+                    }
+                    topicDetailsPlan.setPartitions(value.getPartitions())
+                            .setPreviousPartitions(topicDescription.partitions().size());
+                    topicDetailsPlan.setPartitionsAction(PlanAction.UPDATE);
+                    topicDetailsUpdated = true;
+                }
+                if (value.getReplication().isPresent() &&
+                        (value.getReplication().get().intValue() != topicDescription.partitions().get(0).replicas().size())) {
+                    topicDetailsPlan.setReplication(value.getReplication().get())
+                            .setPreviousReplication(topicDescription.partitions().get(0).replicas().size());
+                    topicDetailsPlan.setReplicationAction(PlanAction.UPDATE);
+                    topicDetailsUpdated = true;
+                }
+                if (topicDetailsUpdated) {
+                    topicPlan.setAction(PlanAction.UPDATE);
+                }
+
                 planTopicConfigurations(key, value, topicConfigs.get(key), topicPlan);
             }
 
+            topicPlan.setTopicDetailsPlan(topicDetailsPlan.build());
+
             desiredPlan.addTopicPlans(topicPlan.build());
         });
 
-        topics.forEach(currentTopic -> {
-            boolean shouldIgnore = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
+        topics.forEach((currentTopicName, currentTopicDescription) -> {
+            boolean shouldIgnore = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopicName.startsWith(it));
             if (shouldIgnore) {
-                log.info("[PLAN] Ignoring topic {} due to prefix", currentTopic.name());
+                log.info("[PLAN] Ignoring topic {} due to prefix", currentTopicName);
                 return;
             }
 
-            if (!managerConfig.isDeleteDisabled() && desiredState.getTopics().getOrDefault(currentTopic.name(), null) == null) {
+            if (!managerConfig.isDeleteDisabled() && desiredState.getTopics().getOrDefault(currentTopicName, null) == null) {
                 TopicPlan topicPlan = new TopicPlan.Builder()
-                        .setName(currentTopic.name())
+                        .setName(currentTopicName)
                         .setAction(PlanAction.REMOVE)
                         .build();
 
@@ -88,7 +124,7 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla
 
     private void planTopicConfigurations(String topicName, TopicDetails topicDetails, List<ConfigEntry> configs, TopicPlan.Builder topicPlan) {
         Map<String, TopicConfigPlan> configPlans = new HashMap<>();
-        List<ConfigEntry> customConfigs = configs.stream()
+        List<ConfigEntry> customConfigs = configs == null ? new ArrayList<>() : configs.stream()
                 .filter(it -> it.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
                 .collect(Collectors.toList());
 
@@ -105,7 +141,9 @@ private void planTopicConfigurations(String topicName, TopicDetails topicDetails
             } else if (newConfig == null) {
                 topicConfigPlan.setAction(PlanAction.REMOVE);
                 configPlans.put(currentConfig.name(), topicConfigPlan.build());
-                topicPlan.setAction(PlanAction.UPDATE);
+                if (topicPlan.getAction() == null || topicPlan.getAction().equals(PlanAction.NO_CHANGE)) {
+                    topicPlan.setAction(PlanAction.UPDATE);
+                }
             }
         });
 
@@ -120,12 +158,16 @@ private void planTopicConfigurations(String topicName, TopicDetails topicDetails
             if (currentConfig == null) {
                 topicConfigPlan.setAction(PlanAction.ADD);
                 configPlans.put(key, topicConfigPlan.build());
-                topicPlan.setAction(PlanAction.UPDATE);
+                if (topicPlan.getAction() == null || topicPlan.getAction().equals(PlanAction.NO_CHANGE)) {
+                    topicPlan.setAction(PlanAction.UPDATE);
+                }
             } else if (!currentConfig.value().equals(value)) {
                 topicConfigPlan.setPreviousValue(currentConfig.value())
                         .setAction(PlanAction.UPDATE);
                 configPlans.put(key, topicConfigPlan.build());
-                topicPlan.setAction(PlanAction.UPDATE);
+                if (topicPlan.getAction() == null || topicPlan.getAction().equals(PlanAction.NO_CHANGE)) {
+                    topicPlan.setAction(PlanAction.UPDATE);
+                }
             }
         });
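The planner now works from TopicDescription rather than TopicListing because it needs per-partition replica details: the current replication factor is read as partitions().get(0).replicas().size(), and the current partition count as partitions().size(). The corresponding KafkaService.getTopics() change is elsewhere in this commit; a minimal sketch of that kind of lookup, assuming a plain AdminClient, could look like this:

// Hedged sketch of the lookup planTopics now relies on; the actual KafkaService.getTopics()
// change is part of this commit but not shown in this excerpt.
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

class TopicLookupSketch {
    static Map<String, TopicDescription> getTopics(AdminClient adminClient) throws Exception {
        // listTopics() yields names only; describeTopics() adds the per-partition replica
        // lists that the planner compares against the desired partition/replication values.
        return adminClient.describeTopics(adminClient.listTopics().names().get()).all().get();
    }
}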

src/main/java/com/devshawn/kafka/gitops/service/ConfluentCloudService.java

Lines changed: 3 additions & 2 deletions

@@ -43,7 +43,8 @@ public ServiceAccount createServiceAccount(String name, boolean isUser) {
     }
 
     public static String execCmd(String[] cmd) throws java.io.IOException {
-        java.util.Scanner s = new java.util.Scanner(Runtime.getRuntime().exec(cmd).getInputStream()).useDelimiter("\\A");
-        return s.hasNext() ? s.next() : "";
+        try (java.util.Scanner s = new java.util.Scanner(Runtime.getRuntime().exec(cmd).getInputStream()).useDelimiter("\\A")) {
+            return s.hasNext() ? s.next() : "";
+        }
     }
 }
