1 change: 0 additions & 1 deletion build.gradle.kts
@@ -86,7 +86,6 @@ dependencies {
implementation("com.clickhouse:clickhouse-data:${project.extra["clickHouseDriverVersion"]}")
implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}")
implementation("com.google.code.gson:gson:${project.extra["gson"]}")

implementation("org.apache.httpcomponents.client5:httpclient5:5.5")

// Avoid telescoping constructors problem with the builder pattern using Lombok
@@ -103,4 +103,7 @@ static ErrorReporter devNullErrorReporter() {
};
}

public int taskId() {
return this.proxySinkTask == null ? Integer.MAX_VALUE : this.proxySinkTask.getId();
}
}
40 changes: 15 additions & 25 deletions src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java
@@ -1,6 +1,5 @@
package com.clickhouse.kafka.connect.sink;

import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.db.ClickHouseWriter;
import com.clickhouse.kafka.connect.sink.db.DBWriter;
@@ -11,7 +10,6 @@
import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState;
import com.clickhouse.kafka.connect.sink.state.provider.KeeperStateProvider;
import com.clickhouse.kafka.connect.util.jmx.ExecutionTimer;
import com.clickhouse.kafka.connect.util.jmx.MBeanServerUtils;
import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
@@ -30,14 +28,14 @@ public class ProxySinkTask {

private static final Logger LOGGER = LoggerFactory.getLogger(ProxySinkTask.class);
private static final AtomicInteger NEXT_ID = new AtomicInteger();
private Processing processing = null;
private StateProvider stateProvider = null;
private DBWriter dbWriter = null;
private ClickHouseSinkConfig clickHouseSinkConfig = null;
private final Processing processing;
private final StateProvider stateProvider;
private final DBWriter dbWriter;
private final ClickHouseSinkConfig clickHouseSinkConfig;
private Timer tableRefreshTimer;

private final SinkTaskStatistics statistics;
private int id = NEXT_ID.getAndAdd(1);
private final int id = NEXT_ID.getAndAdd(1);

public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final ErrorReporter errorReporter) {
this.clickHouseSinkConfig = clickHouseSinkConfig;
@@ -47,8 +45,10 @@ public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final Erro
} else {
this.stateProvider = new InMemoryState();
}
this.statistics = new SinkTaskStatistics(id);
this.statistics.registerMBean();

ClickHouseWriter chWriter = new ClickHouseWriter();
ClickHouseWriter chWriter = new ClickHouseWriter(this.statistics);
this.dbWriter = chWriter;

// Add table mapping refresher
@@ -62,26 +62,12 @@ public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final Erro
boolean isStarted = dbWriter.start(clickHouseSinkConfig);
if (!isStarted)
throw new RuntimeException("Connection to ClickHouse is not active.");
processing = new Processing(stateProvider, dbWriter, errorReporter, clickHouseSinkConfig);
processing = new Processing(stateProvider, dbWriter, errorReporter, clickHouseSinkConfig, statistics);

this.statistics = MBeanServerUtils.registerMBean(new SinkTaskStatistics(), getMBeanNAme());
}

private String getMBeanNAme() {
return String.format("com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask%d,version=%s", id, com.clickhouse.kafka.connect.sink.Version.ARTIFACT_VERSION);
}

public void stop() {
if (tableRefreshTimer != null) {
try {
tableRefreshTimer.cancel();
} catch (Exception e) {
LOGGER.error("Error canceling table refresh timer on com.clickhouse.kafka.connect.sink.ProxySinkTask.stop", e);
}
}

LOGGER.info("Stopping MBean server {}", getMBeanNAme());
MBeanServerUtils.unregisterMBean(getMBeanNAme());
statistics.unregisterMBean();
}

public void put(final Collection<SinkRecord> records) throws IOException, ExecutionException, InterruptedException {
@@ -90,8 +76,8 @@ public void put(final Collection<SinkRecord> records) throws IOException, Execut
return;
}
// Group by topic & partition
statistics.receivedRecords(records);
ExecutionTimer taskTime = ExecutionTimer.start();
statistics.receivedRecords(records.size());
LOGGER.trace(String.format("Got %d records from put API.", records.size()));
ExecutionTimer processingTime = ExecutionTimer.start();

@@ -112,4 +98,8 @@ public void put(final Collection<SinkRecord> records) throws IOException, Execut
}
statistics.taskProcessingTime(taskTime);
}

public int getId() {
return id;
}
}
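Previously ProxySinkTask formatted the JMX object name itself (getMBeanNAme) and registered the statistics bean through MBeanServerUtils; after this change the task only constructs SinkTaskStatistics with its id and calls registerMBean()/unregisterMBean() on it. The updated SinkTaskStatistics class is not part of this excerpt, so the following is only a sketch of how those two methods might be implemented, assuming they reuse the existing MBeanServerUtils helper and the old object-name pattern; the SinkTaskStatisticsMBean interface name is likewise an assumption.

    // Sketch only - the real SinkTaskStatistics changes are not shown in this diff.
    // Assumes the old per-task object-name pattern and the existing MBeanServerUtils helper.
    public class SinkTaskStatistics implements SinkTaskStatisticsMBean {

        private final int taskId;

        public SinkTaskStatistics(int taskId) {
            this.taskId = taskId;
        }

        public void registerMBean() {
            // Publish this bean under a per-task name so multiple tasks do not collide.
            MBeanServerUtils.registerMBean(this, mBeanName());
        }

        public void unregisterMBean() {
            MBeanServerUtils.unregisterMBean(mBeanName());
        }

        private String mBeanName() {
            return String.format(
                    "com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask%d,version=%s",
                    taskId, Version.ARTIFACT_VERSION);
        }
    }

Keeping the naming inside the statistics class means ProxySinkTask.stop() no longer needs to rebuild the object name just to unregister it.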
@@ -28,6 +28,7 @@
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.util.QueryIdentifier;
import com.clickhouse.kafka.connect.util.Utils;
import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics;
import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
@@ -76,9 +77,10 @@ public class ClickHouseWriter implements DBWriter {

private Map<String, Table> mapping = null;
private AtomicBoolean isUpdateMappingRunning = new AtomicBoolean(false);

public ClickHouseWriter() {
private final SinkTaskStatistics statistics;
public ClickHouseWriter(SinkTaskStatistics statistics) {
this.mapping = new HashMap<String, Table>();
this.statistics = statistics;
}

protected void setClient(ClickHouseHelperClient chc) {
@@ -200,7 +202,6 @@ public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporte
Table table = getTable(database, topic);
if (table == null) { return; }//We checked the error flag in getTable, so we don't need to check it again here
LOGGER.debug("Trying to insert [{}] records to table name [{}] (QueryId: [{}])", records.size(), table.getName(), queryId.getQueryId());

switch (first.getSchemaType()) {
case SCHEMA:
if (csc.isBypassRowBinary()) {
@@ -879,6 +880,8 @@ protected void doInsertRawBinaryV2(List<Record> records, Table table, QueryIdent
}
long s3 = System.currentTimeMillis();
LOGGER.info("topic: {} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime,s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);

}
protected void doInsertRawBinaryV1(List<Record> records, Table table, QueryIdentifier queryId, boolean supportDefaults) throws IOException, ExecutionException, InterruptedException {
long s1 = System.currentTimeMillis();
Expand Down Expand Up @@ -934,6 +937,7 @@ protected void doInsertRawBinaryV1(List<Record> records, Table table, QueryIdent

long s3 = System.currentTimeMillis();
LOGGER.info("topic :{} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime,s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);
}

protected void doInsertJson(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
@@ -993,12 +997,15 @@ protected void doInsertJsonV1(List<Record> records, Table table, QueryIdentifier
long beforeSerialize = System.currentTimeMillis();
String gsonString = gson.toJson(cleanupExtraFields(data, table), gsonType);
dataSerializeTime += System.currentTimeMillis() - beforeSerialize;

LOGGER.trace("topic {} partition {} offset {} payload {}",
record.getTopic(),
record.getRecordOffsetContainer().getPartition(),
record.getRecordOffsetContainer().getOffset(),
gsonString);
BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8));
byte[] bytes = gsonString.getBytes(StandardCharsets.UTF_8);
statistics.bytesInserted(bytes.length);
BinaryStreamUtils.writeBytes(stream, bytes);
} else {
LOGGER.warn(String.format("Getting empty record skip the insert topic[%s] offset[%d]", record.getTopic(), record.getSinkRecord().kafkaOffset()));
}
@@ -1014,6 +1021,7 @@ protected void doInsertJsonV1(List<Record> records, Table table, QueryIdentifier
}
long s3 = System.currentTimeMillis();
LOGGER.info("topic: {} partition: {} batchSize: {} serialization ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), dataSerializeTime, s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);
}

protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
@@ -1073,7 +1081,9 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
record.getRecordOffsetContainer().getPartition(),
record.getRecordOffsetContainer().getOffset(),
gsonString);
BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8));
byte[] bytes = gsonString.getBytes(StandardCharsets.UTF_8);
statistics.bytesInserted(bytes.length);
BinaryStreamUtils.writeBytes(stream, bytes);
} else {
LOGGER.warn(String.format("Getting empty record skip the insert topic[%s] offset[%d]", record.getTopic(), record.getSinkRecord().kafkaOffset()));
}
@@ -1086,6 +1096,7 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
}
long s3 = System.currentTimeMillis();
LOGGER.info("topic: {} partition: {} batchSize: {} serialization ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), dataSerializeTime, s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);
}

protected Map<String, Object> cleanupExtraFields(Map<String, Object> m, Table t) {
@@ -1150,6 +1161,7 @@ protected void doInsertStringV1(List<Record> records, Table table, QueryIdentifi
String data = (String)record.getSinkRecord().value();
LOGGER.debug(String.format("data: %s", data));
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
statistics.bytesInserted(bytes.length);
long beforePushStream = System.currentTimeMillis();
BinaryStreamUtils.writeBytes(stream, bytes);
pushStreamTime += System.currentTimeMillis() - beforePushStream;
@@ -1176,6 +1188,7 @@ protected void doInsertStringV1(List<Record> records, Table table, QueryIdentifi
}
long s3 = System.currentTimeMillis();
LOGGER.info("topic: {} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime, s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);
}
protected void doInsertStringV2(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
byte[] endingLine = new byte[]{'\n'};
@@ -1224,6 +1237,7 @@ protected void doInsertStringV2(List<Record> records, Table table, QueryIdentifi
String data = (String)record.getSinkRecord().value();
LOGGER.debug(String.format("data: %s", data));
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
statistics.bytesInserted(bytes.length);
long beforePushStream = System.currentTimeMillis();
BinaryStreamUtils.writeBytes(stream, bytes);
pushStreamTime += System.currentTimeMillis() - beforePushStream;
@@ -1246,6 +1260,7 @@ protected void doInsertStringV2(List<Record> records, Table table, QueryIdentifi
}
long s3 = System.currentTimeMillis();
LOGGER.info("topic: {} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime, s2 - s1, s3 - s2, queryId.getQueryId());
statistics.insertTime(s2 - s1, topic);
}


@@ -7,11 +7,11 @@
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.sink.kafka.RangeContainer;
import com.clickhouse.kafka.connect.sink.kafka.RangeState;
import com.clickhouse.kafka.connect.sink.state.State;
import com.clickhouse.kafka.connect.sink.state.StateProvider;
import com.clickhouse.kafka.connect.sink.state.StateRecord;
import com.clickhouse.kafka.connect.util.QueryIdentifier;
import com.clickhouse.kafka.connect.util.Utils;
import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -28,15 +28,17 @@ public class Processing {
private StateProvider stateProvider = null;
private DBWriter dbWriter = null;
private ClickHouseSinkConfig clickHouseSinkConfig;

private SinkTaskStatistics statistics;

private ErrorReporter errorReporter = null;

public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter errorReporter, ClickHouseSinkConfig clickHouseSinkConfig) {
public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter errorReporter,
ClickHouseSinkConfig clickHouseSinkConfig, SinkTaskStatistics statistics) {
this.stateProvider = stateProvider;
this.dbWriter = dbWriter;
this.errorReporter = errorReporter;
this.clickHouseSinkConfig = clickHouseSinkConfig;
this.statistics = statistics;
}
/**
* the logic is only for topic partition scope
@@ -49,14 +51,23 @@ private void doInsert(List<Record> records, RangeContainer rangeContainer) {
LOGGER.trace("doInsert - No records to insert.");
return;
}
QueryIdentifier queryId = new QueryIdentifier(records.get(0).getRecordOffsetContainer().getTopic(), records.get(0).getRecordOffsetContainer().getPartition(),

final Record firstRecord = records.get(0);
long eventReceiveLag = 0;
if (firstRecord.getSinkRecord().timestamp() != null) {
eventReceiveLag = System.currentTimeMillis() - firstRecord.getSinkRecord().timestamp();
[Review comment]
Collaborator: Do we know where the timestamp comes from? Record metadata, or when it was created? Just want to validate, since we can have clock skew in some situations.
Contributor (author): I will find out. As for clock skew - that would be a problem, but I think it can be normalized if the skew stays within range.
}
final String topic = firstRecord.getRecordOffsetContainer().getTopic();
QueryIdentifier queryId = new QueryIdentifier(topic, records.get(0).getRecordOffsetContainer().getPartition(),
rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(),
UUID.randomUUID().toString());

try {
LOGGER.debug("doInsert - Records: [{}] - {}", records.size(), queryId);
dbWriter.doInsert(records, queryId, errorReporter);
statistics.recordTopicStats(records.size(), queryId.getTopic(), eventReceiveLag);
} catch (Exception e) {
statistics.recordTopicStatsOnFailure(records.size(), queryId.getTopic(), eventReceiveLag);
throw new RuntimeException(queryId.toString(), e);//This way the queryId will propagate
}
}
@@ -66,12 +77,21 @@ private void doInsert(List<Record> records) {
LOGGER.trace("doInsert - No records to insert.");
return;
}
QueryIdentifier queryId = new QueryIdentifier(records.get(0).getRecordOffsetContainer().getTopic(), UUID.randomUUID().toString());

final Record firstRecord = records.get(0);
long eventReceiveLag = 0;
if (firstRecord.getSinkRecord().timestamp() != null) {
eventReceiveLag = System.currentTimeMillis() - firstRecord.getSinkRecord().timestamp();
}
final String topic = firstRecord.getRecordOffsetContainer().getTopic();
QueryIdentifier queryId = new QueryIdentifier(topic, UUID.randomUUID().toString());

try {
LOGGER.info("doInsert - Records: [{}] - {}", records.size(), queryId);
dbWriter.doInsert(records, queryId, errorReporter);
statistics.recordTopicStats(records.size(), topic, eventReceiveLag);
} catch (Exception e) {
statistics.recordTopicStatsOnFailure(records.size(), topic, eventReceiveLag);
throw new RuntimeException(queryId.toString(), e);//This way the queryId will propagate
}
}
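Both doInsert overloads now compute eventReceiveLag from SinkRecord.timestamp() before building the QueryIdentifier and pass it to recordTopicStats / recordTopicStatsOnFailure. Kafka Connect populates that timestamp from the underlying Kafka record (producer create time or broker log-append time, depending on the topic's message.timestamp.type), which speaks to the reviewer's question above; the measured lag is therefore only as accurate as the producer/broker and connect-worker clocks. The PR leaves the lag at 0 when the timestamp is null and does not clamp negative values. A hypothetical helper, not part of this change, that guards against clock skew could look like this:

    import org.apache.kafka.connect.sink.SinkRecord;

    final class ReceiveLag {
        private ReceiveLag() {}

        // Lag in milliseconds between "now" and the record's Kafka timestamp.
        // Returns 0 when the record carries no timestamp, or when clock skew
        // would otherwise produce a negative lag.
        static long millis(SinkRecord record, long nowMillis) {
            Long timestamp = record.timestamp();
            if (timestamp == null) {
                return 0L;
            }
            return Math.max(0L, nowMillis - timestamp);
        }
    }

Called as ReceiveLag.millis(firstRecord.getSinkRecord(), System.currentTimeMillis()), it would also remove the explicit null check duplicated in both overloads.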
@@ -0,0 +1,34 @@
package com.clickhouse.kafka.connect.util.jmx;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Not thread-safe simple (windowed) moving average implementation
*/
public class SimpleMovingAverage {

public static final int DEFAULT_WINDOW_SIZE = 60;

private final long[] values;
private final AtomicInteger head;
private final AtomicLong sum;
private final int n;

public SimpleMovingAverage(int numOfValues) {
this.values = new long[numOfValues];
this.n = this.values.length - 1;
this.head = new AtomicInteger();
this.sum = new AtomicLong();
}

public void add(long value) {
int insertIndex = head.getAndIncrement() % values.length;
sum.addAndGet(value - values[insertIndex]);
values[insertIndex] = value;
}

public double get() {
return (double) sum.get() / values.length;
}
}
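SimpleMovingAverage keeps a fixed-size ring buffer: add() overwrites the oldest sample and adjusts the running sum by the delta, while get() always divides by the full window length, so the reported average ramps up from zero until the window (DEFAULT_WINDOW_SIZE = 60 by default) is filled. Since Kafka Connect invokes a sink task's put() from a single thread, the "not thread-safe" caveat is acceptable for per-task metrics. A minimal usage sketch, with purely illustrative latency figures:

    import com.clickhouse.kafka.connect.util.jmx.SimpleMovingAverage;

    public class MovingAverageExample {
        public static void main(String[] args) {
            SimpleMovingAverage insertLatencyMs =
                    new SimpleMovingAverage(SimpleMovingAverage.DEFAULT_WINDOW_SIZE);

            // Record a few hypothetical per-batch insert latencies (milliseconds).
            long[] samples = {120, 95, 140, 110};
            for (long sample : samples) {
                insertLatencyMs.add(sample);
            }

            // Only 4 of the 60 slots are filled, so get() returns
            // (120 + 95 + 140 + 110) / 60 = 7.75, not 116.25.
            System.out.println(insertLatencyMs.get());
        }
    }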