Skip to content
This repository was archived by the owner on May 5, 2024. It is now read-only.

Commit cad515e

Browse files
authored
feat: Add support for whitelisting topic prefixes (#74)
Closes devshawn#90
1 parent 7b5b7ab commit cad515e

File tree

11 files changed

+141
-14
lines changed

11 files changed

+141
-14
lines changed

docs/specification.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ The desired state file consists of:
2323
- **topics** [Optional]:
2424
- **defaults** [Optional]: Specify topic defaults so you don't need to specify them for every topic in the state file. Currently, only replication is supported.
2525
- **blacklist** [Optional]: Add a prefixed topic blacklist for ignoring specific topics when using `kafka-gitops`. This allows topics to be ignored from being deleted if they are not defined in the desired state file.
26+
- **whitelist** [Optional]: Add a prefixed topic whitelist for exclusively handling specific topics when using `kafka-gitops`. This allows topics to be exclusively handled and topics not on the list are being ignored, even if they are not defined in the desired state file.
27+
28+
?> `topics.blacklist` and `topics.whitelist` are _not mutually exclusive_ can be used together to whitelist specific topic prefixes and blacklist individual "sub-topics".
29+
30+
?> The blacklist takes precedence over the whitelist, so if a topic name is matched by both, it will be ignored and not deleted if it was not defined in the desired state file.
2631

2732
**Example**:
2833
```yaml
@@ -35,6 +40,10 @@ settings:
3540
blacklist:
3641
prefixed:
3742
- _confluent
43+
- my-topics-excluded
44+
whitelist:
45+
prefixed:
46+
- my-topics
3847
```
3948
4049
## Topics

src/main/java/com/devshawn/kafka/gitops/StateManager.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import com.devshawn.kafka.gitops.domain.state.settings.Settings;
1919
import com.devshawn.kafka.gitops.domain.state.settings.SettingsCCloud;
2020
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopics;
21-
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopicsBlacklist;
21+
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopicsList;
2222
import com.devshawn.kafka.gitops.exception.ConfluentCloudException;
2323
import com.devshawn.kafka.gitops.exception.InvalidAclDefinitionException;
2424
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
@@ -39,6 +39,7 @@
3939
import org.slf4j.LoggerFactory;
4040

4141
import java.util.ArrayList;
42+
import java.util.Collection;
4243
import java.util.List;
4344
import java.util.Map;
4445
import java.util.Optional;
@@ -153,7 +154,8 @@ private void createServiceAccount(String name, List<ServiceAccount> serviceAccou
153154
private DesiredState getDesiredState() {
154155
DesiredStateFile desiredStateFile = getAndValidateStateFile();
155156
DesiredState.Builder desiredState = new DesiredState.Builder()
156-
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile));
157+
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile))
158+
.addAllPrefixedTopicsToAccept(getPrefixedTopicsToAccept(desiredStateFile));
157159

158160
generateTopicsState(desiredState, desiredStateFile);
159161

@@ -297,7 +299,7 @@ private List<String> getPrefixedTopicsToIgnore(DesiredStateFile desiredStateFile
297299
desiredStateFile.getSettings()
298300
.flatMap(Settings::getTopics)
299301
.flatMap(SettingsTopics::getBlacklist)
300-
.map(SettingsTopicsBlacklist::getPrefixed)
302+
.map(SettingsTopicsList::getPrefixed)
301303
.ifPresent(topics::addAll);
302304

303305
desiredStateFile.getServices().forEach((name, service) -> {
@@ -308,6 +310,16 @@ private List<String> getPrefixedTopicsToIgnore(DesiredStateFile desiredStateFile
308310
return topics;
309311
}
310312

313+
private List<String> getPrefixedTopicsToAccept(DesiredStateFile desiredStateFile) {
314+
return desiredStateFile.getSettings()
315+
.flatMap(Settings::getTopics)
316+
.flatMap(SettingsTopics::getWhitelist)
317+
.map(SettingsTopicsList::getPrefixed)
318+
.stream()
319+
.flatMap(Collection::stream)
320+
.toList();
321+
}
322+
311323
private GetAclOptions buildGetAclOptions(String serviceName) {
312324
return new GetAclOptions.Builder().setServiceName(serviceName).setDescribeAclEnabled(describeAclEnabled).build();
313325
}

src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredState.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public interface DesiredState {
1616

1717
List<String> getPrefixedTopicsToIgnore();
1818

19+
List<String> getPrefixedTopicsToAccept();
20+
1921
class Builder extends DesiredState_Builder {
2022
}
2123
}

src/main/java/com/devshawn/kafka/gitops/domain/state/settings/SettingsTopics.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ public interface SettingsTopics {
1111

1212
Optional<SettingsTopicsDefaults> getDefaults();
1313

14-
Optional<SettingsTopicsBlacklist> getBlacklist();
14+
Optional<SettingsTopicsList> getBlacklist();
15+
16+
Optional<SettingsTopicsList> getWhitelist();
1517

1618
class Builder extends SettingsTopics_Builder {
1719
}
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
import java.util.List;
77

88
@FreeBuilder
9-
@JsonDeserialize(builder = SettingsTopicsBlacklist.Builder.class)
10-
public interface SettingsTopicsBlacklist {
9+
@JsonDeserialize(builder = SettingsTopicsList.Builder.class)
10+
public interface SettingsTopicsList {
1111

1212
List<String> getPrefixed();
1313

14-
class Builder extends SettingsTopicsBlacklist_Builder {
14+
class Builder extends SettingsTopicsList_Builder {
1515
}
1616
}

src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,22 +66,28 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla
6666
desiredPlan.addTopicPlans(topicPlan.build());
6767
});
6868

69-
topics.forEach(currentTopic -> {
70-
boolean shouldIgnore = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
71-
if (shouldIgnore) {
72-
LOG.info("[PLAN] Ignoring topic {} due to prefix", currentTopic.name());
73-
return;
69+
for (TopicListing currentTopic : topics) {
70+
boolean acceptTopic = desiredState.getPrefixedTopicsToAccept().stream().anyMatch(it -> currentTopic.name().startsWith(it));
71+
if (!desiredState.getPrefixedTopicsToAccept().isEmpty() && !acceptTopic) {
72+
LOG.info("[PLAN] Ignoring topic {} due to missing prefix (whitelist)", currentTopic.name());
73+
continue;
7474
}
7575

76-
if (!managerConfig.isDeleteDisabled() && desiredState.getTopics().getOrDefault(currentTopic.name(), null) == null) {
76+
boolean ignoreTopic = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
77+
if (ignoreTopic) {
78+
LOG.info("[PLAN] Ignoring topic {} due to prefix (blacklist)", currentTopic.name());
79+
continue;
80+
}
81+
82+
if (!managerConfig.isDeleteDisabled() && !desiredState.getTopics().containsKey(currentTopic.name())) {
7783
TopicPlan topicPlan = new TopicPlan.Builder()
7884
.setName(currentTopic.name())
7985
.setAction(PlanAction.REMOVE)
8086
.build();
8187

8288
desiredPlan.addTopicPlans(topicPlan);
8389
}
84-
});
90+
}
8591
}
8692

8793
private void planTopicConfigurations(String topicName, TopicDetails topicDetails, List<ConfigEntry> configs, TopicPlan.Builder topicPlan) {

src/test/groovy/com/devshawn/kafka/gitops/PlanCommandIntegrationSpec.groovy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ class PlanCommandIntegrationSpec extends Specification {
158158
"seed-topic-modification-no-delete" | true
159159
"seed-acl-exists" | true
160160
"seed-blacklist-topics" | false
161+
"seed-blacklist-whitelist-topics" | false
162+
"seed-whitelist-topics" | false
161163
}
162164

163165
void 'test include unchanged flag - #planName #includeUnchanged'() {
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"topicPlans": [
3+
{
4+
"name": "new-topic",
5+
"action": "ADD",
6+
"topicDetails": {
7+
"partitions": 6,
8+
"replication": 1,
9+
"configs": {}
10+
},
11+
"topicConfigPlans": []
12+
},
13+
{
14+
"name": "topic-with-configs-1",
15+
"action": "REMOVE",
16+
"topicDetails": null,
17+
"topicConfigPlans": []
18+
}
19+
],
20+
"aclPlans": [
21+
{
22+
"name": "Unnamed ACL",
23+
"aclDetails": {
24+
"name": "test-topic",
25+
"type": "TOPIC",
26+
"pattern": "LITERAL",
27+
"principal": "User:test",
28+
"host": "*",
29+
"operation": "READ",
30+
"permission": "ALLOW"
31+
},
32+
"action": "REMOVE"
33+
}
34+
]
35+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
settings:
2+
topics:
3+
blacklist:
4+
prefixed:
5+
- test
6+
- topic-with-configs-2
7+
whitelist:
8+
prefixed:
9+
- topic-with
10+
11+
topics:
12+
new-topic:
13+
partitions: 6
14+
replication: 1
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"topicPlans": [
3+
{
4+
"name": "test-new-topic",
5+
"action": "ADD",
6+
"topicDetails": {
7+
"partitions": 6,
8+
"replication": 1,
9+
"configs": {}
10+
},
11+
"topicConfigPlans": []
12+
},
13+
{
14+
"name": "test-topic",
15+
"action": "REMOVE",
16+
"topicDetails": null,
17+
"topicConfigPlans": []
18+
}
19+
],
20+
"aclPlans": [
21+
{
22+
"name": "Unnamed ACL",
23+
"aclDetails": {
24+
"name": "test-topic",
25+
"type": "TOPIC",
26+
"pattern": "LITERAL",
27+
"principal": "User:test",
28+
"host": "*",
29+
"operation": "READ",
30+
"permission": "ALLOW"
31+
},
32+
"action": "REMOVE"
33+
}
34+
]
35+
}

0 commit comments

Comments
 (0)