Skip to content

Commit 705ef75

Browse files
authored
BE: Fixes Issue #445 Added connect stats (#1224)
1 parent 236a8f0 commit 705ef75

File tree

14 files changed

+190
-37
lines changed

14 files changed

+190
-37
lines changed

api/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ dependencies {
2525
implementation libs.spring.starter.oauth2.client
2626
implementation libs.spring.security.oauth2.resource.server
2727
implementation libs.spring.boot.actuator
28+
2829
compileOnly libs.spring.boot.devtools
2930

3031
implementation libs.spring.security.ldap
@@ -48,6 +49,7 @@ dependencies {
4849

4950
implementation libs.jackson.databind.nullable
5051
implementation libs.cel
52+
implementation libs.caffeine
5153
antlr libs.antlr
5254
implementation libs.antlr.runtime
5355

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import jakarta.validation.Valid;
66
import jakarta.validation.constraints.NotBlank;
77
import jakarta.validation.constraints.NotNull;
8+
import java.time.Duration;
89
import java.util.ArrayList;
910
import java.util.HashMap;
1011
import java.util.HashSet;
@@ -36,6 +37,8 @@ public class ClustersProperties {
3637

3738
PollingProperties polling = new PollingProperties();
3839

40+
CacheProperties cache = new CacheProperties();
41+
3942
@Data
4043
public static class Cluster {
4144
@NotBlank(message = "field name for for cluster could not be blank")
@@ -183,6 +186,14 @@ public enum LogLevel {
183186
}
184187
}
185188

189+
// Configuration for server-side caching of Kafka Connect metadata.
// Bound from the `kafka.clusters.cache` section of application config.
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class CacheProperties {
  // Master switch for the connect-cluster cache; on by default.
  boolean enabled = true;
  // How long a cached connector snapshot stays valid before it is refetched.
  Duration connectCacheExpiry = Duration.ofMinutes(1);
}
196+
186197
@PostConstruct
187198
public void validateAndSetDefaults() {
188199
if (clusters != null) {

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,12 @@ public class KafkaConnectController extends AbstractController implements KafkaC
4545

4646
@Override
4747
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
48+
Boolean withStats,
4849
ServerWebExchange exchange) {
4950

50-
Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
51-
.filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
51+
Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(
52+
getCluster(clusterName), withStats != null ? withStats : false
53+
).filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
5254

5355
return Mono.just(ResponseEntity.ok(availableConnects));
5456
}

api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.kafbat.ui.mapper;
22

3-
import io.kafbat.ui.config.ClustersProperties;
43
import io.kafbat.ui.model.BrokerConfigDTO;
54
import io.kafbat.ui.model.BrokerDTO;
65
import io.kafbat.ui.model.BrokerDiskUsageDTO;
@@ -11,7 +10,6 @@
1110
import io.kafbat.ui.model.ClusterStatsDTO;
1211
import io.kafbat.ui.model.ConfigSourceDTO;
1312
import io.kafbat.ui.model.ConfigSynonymDTO;
14-
import io.kafbat.ui.model.ConnectDTO;
1513
import io.kafbat.ui.model.InternalBroker;
1614
import io.kafbat.ui.model.InternalBrokerConfig;
1715
import io.kafbat.ui.model.InternalBrokerDiskUsage;
@@ -107,8 +105,6 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {
107105

108106
ReplicaDTO toReplica(InternalReplica replica);
109107

110-
ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect);
111-
112108
List<ClusterDTO.FeaturesEnum> toFeaturesEnum(List<ClusterFeature> features);
113109

114110
default List<PartitionDTO> map(Map<Integer, InternalPartition> map) {

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package io.kafbat.ui.mapper;
22

3+
import io.kafbat.ui.config.ClustersProperties;
34
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
45
import io.kafbat.ui.connect.model.ConnectorTask;
56
import io.kafbat.ui.connect.model.NewConnector;
7+
import io.kafbat.ui.model.ConnectDTO;
68
import io.kafbat.ui.model.ConnectorDTO;
79
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
810
import io.kafbat.ui.model.ConnectorPluginDTO;
11+
import io.kafbat.ui.model.ConnectorStateDTO;
912
import io.kafbat.ui.model.ConnectorStatusDTO;
1013
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
1114
import io.kafbat.ui.model.FullConnectorInfoDTO;
1215
import io.kafbat.ui.model.TaskDTO;
1316
import io.kafbat.ui.model.TaskStatusDTO;
14-
import io.kafbat.ui.model.connect.InternalConnectInfo;
17+
import io.kafbat.ui.model.connect.InternalConnectorInfo;
1518
import java.util.List;
19+
import java.util.Optional;
1620
import org.mapstruct.Mapper;
1721
import org.mapstruct.Mapping;
1822

@@ -38,7 +42,51 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
3842
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
3943
connectorPluginConfigValidationResponse);
4044

41-
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
45+
default ConnectDTO toKafkaConnect(
46+
ClustersProperties.ConnectCluster connect,
47+
List<InternalConnectorInfo> connectors,
48+
boolean withStats) {
49+
Integer connectorCount = null;
50+
Integer failedConnectors = null;
51+
Integer tasksCount = null;
52+
Integer failedTasksCount = null;
53+
54+
if (withStats) {
55+
connectorCount = connectors.size();
56+
failedConnectors = 0;
57+
tasksCount = 0;
58+
failedTasksCount = 0;
59+
60+
for (InternalConnectorInfo connector : connectors) {
61+
Optional<ConnectorDTO> internalConnector = Optional.ofNullable(connector.getConnector());
62+
63+
failedConnectors += internalConnector
64+
.map(ConnectorDTO::getStatus)
65+
.map(ConnectorStatusDTO::getState)
66+
.filter(ConnectorStateDTO.FAILED::equals)
67+
.map(s -> 1).orElse(0);
68+
69+
tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);
70+
71+
for (TaskDTO task : connector.getTasks()) {
72+
if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
73+
failedTasksCount += tasksCount;
74+
}
75+
}
76+
}
77+
78+
}
79+
80+
return new ConnectDTO()
81+
.address(connect.getAddress())
82+
.name(connect.getName())
83+
.connectorsCount(connectorCount)
84+
.failedConnectorsCount(failedConnectors)
85+
.tasksCount(tasksCount)
86+
.failedTasksCount(failedTasksCount);
87+
}
88+
89+
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
4290
ConnectorDTO connector = connectInfo.getConnector();
4391
List<TaskDTO> tasks = connectInfo.getTasks();
4492
int failedTasksCount = (int) tasks.stream()

api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java renamed to api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
@Data
1111
@Builder(toBuilder = true)
12-
public class InternalConnectInfo {
12+
public class InternalConnectorInfo {
1313
private final ConnectorDTO connector;
1414
private final Map<String, Object> config;
1515
private final List<TaskDTO> tasks;

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package io.kafbat.ui.service;
22

3+
4+
import com.github.benmanes.caffeine.cache.AsyncCache;
5+
import com.github.benmanes.caffeine.cache.Caffeine;
6+
import io.kafbat.ui.config.ClustersProperties;
37
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
48
import io.kafbat.ui.connect.model.ConnectorStatus;
59
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
@@ -21,15 +25,15 @@
2125
import io.kafbat.ui.model.KafkaCluster;
2226
import io.kafbat.ui.model.NewConnectorDTO;
2327
import io.kafbat.ui.model.TaskDTO;
24-
import io.kafbat.ui.model.connect.InternalConnectInfo;
28+
import io.kafbat.ui.model.connect.InternalConnectorInfo;
2529
import io.kafbat.ui.util.ReactiveFailover;
30+
import jakarta.validation.Valid;
2631
import java.util.List;
2732
import java.util.Map;
2833
import java.util.Optional;
2934
import java.util.function.Predicate;
3035
import java.util.stream.Stream;
3136
import javax.annotation.Nullable;
32-
import lombok.RequiredArgsConstructor;
3337
import lombok.extern.slf4j.Slf4j;
3438
import org.apache.commons.lang3.StringUtils;
3539
import org.springframework.stereotype.Service;
@@ -39,23 +43,78 @@
3943

4044
@Service
4145
@Slf4j
42-
@RequiredArgsConstructor
4346
public class KafkaConnectService {
4447
private final ClusterMapper clusterMapper;
4548
private final KafkaConnectMapper kafkaConnectMapper;
4649
private final KafkaConfigSanitizer kafkaConfigSanitizer;
50+
private final ClustersProperties clustersProperties;
51+
52+
private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;
53+
54+
public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper,
55+
KafkaConfigSanitizer kafkaConfigSanitizer,
56+
ClustersProperties clustersProperties) {
57+
this.clusterMapper = clusterMapper;
58+
this.kafkaConnectMapper = kafkaConnectMapper;
59+
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
60+
this.clustersProperties = clustersProperties;
61+
this.cachedConnectors = Caffeine.newBuilder()
62+
.expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
63+
.buildAsync();
64+
}
4765

48-
public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
49-
return Flux.fromIterable(
50-
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
51-
.map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
52-
.orElse(List.of())
66+
public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
67+
Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
68+
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
69+
if (withStats) {
70+
return connectClusters.map(connects ->
71+
Flux.fromIterable(connects).flatMap(connect -> (
72+
getConnectConnectorsFromCache(new ConnectCacheKey(cluster, connect), withStats).map(
73+
connectors -> kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats)
74+
)
75+
)
76+
)
77+
).orElse(Flux.fromIterable(List.of()));
78+
} else {
79+
return Flux.fromIterable(connectClusters.map(connects ->
80+
connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList()
81+
).orElse(List.of()));
82+
}
83+
}
84+
85+
private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key, boolean withStats) {
86+
if (clustersProperties.getCache().isEnabled()) {
87+
return Mono.fromFuture(
88+
cachedConnectors.get(key, (t, e) ->
89+
getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture()
90+
)
91+
);
92+
} else {
93+
return getConnectConnectors(key.cluster(), key.connect()).collectList();
94+
}
95+
}
96+
97+
private Flux<InternalConnectorInfo> getConnectConnectors(
98+
KafkaCluster cluster,
99+
ClustersProperties.ConnectCluster connect) {
100+
return getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
101+
Mono.zip(
102+
getConnector(cluster, connect.getName(), connectorName),
103+
getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
104+
).map(tuple ->
105+
InternalConnectorInfo.builder()
106+
.connector(tuple.getT1())
107+
.config(null)
108+
.tasks(tuple.getT2())
109+
.topics(null)
110+
.build()
111+
)
53112
);
54113
}
55114

56115
public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
57116
@Nullable final String search) {
58-
return getConnects(cluster)
117+
return getConnects(cluster, false)
59118
.flatMap(connect ->
60119
getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
61120
.flatMap(connectorName ->
@@ -65,7 +124,7 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
65124
getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
66125
getConnectorTopics(cluster, connect.getName(), connectorName)
67126
).map(tuple ->
68-
InternalConnectInfo.builder()
127+
InternalConnectorInfo.builder()
69128
.connector(tuple.getT1())
70129
.config(tuple.getT2())
71130
.tasks(tuple.getT3())
@@ -289,4 +348,6 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
289348
.formatted(connectorName, connectName));
290349
});
291350
}
351+
352+
// Composite cache key: connector snapshots are cached per (cluster, connect-cluster) pair.
record ConnectCacheKey(KafkaCluster cluster, ClustersProperties.ConnectCluster connect) {}
292353
}

api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class ConnectorsExporter {
2424
private final KafkaConnectService kafkaConnectService;
2525

2626
Flux<DataEntityList> export(KafkaCluster cluster) {
27-
return kafkaConnectService.getConnects(cluster)
27+
return kafkaConnectService.getConnects(cluster, false)
2828
.flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
2929
.flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
3030
.flatMap(connectorDTO ->
@@ -41,7 +41,7 @@ Flux<DataEntityList> export(KafkaCluster cluster) {
4141
}
4242

4343
Flux<DataSource> getConnectDataSources(KafkaCluster cluster) {
44-
return kafkaConnectService.getConnects(cluster)
44+
return kafkaConnectService.getConnects(cluster, false)
4545
.map(ConnectorsExporter::toDataSource);
4646
}
4747

api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -187,20 +187,7 @@ private void writeYamlToFile(String yaml, Path path) {
187187
}
188188

189189
private String serializeToYaml(PropertiesStructure props) {
190-
//representer, that skips fields with null values
191-
Representer representer = new Representer(new DumperOptions()) {
192-
@Override
193-
protected NodeTuple representJavaBeanProperty(Object javaBean,
194-
Property property,
195-
Object propertyValue,
196-
Tag customTag) {
197-
if (propertyValue == null) {
198-
return null; // if value of property is null, ignore it.
199-
} else {
200-
return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag);
201-
}
202-
}
203-
};
190+
Representer representer = new YamlNullSkipRepresenter(new DumperOptions());
204191
var propertyUtils = new PropertyUtils();
205192
propertyUtils.setBeanAccess(BeanAccess.FIELD);
206193
representer.setPropertyUtils(propertyUtils);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.kafbat.ui.util;
2+
3+
import java.time.Duration;
4+
import org.yaml.snakeyaml.DumperOptions;
5+
import org.yaml.snakeyaml.introspector.Property;
6+
import org.yaml.snakeyaml.nodes.NodeTuple;
7+
import org.yaml.snakeyaml.nodes.Tag;
8+
import org.yaml.snakeyaml.representer.Representer;
9+
10+
// representer, that skips fields with null values
11+
public class YamlNullSkipRepresenter extends Representer {
12+
public YamlNullSkipRepresenter(DumperOptions options) {
13+
super(options);
14+
this.representers.put(Duration.class, data -> this.representScalar(Tag.STR, data.toString()));
15+
}
16+
17+
@Override
18+
protected NodeTuple representJavaBeanProperty(Object javaBean,
19+
Property property,
20+
Object propertyValue,
21+
Tag customTag) {
22+
if (propertyValue == null) {
23+
return null; // if value of property is null, ignore it.
24+
} else {
25+
return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag);
26+
}
27+
}
28+
}

0 commit comments

Comments
 (0)