diff --git a/build.gradle.kts b/build.gradle.kts index 0f542ac0..ddff6e3f 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -86,7 +86,6 @@ dependencies { implementation("com.clickhouse:clickhouse-data:${project.extra["clickHouseDriverVersion"]}") implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}") implementation("com.google.code.gson:gson:${project.extra["gson"]}") - implementation("org.apache.httpcomponents.client5:httpclient5:5.5") // Avoid telescoping constructors problem with the builder pattern using Lombok diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java index 5fbc9d7f..3d2030f6 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java @@ -103,4 +103,7 @@ static ErrorReporter devNullErrorReporter() { }; } + public int taskId() { + return this.proxySinkTask == null ? Integer.MAX_VALUE : this.proxySinkTask.getId(); + } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java index 0eb18c8b..3a3aeccc 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java @@ -1,6 +1,5 @@ package com.clickhouse.kafka.connect.sink; -import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.kafka.connect.sink.data.Record; import com.clickhouse.kafka.connect.sink.db.ClickHouseWriter; import com.clickhouse.kafka.connect.sink.db.DBWriter; @@ -11,7 +10,6 @@ import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState; import com.clickhouse.kafka.connect.sink.state.provider.KeeperStateProvider; import com.clickhouse.kafka.connect.util.jmx.ExecutionTimer; -import com.clickhouse.kafka.connect.util.jmx.MBeanServerUtils; import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; @@ -30,14 +28,14 @@ public class ProxySinkTask { private static final Logger LOGGER = LoggerFactory.getLogger(ProxySinkTask.class); private static final AtomicInteger NEXT_ID = new AtomicInteger(); - private Processing processing = null; - private StateProvider stateProvider = null; - private DBWriter dbWriter = null; - private ClickHouseSinkConfig clickHouseSinkConfig = null; + private final Processing processing; + private final StateProvider stateProvider; + private final DBWriter dbWriter; + private final ClickHouseSinkConfig clickHouseSinkConfig; private Timer tableRefreshTimer; private final SinkTaskStatistics statistics; - private int id = NEXT_ID.getAndAdd(1); + private final int id = NEXT_ID.getAndAdd(1); public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final ErrorReporter errorReporter) { this.clickHouseSinkConfig = clickHouseSinkConfig; @@ -47,8 +45,10 @@ public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final Erro } else { this.stateProvider = new InMemoryState(); } + this.statistics = new SinkTaskStatistics(id); + this.statistics.registerMBean(); - ClickHouseWriter chWriter = new ClickHouseWriter(); + ClickHouseWriter chWriter = new ClickHouseWriter(this.statistics); this.dbWriter = chWriter; // Add table mapping refresher @@ -62,26 +62,12 @@ public ProxySinkTask(final ClickHouseSinkConfig clickHouseSinkConfig, final Erro boolean 
isStarted = dbWriter.start(clickHouseSinkConfig); if (!isStarted) throw new RuntimeException("Connection to ClickHouse is not active."); - processing = new Processing(stateProvider, dbWriter, errorReporter, clickHouseSinkConfig); + processing = new Processing(stateProvider, dbWriter, errorReporter, clickHouseSinkConfig, statistics); - this.statistics = MBeanServerUtils.registerMBean(new SinkTaskStatistics(), getMBeanNAme()); - } - - private String getMBeanNAme() { - return String.format("com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask%d,version=%s", id, com.clickhouse.kafka.connect.sink.Version.ARTIFACT_VERSION); } public void stop() { - if (tableRefreshTimer != null) { - try { - tableRefreshTimer.cancel(); - } catch (Exception e) { - LOGGER.error("Error canceling table refresh timer on com.clickhouse.kafka.connect.sink.ProxySinkTask.stop", e); - } - } - - LOGGER.info("Stopping MBean server {}", getMBeanNAme()); - MBeanServerUtils.unregisterMBean(getMBeanNAme()); + statistics.unregisterMBean(); } public void put(final Collection records) throws IOException, ExecutionException, InterruptedException { @@ -90,8 +76,8 @@ public void put(final Collection records) throws IOException, Execut return; } // Group by topic & partition + statistics.receivedRecords(records); ExecutionTimer taskTime = ExecutionTimer.start(); - statistics.receivedRecords(records.size()); LOGGER.trace(String.format("Got %d records from put API.", records.size())); ExecutionTimer processingTime = ExecutionTimer.start(); @@ -112,4 +98,8 @@ public void put(final Collection records) throws IOException, Execut } statistics.taskProcessingTime(taskTime); } + + public int getId() { + return id; + } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index 43d356fd..1e60a467 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -28,6 +28,7 @@ import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter; import com.clickhouse.kafka.connect.util.QueryIdentifier; import com.clickhouse.kafka.connect.util.Utils; +import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics; import com.google.gson.ExclusionStrategy; import com.google.gson.FieldAttributes; import com.google.gson.Gson; @@ -76,9 +77,10 @@ public class ClickHouseWriter implements DBWriter { private Map mapping = null; private AtomicBoolean isUpdateMappingRunning = new AtomicBoolean(false); - - public ClickHouseWriter() { + private final SinkTaskStatistics statistics; + public ClickHouseWriter(SinkTaskStatistics statistics) { this.mapping = new HashMap(); + this.statistics = statistics; } protected void setClient(ClickHouseHelperClient chc) { @@ -200,7 +202,6 @@ public void doInsert(List records, QueryIdentifier queryId, ErrorReporte Table table = getTable(database, topic); if (table == null) { return; }//We checked the error flag in getTable, so we don't need to check it again here LOGGER.debug("Trying to insert [{}] records to table name [{}] (QueryId: [{}])", records.size(), table.getName(), queryId.getQueryId()); - switch (first.getSchemaType()) { case SCHEMA: if (csc.isBypassRowBinary()) { @@ -879,6 +880,8 @@ protected void doInsertRawBinaryV2(List records, Table table, QueryIdent } long s3 = System.currentTimeMillis(); LOGGER.info("topic: {} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, 
partition, records.size(), pushStreamTime,s2 - s1, s3 - s2, queryId.getQueryId()); + statistics.insertTime(s2 - s1, topic); + } protected void doInsertRawBinaryV1(List records, Table table, QueryIdentifier queryId, boolean supportDefaults) throws IOException, ExecutionException, InterruptedException { long s1 = System.currentTimeMillis(); @@ -934,6 +937,7 @@ protected void doInsertRawBinaryV1(List records, Table table, QueryIdent long s3 = System.currentTimeMillis(); LOGGER.info("topic :{} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime,s2 - s1, s3 - s2, queryId.getQueryId()); + statistics.insertTime(s2 - s1, topic); } protected void doInsertJson(List records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException { @@ -993,12 +997,15 @@ protected void doInsertJsonV1(List records, Table table, QueryIdentifier long beforeSerialize = System.currentTimeMillis(); String gsonString = gson.toJson(cleanupExtraFields(data, table), gsonType); dataSerializeTime += System.currentTimeMillis() - beforeSerialize; + LOGGER.trace("topic {} partition {} offset {} payload {}", record.getTopic(), record.getRecordOffsetContainer().getPartition(), record.getRecordOffsetContainer().getOffset(), gsonString); - BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8)); + byte[] bytes = gsonString.getBytes(StandardCharsets.UTF_8); + statistics.bytesInserted(bytes.length); + BinaryStreamUtils.writeBytes(stream, bytes); } else { LOGGER.warn(String.format("Getting empty record skip the insert topic[%s] offset[%d]", record.getTopic(), record.getSinkRecord().kafkaOffset())); } @@ -1014,6 +1021,7 @@ protected void doInsertJsonV1(List records, Table table, QueryIdentifier } long s3 = System.currentTimeMillis(); LOGGER.info("topic: {} partition: {} batchSize: {} serialization ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), dataSerializeTime, s2 - s1, s3 - s2, queryId.getQueryId()); + statistics.insertTime(s2 - s1, topic); } protected void doInsertJsonV2(List records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException { @@ -1073,7 +1081,9 @@ protected void doInsertJsonV2(List records, Table table, QueryIdentifier record.getRecordOffsetContainer().getPartition(), record.getRecordOffsetContainer().getOffset(), gsonString); - BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8)); + byte[] bytes = gsonString.getBytes(StandardCharsets.UTF_8); + statistics.bytesInserted(bytes.length); + BinaryStreamUtils.writeBytes(stream, bytes); } else { LOGGER.warn(String.format("Getting empty record skip the insert topic[%s] offset[%d]", record.getTopic(), record.getSinkRecord().kafkaOffset())); } @@ -1086,6 +1096,7 @@ protected void doInsertJsonV2(List records, Table table, QueryIdentifier } long s3 = System.currentTimeMillis(); LOGGER.info("topic: {} partition: {} batchSize: {} serialization ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), dataSerializeTime, s2 - s1, s3 - s2, queryId.getQueryId()); + statistics.insertTime(s2 - s1, topic); } protected Map cleanupExtraFields(Map m, Table t) { @@ -1150,6 +1161,7 @@ protected void doInsertStringV1(List records, Table table, QueryIdentifi String data = (String)record.getSinkRecord().value(); LOGGER.debug(String.format("data: %s", data)); byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + 
statistics.bytesInserted(bytes.length); long beforePushStream = System.currentTimeMillis(); BinaryStreamUtils.writeBytes(stream, bytes); pushStreamTime += System.currentTimeMillis() - beforePushStream; @@ -1176,6 +1188,7 @@ protected void doInsertStringV1(List records, Table table, QueryIdentifi } long s3 = System.currentTimeMillis(); LOGGER.info("topic: {} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime, s2 - s1, s3 - s2, queryId.getQueryId()); + statistics.insertTime(s2 - s1, topic); } protected void doInsertStringV2(List records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException { byte[] endingLine = new byte[]{'\n'}; @@ -1224,6 +1237,7 @@ protected void doInsertStringV2(List records, Table table, QueryIdentifi String data = (String)record.getSinkRecord().value(); LOGGER.debug(String.format("data: %s", data)); byte[] bytes = data.getBytes(StandardCharsets.UTF_8); + statistics.bytesInserted(bytes.length); long beforePushStream = System.currentTimeMillis(); BinaryStreamUtils.writeBytes(stream, bytes); pushStreamTime += System.currentTimeMillis() - beforePushStream; @@ -1246,6 +1260,7 @@ protected void doInsertStringV2(List records, Table table, QueryIdentifi } long s3 = System.currentTimeMillis(); LOGGER.info("topic: {} partition: {} batchSize: {} push stream ms: {} data ms: {} send ms: {} (QueryId: [{}])", topic, partition, records.size(), pushStreamTime, s2 - s1, s3 - s2, queryId.getQueryId()); + statistics.insertTime(s2 - s1, topic); } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java b/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java index 113c9c56..24145679 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java @@ -7,11 +7,11 @@ import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter; import com.clickhouse.kafka.connect.sink.kafka.RangeContainer; import com.clickhouse.kafka.connect.sink.kafka.RangeState; -import com.clickhouse.kafka.connect.sink.state.State; import com.clickhouse.kafka.connect.sink.state.StateProvider; import com.clickhouse.kafka.connect.sink.state.StateRecord; import com.clickhouse.kafka.connect.util.QueryIdentifier; import com.clickhouse.kafka.connect.util.Utils; +import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,15 +28,17 @@ public class Processing { private StateProvider stateProvider = null; private DBWriter dbWriter = null; private ClickHouseSinkConfig clickHouseSinkConfig; - + private SinkTaskStatistics statistics; private ErrorReporter errorReporter = null; - public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter errorReporter, ClickHouseSinkConfig clickHouseSinkConfig) { + public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter errorReporter, + ClickHouseSinkConfig clickHouseSinkConfig, SinkTaskStatistics statistics) { this.stateProvider = stateProvider; this.dbWriter = dbWriter; this.errorReporter = errorReporter; this.clickHouseSinkConfig = clickHouseSinkConfig; + this.statistics = statistics; } /** * the logic is only for topic partition scoop @@ -49,14 +51,23 @@ private void doInsert(List records, RangeContainer rangeContainer) { LOGGER.trace("doInsert - No records to insert."); return; } - QueryIdentifier queryId 
= new QueryIdentifier(records.get(0).getRecordOffsetContainer().getTopic(), records.get(0).getRecordOffsetContainer().getPartition(), + + final Record firstRecord = records.get(0); + long eventReceiveLag = 0; + if (firstRecord.getSinkRecord().timestamp() != null) { + eventReceiveLag = System.currentTimeMillis() - firstRecord.getSinkRecord().timestamp(); + } + final String topic = firstRecord.getRecordOffsetContainer().getTopic(); + QueryIdentifier queryId = new QueryIdentifier(topic, records.get(0).getRecordOffsetContainer().getPartition(), rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), UUID.randomUUID().toString()); try { LOGGER.debug("doInsert - Records: [{}] - {}", records.size(), queryId); dbWriter.doInsert(records, queryId, errorReporter); + statistics.recordTopicStats(records.size(), queryId.getTopic(), eventReceiveLag); } catch (Exception e) { + statistics.recordTopicStatsOnFailure(records.size(), queryId.getTopic(), eventReceiveLag); throw new RuntimeException(queryId.toString(), e);//This way the queryId will propagate } } @@ -66,12 +77,21 @@ private void doInsert(List records) { LOGGER.trace("doInsert - No records to insert."); return; } - QueryIdentifier queryId = new QueryIdentifier(records.get(0).getRecordOffsetContainer().getTopic(), UUID.randomUUID().toString()); + + final Record firstRecord = records.get(0); + long eventReceiveLag = 0; + if (firstRecord.getSinkRecord().timestamp() != null) { + eventReceiveLag = System.currentTimeMillis() - firstRecord.getSinkRecord().timestamp(); + } + final String topic = firstRecord.getRecordOffsetContainer().getTopic(); + QueryIdentifier queryId = new QueryIdentifier(topic, UUID.randomUUID().toString()); try { LOGGER.info("doInsert - Records: [{}] - {}", records.size(), queryId); dbWriter.doInsert(records, queryId, errorReporter); + statistics.recordTopicStats(records.size(), topic, eventReceiveLag); } catch (Exception e) { + statistics.recordTopicStatsOnFailure(records.size(), topic, eventReceiveLag); throw new RuntimeException(queryId.toString(), e);//This way the queryId will propagate } } diff --git a/src/main/java/com/clickhouse/kafka/connect/util/jmx/SimpleMovingAverage.java b/src/main/java/com/clickhouse/kafka/connect/util/jmx/SimpleMovingAverage.java new file mode 100644 index 00000000..757d3ef1 --- /dev/null +++ b/src/main/java/com/clickhouse/kafka/connect/util/jmx/SimpleMovingAverage.java @@ -0,0 +1,37 @@ +package com.clickhouse.kafka.connect.util.jmx; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Not thread-safe Simple Moving Average implementation. + * This class accumulates sum of values and calculates average on demand. + * Values are stored in circular buffer to guarantee only last N values are used. + * It is needed to keep metric responsive after long periods of measurement. + * + */ +public class SimpleMovingAverage { + + public static final int DEFAULT_WINDOW_SIZE = 60; + + private final long[] values; + private final AtomicInteger head; + private final AtomicLong sum; + + public SimpleMovingAverage(int numOfValues) { + this.values = new long[numOfValues]; + this.head = new AtomicInteger(); + this.sum = new AtomicLong(); + } + + public void add(long value) { + int insertIndex = head.getAndIncrement() % values.length; + // update sum by subtracting the oldest value (at insertIndex) and adding new value. 
+ sum.addAndGet(value - values[insertIndex]); + values[insertIndex] = value; + } + + public double get() { + return (double) sum.get() / values.length; + } +} diff --git a/src/main/java/com/clickhouse/kafka/connect/util/jmx/SinkTaskStatistics.java b/src/main/java/com/clickhouse/kafka/connect/util/jmx/SinkTaskStatistics.java index cb1fad1b..7faa320b 100644 --- a/src/main/java/com/clickhouse/kafka/connect/util/jmx/SinkTaskStatistics.java +++ b/src/main/java/com/clickhouse/kafka/connect/util/jmx/SinkTaskStatistics.java @@ -1,34 +1,166 @@ package com.clickhouse.kafka.connect.util.jmx; +import com.clickhouse.kafka.connect.sink.ProxySinkTask; +import com.clickhouse.kafka.connect.sink.Version; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Deque; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicLong; + public class SinkTaskStatistics implements SinkTaskStatisticsMBean { - private volatile long receivedRecords; - private volatile long recordProcessingTime; - private volatile long taskProcessingTime; + private static final Logger LOGGER = LoggerFactory.getLogger(ProxySinkTask.class); + + private final AtomicLong receivedRecords; + private final AtomicLong recordProcessingTime; + private final AtomicLong taskProcessingTime; + private final AtomicLong insertedRecords; + private final AtomicLong failedRecords; + private final AtomicLong receivedBatches; + private final AtomicLong insertedBytes; + private final SimpleMovingAverage receiveLag; + private final int taskId; + private final Map topicStatistics; + private final Deque topicMBeans; + + + public SinkTaskStatistics(int taskId) { + this.taskId = taskId; + this.topicMBeans = new ConcurrentLinkedDeque<>(); + this.topicStatistics = new ConcurrentHashMap<>(); + + this.receivedRecords = new AtomicLong(0); + this.recordProcessingTime = new AtomicLong(0); + this.taskProcessingTime = new AtomicLong(0); + this.insertedRecords = new AtomicLong(0); + this.receivedBatches = new AtomicLong(0); + this.failedRecords = new AtomicLong(0); + this.receiveLag = new SimpleMovingAverage(SimpleMovingAverage.DEFAULT_WINDOW_SIZE); + this.insertedBytes = new AtomicLong(0); + } + + public void registerMBean() { + String name = getMBeanName(taskId); + LOGGER.info("Register MBean [{}]", name); + MBeanServerUtils.registerMBean(this, name); + } + + public void unregisterMBean() { + String name = getMBeanName(taskId); + LOGGER.info("Unregister MBean [{}]", name); + MBeanServerUtils.unregisterMBean(getMBeanName(taskId)); + for (String topicMBean : topicMBeans) { + LOGGER.info("Unregister topic MBean [{}]", topicMBean); + MBeanServerUtils.unregisterMBean(topicMBean); + } + } + @Override public long getReceivedRecords() { - return receivedRecords; + return receivedRecords.get(); } @Override public long getRecordProcessingTime() { - return recordProcessingTime; + return recordProcessingTime.get(); } @Override public long getTaskProcessingTime() { - return taskProcessingTime; + return taskProcessingTime.get(); + } + + @Override + public long getInsertedRecords() { + return insertedRecords.get(); } - public void receivedRecords(final int n ) { - this.receivedRecords += n; + @Override + public long getFailedRecords() { + return failedRecords.get(); + } + + @Override + public long getReceivedBatches() { + return receivedBatches.get(); 
+ } + + + @Override + public long getMeanReceiveLag() { + return Double.valueOf(receiveLag.get()).longValue(); + } + + @Override + public long getInsertedBytes() { + return insertedBytes.get(); + } + + public void receivedRecords(final Collection records) { + this.receivedRecords.addAndGet(records.size()); + this.receivedBatches.addAndGet(1); + + try { + long receiveTime = System.currentTimeMillis(); + long eventTime = receiveTime; + Optional first = records.stream().findFirst(); + if (first.isPresent()) { + eventTime = first.get().timestamp(); + } + receiveLag.add(receiveTime - eventTime); + } catch (Exception e) { + LOGGER.warn("Failed to calculate receive lag", e); + } } public void recordProcessingTime(ExecutionTimer timer) { - this.recordProcessingTime += timer.nanosElapsed(); + this.recordProcessingTime.addAndGet(timer.nanosElapsed()); } public void taskProcessingTime(ExecutionTimer timer) { - this.taskProcessingTime += timer.nanosElapsed(); + this.taskProcessingTime.addAndGet(timer.nanosElapsed()); } + + public void recordTopicStats(int n, String table, long eventReceiveLag) { + insertedRecords.addAndGet(n); + topicStatistics.computeIfAbsent(table, this::createTopicStatistics).recordsInserted(n); + topicStatistics.computeIfAbsent(table, this::createTopicStatistics).batchInserted(1); + } + + public void recordTopicStatsOnFailure(int numOfRecords, String table, long eventReceiveLag) { + failedRecords.addAndGet(numOfRecords); + topicStatistics.computeIfAbsent(table, this::createTopicStatistics).recordsFailed(numOfRecords); + topicStatistics.computeIfAbsent(table, this::createTopicStatistics).batchesFailed(1); + } + + public void bytesInserted(long n) { + insertedBytes.addAndGet(n); + } + + private TopicStatistics createTopicStatistics(String topic) { + TopicStatistics topicStatistics = new TopicStatistics(); + String topicMBeanName = getTopicMBeanName(taskId, topic); + LOGGER.info("Register topic MBean [{}]", topicMBeanName); + topicMBeans.add(topicMBeanName); + return MBeanServerUtils.registerMBean(topicStatistics, topicMBeanName); + } + + public static String getMBeanName(int taskId) { + return String.format("com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask%d,version=%s", taskId, Version.ARTIFACT_VERSION); + } + + public static String getTopicMBeanName(int taskId, String topic) { + return String.format("com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask%d,version=%s,topic=%s", taskId, Version.ARTIFACT_VERSION, topic); + } + + public void insertTime(long t, String topic) { + topicStatistics.computeIfAbsent(topic, this::createTopicStatistics).insertTime(t); + } } diff --git a/src/main/java/com/clickhouse/kafka/connect/util/jmx/SinkTaskStatisticsMBean.java b/src/main/java/com/clickhouse/kafka/connect/util/jmx/SinkTaskStatisticsMBean.java index fae18d72..0ef0813b 100644 --- a/src/main/java/com/clickhouse/kafka/connect/util/jmx/SinkTaskStatisticsMBean.java +++ b/src/main/java/com/clickhouse/kafka/connect/util/jmx/SinkTaskStatisticsMBean.java @@ -2,10 +2,69 @@ public interface SinkTaskStatisticsMBean { + /** + * Total number of records received by a Sink Task instance. + * Implemented as counter and is reset on restart. + * + * @return counter value + */ long getReceivedRecords(); + /** + * Total time in milliseconds spent in analyzing records and grouping by topic. + * Implemented as counter and is reset on restart. 
+ * + * @return counter value in milliseconds + */ long getRecordProcessingTime(); + /** + * Total time in milliseconds spent on the whole task, including pre-processing and insert. + * Implemented as counter and is reset on restart. + * + * @return counter value in milliseconds + */ long getTaskProcessingTime(); + /** + * Total number of records inserted by a Sink Task instance. + * Implemented as counter and is reset on restart. + * + * @return counter value + */ + long getInsertedRecords(); + + /** + * Total number of inserted bytes. Limited to Long.MAX_VALUE (8192 PB). + * Implemented as counter and is reset on restart. + * + * @return counter value + */ + long getInsertedBytes(); + + /** + * Total number of failed records. + * Implemented as counter and is reset on restart. + * + * @return counter value + */ + long getFailedRecords(); + + + /** + * Total number of received record batches by a Sink Task instance. + * Implemented as counter and is reset on restart. + * + * @return counter value + */ + long getReceivedBatches(); + + /** + * Mean receive lag in milliseconds, calculated as the task receive time minus the record timestamp. + * Calculated using the timestamp of the first record in each batch. + * Implemented as simple moving average. Reset on restart. + * + * @return mean value in milliseconds + */ + long getMeanReceiveLag(); } diff --git a/src/main/java/com/clickhouse/kafka/connect/util/jmx/TopicStatistics.java b/src/main/java/com/clickhouse/kafka/connect/util/jmx/TopicStatistics.java new file mode 100644 index 00000000..ad904b90 --- /dev/null +++ b/src/main/java/com/clickhouse/kafka/connect/util/jmx/TopicStatistics.java @@ -0,0 +1,68 @@ +package com.clickhouse.kafka.connect.util.jmx; + +import java.util.concurrent.atomic.AtomicLong; + +public class TopicStatistics implements TopicStatisticsMBean { + + private final AtomicLong totalInsertedRecords; + private final AtomicLong totalNumberOfBatches; + private final AtomicLong totalFailedBatches; + private final AtomicLong totalFailedRecords; + + private final SimpleMovingAverage insertTime; + + public TopicStatistics() { + totalInsertedRecords = new AtomicLong(0); + totalNumberOfBatches = new AtomicLong(0); + totalFailedBatches = new AtomicLong(0); + totalFailedRecords = new AtomicLong(0); + insertTime = new SimpleMovingAverage(SimpleMovingAverage.DEFAULT_WINDOW_SIZE); + } + + + @Override + public long getTotalSuccessfulRecords() { + return totalInsertedRecords.get(); + } + + @Override + public long getTotalSuccessfulBatches() { + return totalNumberOfBatches.get(); + } + + + @Override + public long getMeanInsertTime() { + return Double.valueOf(insertTime.get()).longValue(); + } + + @Override + public long getTotalFailedBatches() { + return totalFailedBatches.get(); + } + + @Override + public long getTotalFailedRecords() { + return totalFailedRecords.get(); + } + + public void recordsInserted(long n) { + totalInsertedRecords.addAndGet(n); + } + + public void batchInserted(long n) { + totalNumberOfBatches.addAndGet(n); + } + + public void batchesFailed(long n) { + totalFailedBatches.addAndGet(n); + } + + public void recordsFailed(long n) { + totalFailedRecords.addAndGet(n); + } + + public void insertTime(long insertTime) { + this.insertTime.add(insertTime); + } +} diff --git a/src/main/java/com/clickhouse/kafka/connect/util/jmx/TopicStatisticsMBean.java b/src/main/java/com/clickhouse/kafka/connect/util/jmx/TopicStatisticsMBean.java new file mode 100644 index 00000000..0b22532f --- /dev/null +++ b/src/main/java/com/clickhouse/kafka/connect/util/jmx/TopicStatisticsMBean.java @@ -0,0 +1,46 @@
+package com.clickhouse.kafka.connect.util.jmx; + +public interface TopicStatisticsMBean { + + /** + * Total successful records in the topic. + * Implemented as counter. Reset on restart. + * + * @return counter value + */ + long getTotalSuccessfulRecords(); + + /** + * Total successful batches in the topic. + * Implemented as counter. Reset on restart. + * + * @return counter value + */ + long getTotalSuccessfulBatches(); + + /** + * Total failed batches in the topic. + * Implemented as counter. Reset on restart. + * + * @return counter value + */ + long getTotalFailedBatches(); + + /** + * Total failed records in the topic. + * Implemented as counter. Reset on restart. + * + * @return counter value + */ + long getTotalFailedRecords(); + + + /** + * Mean insert time in milliseconds for batches written from this topic. + * Calculated over recent inserts into the topic's table. + * Implemented as simple moving average. Reset on restart. + * + * @return mean value in milliseconds + */ + long getMeanInsertTime(); +} diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java index b4e1a64b..6121be3b 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java @@ -7,11 +7,10 @@ import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.ClickHouseResponseSummary; import com.clickhouse.client.api.query.GenericRecord; -import com.clickhouse.client.api.query.Records; -import com.clickhouse.kafka.connect.ClickHouseSinkConnector; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.clickhouse.kafka.connect.sink.helper.SchemalessTestData; +import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import org.apache.kafka.common.record.TimestampType; @@ -19,10 +18,13 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServer; +import javax.management.ObjectName; import java.io.PrintWriter; import java.io.StringWriter; +import java.lang.management.ManagementFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -210,4 +212,131 @@ public void clientNameTest() throws Exception { assertTrue(record.getString(1).startsWith(ClickHouseHelperClient.CONNECT_CLIENT_NAME)); } } + + @Test + public void statisticsTest() throws Exception { + Map props = createProps(); + props.put(ClickHouseSinkConfig.IGNORE_PARTITIONS_WHEN_BATCHING, "true"); + ClickHouseHelperClient chc = createClient(props); + String topic = createTopicName("topic.statistics_test-01"); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, " + + "`p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); + Collection sr = SchemalessTestData.createPrimitiveTypes(topic, 1); + sr.addAll(SchemalessTestData.createPrimitiveTypes(topic, 2)); + sr.addAll(SchemalessTestData.createPrimitiveTypes(topic, 3)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); +
chst.start(props); + final int taskId = chst.taskId(); + chst.put(sr); + Thread.sleep(5000); + chst.put(sr); + + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + // Task metrics + final String mbeanName = SinkTaskStatistics.getMBeanName(taskId); + ObjectName sinkMBean = new ObjectName(mbeanName); + Object receivedRecords = mBeanServer.getAttribute(sinkMBean, "ReceivedRecords"); + assertEquals(sr.size() * 2L, ((Long)receivedRecords).longValue()); + Object totalProcessingTime = mBeanServer.getAttribute(sinkMBean, "RecordProcessingTime"); + assertTrue((Long)totalProcessingTime > 1000L); + Object totalTaskProcessingTime = mBeanServer.getAttribute(sinkMBean, "TaskProcessingTime"); + assertTrue((Long)totalTaskProcessingTime > 1000L); + Object totalInsertedRecords = mBeanServer.getAttribute(sinkMBean, "InsertedRecords"); + assertEquals(sr.size() * 2L, ((Long)totalInsertedRecords).longValue()); + Object receivedBatches = mBeanServer.getAttribute(sinkMBean, "ReceivedBatches"); + assertEquals(2, ((Long)receivedBatches).longValue()); + Object failedRecords = mBeanServer.getAttribute(sinkMBean, "FailedRecords"); + assertEquals(0, ((Long)failedRecords).longValue()); + Object eventReceiveLag = mBeanServer.getAttribute(sinkMBean, "MeanReceiveLag"); + assertTrue((Long)eventReceiveLag > 0); + Object insertedBytes = mBeanServer.getAttribute(sinkMBean, "InsertedBytes"); + assertTrue((Long)insertedBytes >= 872838); + + + // Topic metrics + final ObjectName topicMbeanName = new ObjectName(SinkTaskStatistics.getTopicMBeanName(taskId, topic)); + Object insertedRecords = mBeanServer.getAttribute(topicMbeanName, "TotalSuccessfulRecords"); + assertEquals(sr.size() * 2L, ((Long)insertedRecords).longValue()); + Object insertedBatches = mBeanServer.getAttribute(topicMbeanName, "TotalSuccessfulBatches"); + assertEquals(2, ((Long)insertedBatches).longValue()); + + Object insertTime = mBeanServer.getAttribute(topicMbeanName, "MeanInsertTime"); + assertTrue((Long)insertTime >= 0); + + Object failedTopicRecords = mBeanServer.getAttribute(topicMbeanName, "TotalFailedRecords"); + assertEquals(0, ((Long)failedTopicRecords).longValue()); + Object failedTopicBatches = mBeanServer.getAttribute(topicMbeanName, "TotalFailedBatches"); + assertEquals(0, ((Long)failedTopicBatches).longValue()); + + chst.stop(); + + assertThrows(InstanceNotFoundException.class, () -> mBeanServer.getMBeanInfo(sinkMBean)); + assertThrows(InstanceNotFoundException.class, () -> mBeanServer.getMBeanInfo(topicMbeanName)); + } + + @Test + public void receiveLagTimeTest() throws Exception { + Map props = createProps(); + props.put(ClickHouseSinkConfig.IGNORE_PARTITIONS_WHEN_BATCHING, "true"); + ClickHouseHelperClient chc = createClient(props); + String topic = createTopicName("schemaless_simple_batch_test"); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, " + + "`p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + final int taskId = chst.taskId(); + int n = 40; + for (int i = 0; i < n; i++) { + + long k; + if ( i < n * 0.25) { + k = 2000; + } else if ( i < n * 0.50) { + k = 3000; + } else { + k = 500; + } + + List sr = SchemalessTestData.createPrimitiveTypes(topic, 1); + SinkRecord first = sr.get(0); + long createTime = System.currentTimeMillis() - k; + 
first = first.newRecord(first.topic(), first.kafkaPartition(), first.keySchema(), first.key(), first.valueSchema(), + first.value(), createTime); + sr.set(0, first); + chst.put(sr); + + Thread.sleep(1000); + } + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + + ObjectName topicMbeanName = new ObjectName(SinkTaskStatistics.getMBeanName(taskId)); + Object eventReceiveLag = mBeanServer.getAttribute(topicMbeanName, "MeanReceiveLag"); + assertTrue((Long)eventReceiveLag < 2000L); + assertTrue((Long)eventReceiveLag > 400L, "eventReceiveLag: " + eventReceiveLag); + + for (int i = 0; i < n; i++) { + + long k = 300; + List sr = SchemalessTestData.createPrimitiveTypes(topic, 1); + SinkRecord first = sr.get(0); + long createTime = System.currentTimeMillis() - k; + first = first.newRecord(first.topic(), first.kafkaPartition(), first.keySchema(), first.key(), first.valueSchema(), + first.value(), createTime); + sr.set(0, first); + chst.put(sr); + + Thread.sleep(1000); + } + + eventReceiveLag = mBeanServer.getAttribute(topicMbeanName, "MeanReceiveLag"); + assertTrue((Long)eventReceiveLag < 1000L, "eventReceiveLag: " + eventReceiveLag); + assertTrue((Long)eventReceiveLag > 300L, "eventReceiveLag: " + eventReceiveLag); + + chst.stop(); + } } diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriterTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriterTest.java index a44d2163..33bec17a 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriterTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriterTest.java @@ -13,6 +13,7 @@ import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.clickhouse.kafka.connect.test.junit.extension.FromVersionConditionExtension; import com.clickhouse.kafka.connect.util.Utils; +import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -46,7 +47,7 @@ public void setUp() { @Test public void writeUTF8StringPrimitive() throws IOException { - ClickHouseWriter writer = new ClickHouseWriter(); + ClickHouseWriter writer = new ClickHouseWriter(new SinkTaskStatistics(0)); Column column = Column.extractColumn(newDescriptor("utf8String", "String")); ClickHousePipedOutputStream out = new ClickHousePipedOutputStream(null) { List bytes = new ArrayList<>(); @@ -107,7 +108,7 @@ public void updateMapping() { ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16 ) Engine = MergeTree ORDER BY off16"); - ClickHouseWriter chw = new ClickHouseWriter(); + ClickHouseWriter chw = new ClickHouseWriter(new SinkTaskStatistics(0)); chw.setSinkConfig(createConfig()); chw.setClient(chc); @@ -134,7 +135,7 @@ public void doWriteColValue_Tuples() { ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s (`_id` String, `result` Tuple(`id` String, `isanswered` Int32, `relevancescore` Float64, `subject` String, `istextanswered` Int32 )) Engine = MergeTree ORDER BY _id"); - ClickHouseWriter chw = new ClickHouseWriter(); + ClickHouseWriter chw = new ClickHouseWriter(new SinkTaskStatistics(0)); chw.setSinkConfig(createConfig()); chw.setClient(chc); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java 
b/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java index 12bfd826..566693b1 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java @@ -11,9 +11,12 @@ import com.clickhouse.kafka.connect.sink.state.StateProvider; import com.clickhouse.kafka.connect.sink.state.StateRecord; import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState; +import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Assert; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -51,13 +54,27 @@ private List createRecords(String database, String topic, int partition) return records; } + SinkTaskStatistics sinkTaskStatistics; + + @BeforeEach + void setupEach() { + sinkTaskStatistics = new SinkTaskStatistics(0); + sinkTaskStatistics.registerMBean(); + } + + @AfterEach + void tearDownEach() { + sinkTaskStatistics.unregisterMBean(); + sinkTaskStatistics = null; + } + @Test @DisplayName("ProcessAllAtOnceNewTest") public void ProcessAllAtOnceNewTest() throws IOException, ExecutionException, InterruptedException { List records = createRecords("test", 1); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); } @@ -72,7 +89,7 @@ public void ProcessSplitNewTest() throws IOException, ExecutionException, Interr assertEquals(records.size(), recordsHead.size() + recordsTail.size()); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(recordsHead); assertEquals(recordsHead.size(), dbWriter.recordsInserted()); processing.doLogic(recordsTail); @@ -85,7 +102,7 @@ public void ProcessAllNewTwiceTest() throws IOException, ExecutionException, Int List records = createRecords("test", 1); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); processing.doLogic(records); @@ -101,7 +118,7 @@ public void ProcessAllNewFailedSetStateAfterProcessingTest() throws IOException, //List recordsTail = records.subList(splitPoint, records.size()); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, 
null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(recordsHead); assertEquals(recordsHead.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -117,7 +134,7 @@ public void ProcessContainsBeforeProcessingTest() throws IOException, ExecutionE List containsRecords = records.subList(345,850); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -132,7 +149,7 @@ public void ProcessContainsAfterProcessingTest() throws IOException, ExecutionEx List containsRecords = records.subList(345,850); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); processing.doLogic(containsRecords); @@ -147,7 +164,7 @@ public void ProcessOverlappingBeforeProcessingTest() throws IOException, Executi List containsRecords = records.subList(345,850); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); processing.doLogic(containsRecords); @@ -165,7 +182,7 @@ public void ProcessSplitNewWithBeforeProcessingTest() throws IOException, Execut assertEquals(records.size(), recordsHead.size() + recordsTail.size()); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(recordsHead); assertEquals(recordsHead.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -182,7 +199,7 @@ public void ProcessDeletedTopicBeforeProcessingTest() throws IOException, Execut List containsRecords = records.subList(0,150); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -199,7 +216,7 @@ public void ProcessingWithDLQTest() throws IOException, ExecutionException, 
Inte StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); ClickHouseSinkConfig clickHouseSinkConfig = new ClickHouseSinkConfig(new HashMap<>()); - Processing processing = new Processing(stateProvider, dbWriter, er, clickHouseSinkConfig); + Processing processing = new Processing(stateProvider, dbWriter, er, clickHouseSinkConfig, sinkTaskStatistics); processing.doLogic(records); assertEquals(records.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -217,7 +234,7 @@ public void ProcessPartialOverlappingBeforeProcessingTest() throws IOException, List recordsTail = records.subList(splitPointLow, records.size()); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(recordsHead); assertEquals(recordsHead.size(), dbWriter.recordsInserted()); StateRecord stateRecord = stateProvider.getStateRecord("test", 1); @@ -236,7 +253,7 @@ public void ProcessPartialOverlappingAfterProcessingTest() throws IOException, E List recordsTail = records.subList(splitPointLow, records.size()); StateProvider stateProvider = new InMemoryState(); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); processing.doLogic(recordsHead); assertEquals(recordsHead.size(), dbWriter.recordsInserted()); processing.doLogic(recordsTail); @@ -251,13 +268,13 @@ public void ProcessOldRecordsTest() throws IOException, ExecutionException, Inte StateProvider stateProvider = new InMemoryState(); stateProvider.setStateRecord(new StateRecord("test", 1, 5000, 4000, State.AFTER_PROCESSING)); DBWriter dbWriter = new InMemoryDBWriter(); - Processing processingWithoutConfig = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>())); + Processing processingWithoutConfig = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()), sinkTaskStatistics); Assert.assertThrows(RuntimeException.class, () -> processingWithoutConfig.doLogic(recordsHead)); HashMap config = new HashMap<>(); config.put(ClickHouseSinkConfig.TOLERATE_STATE_MISMATCH, "true"); ClickHouseSinkConfig clickHouseConfig = new ClickHouseSinkConfig(config); - Processing processing = new Processing(stateProvider, dbWriter, null, clickHouseConfig); + Processing processing = new Processing(stateProvider, dbWriter, null, clickHouseConfig, sinkTaskStatistics); processing.doLogic(recordsHead); assertEquals(0, dbWriter.recordsInserted()); } diff --git a/src/test/java/com/clickhouse/kafka/connect/util/jmx/SimpleMovingAverageTest.java b/src/test/java/com/clickhouse/kafka/connect/util/jmx/SimpleMovingAverageTest.java new file mode 100644 index 00000000..28543a51 --- /dev/null +++ b/src/test/java/com/clickhouse/kafka/connect/util/jmx/SimpleMovingAverageTest.java @@ -0,0 +1,65 @@ +package com.clickhouse.kafka.connect.util.jmx; + +import org.junit.jupiter.api.Test; + +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.*; + +class SimpleMovingAverageTest { + + @Test + void 
testAverage() { + SimpleMovingAverage sma = new SimpleMovingAverage(60); + + long[] values = new long[60]; + for (int i = 0; i < values.length; i++) { + values[i] = (i < values.length / 2) ? 500 : 1000; + } + + for (long value : values) { + sma.add(value); + } + + double sma_val1 = sma.get(); + assertTrue(sma_val1 > 500); + + for (int i = 0; i < values.length; i++) { + values[i] = (i < values.length / 2) ? 1000 : 300; + } + + for (long value : values) { + sma.add(value); + } + + assertTrue(sma.get() < sma_val1); + } + + + @Test + void testManyValues() { + SimpleMovingAverage sma = new SimpleMovingAverage(1000); + + long[] values = new long[100000]; + Random random = new Random(); + for (int i = 0; i < values.length; i++) { + values[i] = random.nextInt(1000); + } + + for (long value : values) { + sma.add(value); + } + + double sma_val1 = sma.get(); + for (int i = 0; i < values.length; i++) { + values[i] = random.nextInt(1000); + } + + for (long value : values) { + sma.add(value); + } + + double sma_val2 = sma.get(); + assertTrue(sma_val1 != sma_val2); + } +} \ No newline at end of file diff --git a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java index a38f7853..139f5848 100644 --- a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java +++ b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java @@ -227,8 +227,10 @@ public static int countRows(ClickHouseHelperClient chc, String tableName) { } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { + LOGGER.error("Error while counting rows. Query was " + queryCount, e); throw new RuntimeException(e); } catch (Exception e) { + LOGGER.error("Error while counting rows. Query was " + queryCount, e); throw new RuntimeException(e); } }
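For illustration only, a minimal sketch of how the MBeans registered by this change could be read from inside the Connect worker JVM. The object-name domain and attribute names come from SinkTaskStatistics.getMBeanName/getTopicMBeanName and the getters declared in SinkTaskStatisticsMBean and TopicStatisticsMBean; the class name ConnectorJmxDump is illustrative, and a remote worker would need a JMX remote connection instead of the in-process platform MBean server.

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;

public class ConnectorJmxDump {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Matches both the per-task beans (name=SinkTask<N>) and the per-topic beans
        // (name=SinkTask<N>,topic=<topic>) registered by SinkTaskStatistics.
        ObjectName pattern = new ObjectName("com.clickhouse:type=ClickHouseKafkaConnector,*");
        for (ObjectName name : server.queryNames(pattern, null)) {
            System.out.println(name);
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                // Task beans expose e.g. ReceivedRecords, InsertedBytes, MeanReceiveLag;
                // topic beans expose e.g. TotalSuccessfulRecords, MeanInsertTime.
                System.out.println("  " + attr.getName() + " = " + server.getAttribute(name, attr.getName()));
            }
        }
    }
}

The same attributes can also be browsed interactively with jconsole under the com.clickhouse domain.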