Merged
Changes from 5 commits
5 changes: 4 additions & 1 deletion build.gradle.kts
@@ -54,6 +54,7 @@ extra.apply {
set("kafkaVersion", "2.7.0")
set("gson", "2.13.1")
set("jackson", "2.19.1")
set("dropwizardMetrics", "5.0.0")

// Testing dependencies
set("junitJupiterVersion", "5.9.2")
@@ -86,9 +87,10 @@ dependencies {
implementation("com.clickhouse:clickhouse-data:${project.extra["clickHouseDriverVersion"]}")
implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}")
implementation("com.google.code.gson:gson:${project.extra["gson"]}")

implementation("org.apache.httpcomponents.client5:httpclient5:5.5")

implementation("io.dropwizard:dropwizard-core:${project.extra["dropwizardMetrics"]}")

// Avoid telescoping constructors problem with the builder pattern using Lombok
compileOnly("org.projectlombok:lombok:1.18.38")
annotationProcessor("org.projectlombok:lombok:1.18.38")
@@ -109,6 +111,7 @@ dependencies {
clickhouseDependencies("com.fasterxml.jackson.core:jackson-core:${project.extra["jackson"]}")
clickhouseDependencies("com.fasterxml.jackson.core:jackson-databind:${project.extra["jackson"]}")
clickhouseDependencies("com.fasterxml.jackson.core:jackson-annotations:${project.extra["jackson"]}")
clickhouseDependencies("io.dropwizard:dropwizard-core:${project.extra["dropwizardMetrics"]}")

// Unit Tests
testImplementation(platform("org.junit:junit-bom:${project.extra["junitJupiterVersion"]}"))
Original file line number Diff line number Diff line change
@@ -103,4 +103,7 @@ static ErrorReporter devNullErrorReporter() {
};
}

public int taskId() {
return this.proxySinkTask == null ? Integer.MAX_VALUE : this.proxySinkTask.getId();
}
}
38 changes: 14 additions & 24 deletions src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java
@@ -1,6 +1,5 @@
package com.clickhouse.kafka.connect.sink;

import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.db.ClickHouseWriter;
import com.clickhouse.kafka.connect.sink.db.DBWriter;
@@ -11,7 +10,6 @@
import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState;
import com.clickhouse.kafka.connect.sink.state.provider.KeeperStateProvider;
import com.clickhouse.kafka.connect.util.jmx.ExecutionTimer;
import com.clickhouse.kafka.connect.util.jmx.MBeanServerUtils;
import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
@@ -30,14 +28,14 @@ public class ProxySinkTask {

private static final Logger LOGGER = LoggerFactory.getLogger(ProxySinkTask.class);
private static final AtomicInteger NEXT_ID = new AtomicInteger();
private Processing processing = null;
private StateProvider stateProvider = null;
private DBWriter dbWriter = null;
private ClickHouseSinkConfig clickHouseSinkConfig = null;
private final Processing processing;
private final StateProvider stateProvider;
private final DBWriter dbWriter;
private final ClickHouseSinkConfig clickHouseSinkConfig;
private Timer tableRefreshTimer;

private final SinkTaskStatistics statistics;
private int id = NEXT_ID.getAndAdd(1);
private final int id = NEXT_ID.getAndAdd(1);

public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final ErrorReporter errorReporter) {
this.clickHouseSinkConfig = clickHouseSinkConfig;
@@ -47,8 +45,10 @@ public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final Erro
} else {
this.stateProvider = new InMemoryState();
}
this.statistics = new SinkTaskStatistics(id);
this.statistics.registerMBean();

ClickHouseWriter chWriter = new ClickHouseWriter();
ClickHouseWriter chWriter = new ClickHouseWriter(this.statistics);
this.dbWriter = chWriter;

// Add table mapping refresher
@@ -62,26 +62,12 @@ public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final Erro
boolean isStarted = dbWriter.start(clickHouseSinkConfig);
if (!isStarted)
throw new RuntimeException("Connection to ClickHouse is not active.");
processing = new Processing(stateProvider, dbWriter, errorReporter, clickHouseSinkConfig);
processing = new Processing(stateProvider, dbWriter, errorReporter, clickHouseSinkConfig, statistics);

this.statistics = MBeanServerUtils.registerMBean(new SinkTaskStatistics(), getMBeanNAme());
}

private String getMBeanNAme() {
return String.format("com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask%d,version=%s", id, com.clickhouse.kafka.connect.sink.Version.ARTIFACT_VERSION);
}

public void stop() {
if (tableRefreshTimer != null) {
try {
tableRefreshTimer.cancel();
} catch (Exception e) {
LOGGER.error("Error canceling table refresh timer on com.clickhouse.kafka.connect.sink.ProxySinkTask.stop", e);
}
}

LOGGER.info("Stopping MBean server {}", getMBeanNAme());
MBeanServerUtils.unregisterMBean(getMBeanNAme());
statistics.unregisterMBean();
}

public void put(final Collection<SinkRecord> records) throws IOException, ExecutionException, InterruptedException {
@@ -112,4 +98,8 @@ public void put(final Collection<SinkRecord> records) throws IOException, Execut
}
statistics.taskProcessingTime(taskTime);
}

public int getId() {
return id;
}
}
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.util.QueryIdentifier;
import com.clickhouse.kafka.connect.util.Utils;
import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics;
import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
@@ -75,9 +76,10 @@ public class ClickHouseWriter implements DBWriter {

private Map<String, Table> mapping = null;
private AtomicBoolean isUpdateMappingRunning = new AtomicBoolean(false);

public ClickHouseWriter() {
private final SinkTaskStatistics statistics;
public ClickHouseWriter(SinkTaskStatistics statistics) {
this.mapping = new HashMap<String, Table>();
this.statistics = statistics;
}

protected void setClient(ClickHouseHelperClient chc) {
@@ -199,7 +201,6 @@ public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporte
Table table = getTable(database, topic);
if (table == null) { return; }//We checked the error flag in getTable, so we don't need to check it again here
LOGGER.debug("Trying to insert [{}] records to table name [{}] (QueryId: [{}])", records.size(), table.getName(), queryId.getQueryId());

switch (first.getSchemaType()) {
case SCHEMA:
if (csc.isBypassRowBinary()) {
@@ -837,6 +838,8 @@ protected void doInsertRawBinaryV2(List<Record> records, Table table, QueryIdent
}
long s3 = System.currentTimeMillis();
LOGGER.info("topic: {} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime,s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);

}
protected void doInsertRawBinaryV1(List<Record> records, Table table, QueryIdentifier queryId, boolean supportDefaults) throws IOException, ExecutionException, InterruptedException {
long s1 = System.currentTimeMillis();
@@ -892,6 +895,7 @@ protected void doInsertRawBinaryV1(List<Record> records, Table table, QueryIdent

long s3 = System.currentTimeMillis();
LOGGER.info("topic :{} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime,s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);
}

protected void doInsertJson(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
@@ -972,6 +976,7 @@ protected void doInsertJsonV1(List<Record> records, Table table, QueryIdentifier
}
long s3 = System.currentTimeMillis();
LOGGER.info("topic: {} partition: {} batchSize: {} serialization ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), dataSerializeTime, s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);
}

protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
@@ -1044,6 +1049,7 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
}
long s3 = System.currentTimeMillis();
LOGGER.info("topic: {} partition: {} batchSize: {} serialization ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), dataSerializeTime, s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);
}

protected Map<String, Object> cleanupExtraFields(Map<String, Object> m, Table t) {
@@ -1134,6 +1140,7 @@ protected void doInsertStringV1(List<Record> records, Table table, QueryIdentifi
}
long s3 = System.currentTimeMillis();
LOGGER.info("topic: {} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime, s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);
}
protected void doInsertStringV2(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
byte[] endingLine = new byte[]{'\n'};
@@ -1204,6 +1211,7 @@ protected void doInsertStringV2(List<Record> records, Table table, QueryIdentifi
}
long s3 = System.currentTimeMillis();
LOGGER.info("topic: {} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime, s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);
}


Original file line number Diff line number Diff line change
@@ -7,11 +7,11 @@
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.sink.kafka.RangeContainer;
import com.clickhouse.kafka.connect.sink.kafka.RangeState;
import com.clickhouse.kafka.connect.sink.state.State;
import com.clickhouse.kafka.connect.sink.state.StateProvider;
import com.clickhouse.kafka.connect.sink.state.StateRecord;
import com.clickhouse.kafka.connect.util.QueryIdentifier;
import com.clickhouse.kafka.connect.util.Utils;
import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -28,15 +28,17 @@ public class Processing {
private StateProvider stateProvider = null;
private DBWriter dbWriter = null;
private ClickHouseSinkConfig clickHouseSinkConfig;

private SinkTaskStatistics statistics;

private ErrorReporter errorReporter = null;

public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter errorReporter, ClickHouseSinkConfig clickHouseSinkConfig) {
public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter errorReporter,
ClickHouseSinkConfig clickHouseSinkConfig, SinkTaskStatistics statistics) {
this.stateProvider = stateProvider;
this.dbWriter = dbWriter;
this.errorReporter = errorReporter;
this.clickHouseSinkConfig = clickHouseSinkConfig;
this.statistics = statistics;
}
/**
* the logic is only for topic partition scoop
@@ -49,14 +51,23 @@ private void doInsert(List<Record> records, RangeContainer rangeContainer) {
LOGGER.trace("doInsert - No records to insert.");
return;
}
QueryIdentifier queryId = new QueryIdentifier(records.get(0).getRecordOffsetContainer().getTopic(), records.get(0).getRecordOffsetContainer().getPartition(),

final Record firstRecord = records.get(0);
long eventReceiveLag = 0;
if (firstRecord.getSinkRecord().timestamp() != null) {
eventReceiveLag = System.currentTimeMillis() - firstRecord.getSinkRecord().timestamp();
Collaborator: Do we know where the timestamp comes from? Record metadata, or when the record was created? Just want to validate, since we can have clock skew in some situations.

Contributor Author: I will find out. As for the time skew - it would be a problem, but I think it can be normalized if the skew stays within range.

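For reference, SinkRecord.timestamp() carries the Kafka record timestamp, which is either the producer's CreateTime or the broker's LogAppendTime depending on the topic's message.timestamp.type setting, so a skewed producer clock can indeed make the difference negative. A minimal sketch of how the lag calculation could be normalized; the helper name and the clamp-to-zero policy are assumptions, not part of this PR:

```java
// Hypothetical helper (not in this PR): derive the receive lag from the Kafka
// record timestamp while guarding against producer/worker clock skew.
static long eventReceiveLagMillis(Long recordTimestamp, long nowMillis) {
    if (recordTimestamp == null) {
        return 0L; // the record may not carry a timestamp at all
    }
    long lag = nowMillis - recordTimestamp;
    // A producer clock running ahead of the Connect worker yields a negative
    // difference; clamp to zero so the per-topic lag metric stays non-negative.
    return Math.max(0L, lag);
}
```

Both doInsert variants could then call eventReceiveLagMillis(firstRecord.getSinkRecord().timestamp(), System.currentTimeMillis()) instead of computing the difference inline.
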
}
final String topic = firstRecord.getRecordOffsetContainer().getTopic();
QueryIdentifier queryId = new QueryIdentifier(topic, records.get(0).getRecordOffsetContainer().getPartition(),
rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(),
UUID.randomUUID().toString());

try {
LOGGER.debug("doInsert - Records: [{}] - {}", records.size(), queryId);
dbWriter.doInsert(records, queryId, errorReporter);
statistics.recordTopicStats(records.size(), queryId.getTopic(), eventReceiveLag);
} catch (Exception e) {
statistics.recordTopicStatsOnFailure(records.size(), queryId.getTopic(), eventReceiveLag);
throw new RuntimeException(queryId.toString(), e);//This way the queryId will propagate
}
}
@@ -66,12 +77,21 @@ private void doInsert(List<Record> records) {
LOGGER.trace("doInsert - No records to insert.");
return;
}
QueryIdentifier queryId = new QueryIdentifier(records.get(0).getRecordOffsetContainer().getTopic(), UUID.randomUUID().toString());

final Record firstRecord = records.get(0);
long eventReceiveLag = 0;
if (firstRecord.getSinkRecord().timestamp() != null) {
eventReceiveLag = System.currentTimeMillis() - firstRecord.getSinkRecord().timestamp();
}
final String topic = firstRecord.getRecordOffsetContainer().getTopic();
QueryIdentifier queryId = new QueryIdentifier(topic, UUID.randomUUID().toString());

try {
LOGGER.info("doInsert - Records: [{}] - {}", records.size(), queryId);
dbWriter.doInsert(records, queryId, errorReporter);
statistics.recordTopicStats(records.size(), topic, eventReceiveLag);
} catch (Exception e) {
statistics.recordTopicStatsOnFailure(records.size(), topic, eventReceiveLag);
throw new RuntimeException(queryId.toString(), e);//This way the queryId will propagate
}
}
Original file line number Diff line number Diff line change
@@ -1,34 +1,106 @@
package com.clickhouse.kafka.connect.util.jmx;

import com.clickhouse.kafka.connect.sink.Version;

import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;

public class SinkTaskStatistics implements SinkTaskStatisticsMBean {
private volatile long receivedRecords;
private volatile long recordProcessingTime;
private volatile long taskProcessingTime;
private final AtomicLong receivedRecords;
private final AtomicLong recordProcessingTime;
private final AtomicLong taskProcessingTime;
private final AtomicLong insertedRecords;
private final Map<String, TopicStatistics> topicStatistics;
private final int taskId;

private final Deque<String> topicMBeans;

public SinkTaskStatistics(int taskId) {
this.taskId = taskId;
this.topicMBeans = new ConcurrentLinkedDeque<>();
this.receivedRecords = new AtomicLong(0);
this.recordProcessingTime = new AtomicLong(0);
this.taskProcessingTime = new AtomicLong(0);
this.insertedRecords = new AtomicLong(0);
this.topicStatistics = new ConcurrentHashMap<>();
}

public void registerMBean() {
MBeanServerUtils.registerMBean(this, getMBeanNAme(taskId));
}

public void unregisterMBean() {
MBeanServerUtils.unregisterMBean(getMBeanNAme(taskId));
for (String topicMBean : topicMBeans) {
MBeanServerUtils.unregisterMBean(topicMBean);
}
}

@Override
public long getReceivedRecords() {
return receivedRecords;
return receivedRecords.get();
}

@Override
public long getRecordProcessingTime() {
return recordProcessingTime;
return recordProcessingTime.get();
}

@Override
public long getTaskProcessingTime() {
return taskProcessingTime;
return taskProcessingTime.get();
}

@Override
public long getInsertedRecords() {
return insertedRecords.get();
}

public void receivedRecords(final int n ) {
this.receivedRecords += n;
public void receivedRecords(final int n) {
this.receivedRecords.addAndGet(n);
}

public void recordProcessingTime(ExecutionTimer timer) {
this.recordProcessingTime += timer.nanosElapsed();
this.recordProcessingTime.addAndGet(timer.nanosElapsed());
}

public void taskProcessingTime(ExecutionTimer timer) {
this.taskProcessingTime += timer.nanosElapsed();
this.taskProcessingTime.addAndGet(timer.nanosElapsed());
}

public void recordTopicStats(int n, String table, long eventReceiveLag) {
insertedRecords.addAndGet(n);
topicStatistics.computeIfAbsent(table, this::createTopicStatistics).recordsInserted(n);
topicStatistics.computeIfAbsent(table, this::createTopicStatistics).batchInserted(1);
topicStatistics.computeIfAbsent(table, this::createTopicStatistics).eventReceiveLag(eventReceiveLag);
}

public void recordTopicStatsOnFailure(int n, String table, long eventReceiveLag) {
insertedRecords.addAndGet(n);
topicStatistics.computeIfAbsent(table, this::createTopicStatistics).recordsFailed(n);
topicStatistics.computeIfAbsent(table, this::createTopicStatistics).batchesFailed(1);
topicStatistics.computeIfAbsent(table, this::createTopicStatistics).eventReceiveLag(eventReceiveLag);
}

private TopicStatistics createTopicStatistics(String topic) {
TopicStatistics topicStatistics = new TopicStatistics();
String topicMBeanName = getTopicMBeanName(taskId, topic);
topicMBeans.add(topicMBeanName);
return MBeanServerUtils.registerMBean(topicStatistics, topicMBeanName);
}

public static String getMBeanNAme(int taskId) {
return String.format("com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask%d,version=%s", taskId, Version.ARTIFACT_VERSION);
}

public static String getTopicMBeanName(int taskId, String topic) {
return String.format("com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask%d,version=%s,topic=%s", taskId, Version.ARTIFACT_VERSION, topic);
}

public void insertTime(long t, String topic) {
topicStatistics.computeIfAbsent(topic, this::createTopicStatistics).insertTime(t);
}
}
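
Once the connector is running, the new task-level and topic-level MBeans can be read back over JMX. Below is a minimal sketch from inside the same JVM; the object names follow getMBeanNAme / getTopicMBeanName above and attribute names follow the standard getter-to-attribute mapping, but the task id (0), the version value, and the topic name ("events") are placeholder assumptions:

```java
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;

public class ReadSinkTaskStats {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();

        // Task-level statistics; task id 0 and the version string are assumptions.
        ObjectName task = new ObjectName(
                "com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask0,version=v1.3.3");
        System.out.println("ReceivedRecords = " + server.getAttribute(task, "ReceivedRecords"));
        System.out.println("InsertedRecords = " + server.getAttribute(task, "InsertedRecords"));

        // Topic-level statistics; list the attributes rather than assuming their names.
        ObjectName topic = new ObjectName(
                "com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask0,version=v1.3.3,topic=events");
        for (MBeanAttributeInfo attr : server.getMBeanInfo(topic).getAttributes()) {
            System.out.println(attr.getName() + " = " + server.getAttribute(topic, attr.getName()));
        }
    }
}
```

The same object names show up in JConsole or any JMX scraper once the Connect worker exposes a JMX port.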