Skip to content

Commit a74ca40

Browse files
jrevillardJerome Revillard
authored andcommitted
Cleanup + respect the includeUnchanged flag
1 parent a979cc5 commit a74ca40

19 files changed

+126
-173
lines changed

src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicDetailsPlan.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,28 @@ public interface TopicDetailsPlan {
1616
Optional<Integer> getPreviousReplication();
1717
PlanAction getReplicationAction();
1818

19-
public static TopicDetailsPlan toChangesOnlyPlan(TopicDetailsPlan topicDetailsPlan) {
19+
public static Optional<TopicDetailsPlan> toChangesOnlyPlan(Optional<TopicDetailsPlan> topicDetailsPlan) {
20+
if(! topicDetailsPlan.isPresent()) {
21+
return topicDetailsPlan;
22+
}
2023
TopicDetailsPlan.Builder builder = new TopicDetailsPlan.Builder();
21-
builder.setReplicationAction(topicDetailsPlan.getReplicationAction());
22-
builder.setPartitionsAction(topicDetailsPlan.getPartitionsAction());
23-
if ( topicDetailsPlan.getReplicationAction() != null && ! topicDetailsPlan.getReplicationAction().equals(PlanAction.NO_CHANGE)) {
24-
builder.setReplication(topicDetailsPlan.getReplication());
25-
builder.setPreviousReplication(topicDetailsPlan.getPreviousReplication());
24+
builder.setReplicationAction(topicDetailsPlan.get().getReplicationAction());
25+
builder.setPartitionsAction(topicDetailsPlan.get().getPartitionsAction());
26+
boolean nochanges = true;
27+
if ( topicDetailsPlan.get().getReplicationAction() != null && ! topicDetailsPlan.get().getReplicationAction().equals(PlanAction.NO_CHANGE)) {
28+
builder.setReplication(topicDetailsPlan.get().getReplication());
29+
builder.setPreviousReplication(topicDetailsPlan.get().getPreviousReplication());
30+
nochanges = false;
31+
}
32+
if (topicDetailsPlan.get().getPartitionsAction() != null && ! topicDetailsPlan.get().getPartitionsAction().equals(PlanAction.NO_CHANGE)) {
33+
builder.setPartitions(topicDetailsPlan.get().getPartitions());
34+
builder.setPreviousPartitions(topicDetailsPlan.get().getPreviousPartitions());
35+
nochanges = false;
2636
}
27-
if (topicDetailsPlan.getPartitionsAction() != null && ! topicDetailsPlan.getPartitionsAction().equals(PlanAction.NO_CHANGE)) {
28-
builder.setPartitions(topicDetailsPlan.getPartitions());
29-
builder.setPreviousPartitions(topicDetailsPlan.getPreviousPartitions());
37+
if(nochanges) {
38+
return Optional.<TopicDetailsPlan>empty();
3039
}
31-
return builder.build();
40+
return Optional.of(builder.build());
3241
}
3342
class Builder extends TopicDetailsPlan_Builder {
3443
}

src/main/java/com/devshawn/kafka/gitops/domain/plan/TopicPlan.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ public interface TopicPlan {
2222
default TopicPlan toChangesOnlyPlan() {
2323
TopicPlan.Builder builder = new TopicPlan.Builder().setName(getName()).setAction(getAction());
2424

25-
if(getTopicDetailsPlan().isPresent()) {
26-
builder.setTopicDetailsPlan(TopicDetailsPlan.toChangesOnlyPlan(getTopicDetailsPlan().get()));
27-
}
25+
builder.setTopicDetailsPlan(TopicDetailsPlan.toChangesOnlyPlan(getTopicDetailsPlan()));
26+
2827
getTopicConfigPlans().stream().filter(it -> !it.getAction().equals(PlanAction.NO_CHANGE)).forEach(builder::addTopicConfigPlans);
2928
return builder.build();
3029
}

src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla
6060

6161
TopicPlan.Builder topicPlan = new TopicPlan.Builder()
6262
.setName(key);
63-
63+
boolean topicDetailsAddOrUpdate = false;
6464
if (!topicNames.contains(key)) {
6565
log.info("[PLAN] Topic {} does not exist; it will be created.", key);
6666
topicPlan.setAction(PlanAction.ADD);
@@ -69,12 +69,15 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla
6969
.setReplicationAction(PlanAction.ADD)
7070
.setReplication(value.getReplication().get());
7171
planTopicConfigurations(key, value, topicConfigs.get(key), topicPlan);
72+
topicDetailsAddOrUpdate = true;
7273
} else {
7374
log.info("[PLAN] Topic {} exists, it will not be created.", key);
74-
topicPlan.setAction(PlanAction.NO_CHANGE);
75-
7675
TopicDescription topicDescription = topics.get(key);
77-
boolean topicDetailsUpdated = false;
76+
77+
topicPlan.setAction(PlanAction.NO_CHANGE);
78+
topicDetailsPlan.setPartitions(topicDescription.partitions().size())
79+
.setReplication(topicDescription.partitions().get(0).replicas().size());
80+
7881
if (value.getPartitions().intValue() != topicDescription.partitions().size()) {
7982
if( value.getPartitions().intValue() < topicDescription.partitions().size()) {
8083
throw new ValidationException("Removing the partition number is not supported by Apache Kafka "
@@ -83,16 +86,16 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla
8386
topicDetailsPlan.setPartitions(value.getPartitions())
8487
.setPreviousPartitions(topicDescription.partitions().size());
8588
topicDetailsPlan.setPartitionsAction(PlanAction.UPDATE);
86-
topicDetailsUpdated = true;
89+
topicDetailsAddOrUpdate = true;
8790
}
8891
if (value.getReplication().isPresent() &&
8992
( value.getReplication().get().intValue() != topicDescription.partitions().get(0).replicas().size()) ) {
9093
topicDetailsPlan.setReplication(value.getReplication().get())
9194
.setPreviousReplication(topicDescription.partitions().get(0).replicas().size());
9295
topicDetailsPlan.setReplicationAction(PlanAction.UPDATE);
93-
topicDetailsUpdated = true;
96+
topicDetailsAddOrUpdate = true;
9497
}
95-
if (topicDetailsUpdated) {
98+
if (topicDetailsAddOrUpdate) {
9699
topicPlan.setAction(PlanAction.UPDATE);
97100
}
98101

@@ -140,6 +143,7 @@ private void planTopicConfigurations(String topicName, TopicDetails topicDetails
140143
configPlans.put(currentConfig.name(), topicConfigPlan.build());
141144
} else if (newConfig == null) {
142145
topicConfigPlan.setAction(PlanAction.REMOVE);
146+
topicConfigPlan.setPreviousValue(currentConfig.value());
143147
configPlans.put(currentConfig.name(), topicConfigPlan.build());
144148
if (topicPlan.getAction() == null || topicPlan.getAction().equals(PlanAction.NO_CHANGE)) {
145149
topicPlan.setAction(PlanAction.UPDATE);
@@ -153,7 +157,6 @@ private void planTopicConfigurations(String topicName, TopicDetails topicDetails
153157
TopicConfigPlan.Builder topicConfigPlan = new TopicConfigPlan.Builder()
154158
.setKey(key)
155159
.setValue(value);
156-
157160

158161
if (currentConfig == null) {
159162
topicConfigPlan.setAction(PlanAction.ADD);

src/main/java/com/devshawn/kafka/gitops/service/KafkaService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.apache.kafka.clients.admin.NewTopic;
1515
import org.apache.kafka.clients.admin.TopicDescription;
1616
import org.apache.kafka.common.KafkaException;
17-
import org.apache.kafka.common.KafkaFuture;
1817
import org.apache.kafka.common.Node;
1918
import org.apache.kafka.common.TopicPartition;
2019
import org.apache.kafka.common.TopicPartitionInfo;

src/main/java/com/devshawn/kafka/gitops/util/LogUtil.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ private static void printTopicPlan(TopicPlan topicPlan) {
4646
break;
4747
case UPDATE:
4848
System.out.println(yellow(String.format("~ [TOPIC] %s", topicPlan.getName())));
49-
LogUtil.printTopicDetailsPlan(topicPlan.getTopicDetailsPlan().get());
49+
if(topicPlan.getTopicDetailsPlan().isPresent()) {
50+
LogUtil.printTopicDetailsPlan(topicPlan.getTopicDetailsPlan().get());
51+
}
5052
if(!topicPlan.getTopicConfigPlans().isEmpty()) {
5153
System.out.println(yellow("\t~ configs:"));
5254
topicPlan.getTopicConfigPlans().forEach(LogUtil::printTopicConfigPlan);
@@ -78,7 +80,7 @@ private static void printTopicDetailsPlan(TopicDetailsPlan topicDetailsPlan) {
7880
System.out.println(yellow(String.format("\t~ partitions: %s (%s)", topicDetailsPlan.getPartitions().get(), topicDetailsPlan.getPreviousPartitions().get())));
7981
break;
8082
case REMOVE:
81-
System.out.println(red(String.format("\t- partitions")));
83+
System.out.println(red(String.format("\t- partitions (%s)", topicDetailsPlan.getPreviousPartitions().get())));
8284
break;
8385
case NO_CHANGE:
8486
break;
@@ -91,7 +93,7 @@ private static void printTopicDetailsPlan(TopicDetailsPlan topicDetailsPlan) {
9193
System.out.println(yellow(String.format("\t~ replication: %s (%s)", topicDetailsPlan.getReplication().get(), topicDetailsPlan.getPreviousReplication().get())));
9294
break;
9395
case REMOVE:
94-
System.out.println(red(String.format("\t- replication")));
96+
System.out.println(red(String.format("\t- replication (%s)", topicDetailsPlan.getPreviousReplication().get())));
9597
break;
9698
case NO_CHANGE:
9799
break;
@@ -107,7 +109,7 @@ private static void printTopicConfigPlan(TopicConfigPlan topicConfigPlan) {
107109
System.out.println(yellow(String.format("\t\t~ %s: %s ( %s )", topicConfigPlan.getKey(), topicConfigPlan.getValue().get(), topicConfigPlan.getPreviousValue().get())));
108110
break;
109111
case REMOVE:
110-
System.out.println(red(String.format("\t\t- %s", topicConfigPlan.getKey())));
112+
System.out.println(red(String.format("\t\t- %s (%s)", topicConfigPlan.getKey(), topicConfigPlan.getPreviousValue().get())));
111113
break;
112114
case NO_CHANGE:
113115
break;

src/test/resources/plans/no-changes-include-unchanged-plan.json

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
"name": "delete-topic",
55
"action": "NO_CHANGE",
66
"topicDetailsPlan": {
7-
"partitions": null,
7+
"partitions": 1,
88
"previousPartitions": null,
99
"partitionsAction": "NO_CHANGE",
10-
"replication": null,
10+
"replication": 2,
1111
"previousReplication": null,
1212
"replicationAction": "NO_CHANGE"
1313
},
@@ -17,10 +17,10 @@
1717
"name": "test-topic",
1818
"action": "NO_CHANGE",
1919
"topicDetailsPlan": {
20-
"partitions": null,
20+
"partitions": 1,
2121
"previousPartitions": null,
2222
"partitionsAction": "NO_CHANGE",
23-
"replication": null,
23+
"replication": 2,
2424
"previousReplication": null,
2525
"replicationAction": "NO_CHANGE"
2626
},
@@ -30,10 +30,10 @@
3030
"name": "topic-with-configs-1",
3131
"action": "NO_CHANGE",
3232
"topicDetailsPlan": {
33-
"partitions": null,
33+
"partitions": 3,
3434
"previousPartitions": null,
3535
"partitionsAction": "NO_CHANGE",
36-
"replication": null,
36+
"replication": 2,
3737
"previousReplication": null,
3838
"replicationAction": "NO_CHANGE"
3939
},
@@ -56,10 +56,10 @@
5656
"name": "topic-with-configs-2",
5757
"action": "NO_CHANGE",
5858
"topicDetailsPlan": {
59-
"partitions": null,
59+
"partitions": 6,
6060
"previousPartitions": null,
6161
"partitionsAction": "NO_CHANGE",
62-
"replication": null,
62+
"replication": 2,
6363
"previousReplication": null,
6464
"replicationAction": "NO_CHANGE"
6565
},

src/test/resources/plans/seed-acl-exists-apply-output.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ Applying: [UPDATE]
1313

1414
~ [TOPIC] topic-with-configs-1
1515
~ configs:
16-
- cleanup.policy
17-
- segment.bytes
16+
- cleanup.policy (compact)
17+
- segment.bytes (100000)
1818
+ retention.ms: 100000
1919

2020

src/test/resources/plans/seed-acl-exists-plan.json

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,7 @@
33
{
44
"name": "test-topic",
55
"action": "UPDATE",
6-
"topicDetailsPlan": {
7-
"partitions": null,
8-
"previousPartitions": null,
9-
"partitionsAction": "NO_CHANGE",
10-
"replication": null,
11-
"previousReplication": null,
12-
"replicationAction": "NO_CHANGE"
13-
},
6+
"topicDetailsPlan": null,
147
"topicConfigPlans": [
158
{
169
"key": "retention.ms",
@@ -23,25 +16,18 @@
2316
{
2417
"name": "topic-with-configs-1",
2518
"action": "UPDATE",
26-
"topicDetailsPlan": {
27-
"partitions": null,
28-
"previousPartitions": null,
29-
"partitionsAction": "NO_CHANGE",
30-
"replication": null,
31-
"previousReplication": null,
32-
"replicationAction": "NO_CHANGE"
33-
},
19+
"topicDetailsPlan": null,
3420
"topicConfigPlans": [
3521
{
3622
"key": "cleanup.policy",
3723
"value": null,
38-
"previousValue": null,
24+
"previousValue": "compact",
3925
"action": "REMOVE"
4026
},
4127
{
4228
"key": "segment.bytes",
4329
"value": null,
44-
"previousValue": null,
30+
"previousValue": "100000",
4531
"action": "REMOVE"
4632
},
4733
{

src/test/resources/plans/seed-basic-include-unchanged-plan.json

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
"name": "delete-topic",
55
"action": "NO_CHANGE",
66
"topicDetailsPlan": {
7-
"partitions": null,
7+
"partitions": 1,
88
"previousPartitions": null,
99
"partitionsAction": "NO_CHANGE",
10-
"replication": null,
10+
"replication": 2,
1111
"previousReplication": null,
1212
"replicationAction": "NO_CHANGE"
1313
},
@@ -17,10 +17,10 @@
1717
"name": "topic-with-configs-1",
1818
"action": "UPDATE",
1919
"topicDetailsPlan": {
20-
"partitions": null,
20+
"partitions": 3,
2121
"previousPartitions": null,
2222
"partitionsAction": "NO_CHANGE",
23-
"replication": null,
23+
"replication": 2,
2424
"previousReplication": null,
2525
"replicationAction": "NO_CHANGE"
2626
},
@@ -34,7 +34,7 @@
3434
{
3535
"key": "segment.bytes",
3636
"value": null,
37-
"previousValue": null,
37+
"previousValue": "100000",
3838
"action": "REMOVE"
3939
},
4040
{
@@ -49,10 +49,10 @@
4949
"name": "topic-with-configs-2",
5050
"action": "NO_CHANGE",
5151
"topicDetailsPlan": {
52-
"partitions": null,
52+
"partitions": 6,
5353
"previousPartitions": null,
5454
"partitionsAction": "NO_CHANGE",
55-
"replication": null,
55+
"replication": 2,
5656
"previousReplication": null,
5757
"replicationAction": "NO_CHANGE"
5858
},

src/test/resources/plans/seed-basic-plan.json

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,12 @@
33
{
44
"name": "topic-with-configs-1",
55
"action": "UPDATE",
6-
"topicDetailsPlan": {
7-
"partitions": null,
8-
"previousPartitions": null,
9-
"partitionsAction": "NO_CHANGE",
10-
"replication": null,
11-
"previousReplication": null,
12-
"replicationAction": "NO_CHANGE"
13-
},
6+
"topicDetailsPlan": null,
147
"topicConfigPlans": [
158
{
169
"key": "segment.bytes",
1710
"value": null,
18-
"previousValue": null,
11+
"previousValue": "100000",
1912
"action": "REMOVE"
2013
},
2114
{

0 commit comments

Comments
 (0)