10 changes: 10 additions & 0 deletions pom.xml
@@ -191,6 +191,16 @@
</developer>
</developers>
<dependencies>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.2.30</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>4.2.30</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-data</artifactId>
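
The two dependencies added above pull in Dropwizard Metrics 4.2.30: metrics-core supplies MetricRegistry, Counter, and Timer, while metrics-jmx supplies JmxReporter, which publishes every metric in a registry as a JMX MBean. The KustoKafkaMetricsJmxReporter wrapper used later in this change is not part of the diff; the following is only a rough sketch of how such a wrapper could delegate to the stock JmxReporter, with the constructor arguments and the removeMetricsFromRegistry behavior assumed from how KustoSinkTask calls it.

    import com.codahale.metrics.MetricRegistry;
    import com.codahale.metrics.jmx.JmxReporter;

    // Hypothetical sketch only; the real KustoKafkaMetricsJmxReporter is not shown in this PR.
    public class KustoKafkaMetricsJmxReporter {
        private final MetricRegistry metricRegistry;
        private final JmxReporter jmxReporter;

        public KustoKafkaMetricsJmxReporter(MetricRegistry metricRegistry, String jmxDomain) {
            this.metricRegistry = metricRegistry;
            // metrics-jmx builder: every metric in the registry is exposed as an MBean under the given domain.
            this.jmxReporter = JmxReporter.forRegistry(metricRegistry)
                    .inDomain(jmxDomain)
                    .build();
        }

        public void start() {
            jmxReporter.start();
        }

        public void stop() {
            jmxReporter.stop();
        }

        // Assumed behavior: KustoSinkTask.stop() calls this to drop the connector's metrics on shutdown.
        public void removeMetricsFromRegistry(String prefix) {
            metricRegistry.removeMatching((name, metric) -> name.startsWith(prefix));
        }
    }
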
FileWriter.java
@@ -19,6 +19,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
@@ -27,6 +29,7 @@
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.ByteRecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.JsonRecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.StringRecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.metrics.KustoKafkaMetricsUtil;

/**
* This class is used to write gzipped rolling files.
@@ -58,6 +61,14 @@ public class FileWriter implements Closeable {
private boolean stopped = false;
private boolean isDlqEnabled = false;

private Counter fileCountOnStage;
private Counter fileCountPurged;
private Counter bufferSizeBytes;
private Counter bufferRecordCount;
private Counter flushedOffset;
private Counter purgedOffset;
private Counter failedTempFileDeletions;

/**
* @param basePath - The path to which the files are written.
* @param fileThreshold - Max size, uncompressed bytes.
@@ -73,7 +84,9 @@ public FileWriter(String basePath,
ReentrantReadWriteLock reentrantLock,
IngestionProperties.DataFormat format,
BehaviorOnError behaviorOnError,
boolean isDlqEnabled) {
boolean isDlqEnabled,
String tpname,
MetricRegistry metricRegistry) {
this.getFilePath = getFilePath;
this.basePath = basePath;
this.fileThreshold = fileThreshold;
@@ -86,6 +99,17 @@ public FileWriter(String basePath,
// If we failed on flush we want to throw the error from the put() flow.
flushError = null;
this.format = format;
initializeMetrics(tpname, metricRegistry);
}

private void initializeMetrics(String tpname, MetricRegistry metricRegistry) {
this.fileCountOnStage = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.FILE_COUNT_SUB_DOMAIN, KustoKafkaMetricsUtil.FILE_COUNT_ON_STAGE));
this.fileCountPurged = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.FILE_COUNT_SUB_DOMAIN, KustoKafkaMetricsUtil.FILE_COUNT_PURGED));
this.bufferSizeBytes = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.BUFFER_SUB_DOMAIN, KustoKafkaMetricsUtil.BUFFER_SIZE_BYTES));
this.bufferRecordCount = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.BUFFER_SUB_DOMAIN, KustoKafkaMetricsUtil.BUFFER_RECORD_COUNT));
this.flushedOffset = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.FLUSHED_OFFSET));
this.purgedOffset = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.PURGED_OFFSET));
this.failedTempFileDeletions = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.FILE_COUNT_SUB_DOMAIN, KustoKafkaMetricsUtil.FAILED_TEMP_FILE_DELETIONS));
}

boolean isDirty() {
@@ -153,6 +177,7 @@ public void openFile(@Nullable Long offset) throws IOException {
countingStream = new CountingOutputStream(new GZIPOutputStream(fos));
outputStream = countingStream.getOutputStream();
recordWriter = recordWriterProvider.getRecordWriter(currentFile.path, countingStream);
fileCountOnStage.inc();
}

void rotate(@Nullable Long offset) throws IOException, DataException {
@@ -211,6 +236,11 @@ private void dumpFile() throws IOException {
boolean deleted = temp.file.delete();
if (!deleted) {
log.warn("Couldn't delete temporary file. File exists: {}", temp.file.exists());
failedTempFileDeletions.inc();
} else {
fileCountPurged.inc();
purgedOffset.inc(flushedOffset.getCount());
fileCountOnStage.dec();
}
}
}
@@ -297,6 +327,16 @@ public void writeData(SinkRecord sinkRecord) throws IOException, DataException {
}
currentFile.rawBytes = countingStream.numBytes;
currentFile.numRecords++;
synchronized (bufferSizeBytes) {
bufferSizeBytes.dec(bufferSizeBytes.getCount()); // Reset the counter to zero
bufferSizeBytes.inc(currentFile.rawBytes); // Set the counter to the current size
}
synchronized (bufferRecordCount) {
bufferRecordCount.dec(bufferRecordCount.getCount()); // Reset the counter to zero
bufferRecordCount.inc(currentFile.numRecords); // Set the counter to the current number of records
}
flushedOffset.inc();

if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold || shouldWriteAvroAsBytes) {
rotate(sinkRecord.kafkaOffset());
resetFlushTimer(true);
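
A note on the buffer metrics added to writeData() above: bufferSizeBytes and bufferRecordCount are Dropwizard Counters, so the code resets them to zero and re-increments them on every record to make them behave like point-in-time values. Dropwizard's Gauge type models that directly and avoids the synchronized reset-and-increment step. The snippet below is a self-contained illustration of the two approaches, not connector code; the metric names in it are placeholders.

    import java.util.concurrent.atomic.AtomicLong;

    import com.codahale.metrics.Counter;
    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.MetricRegistry;

    // Self-contained illustration; metric names are placeholders, not the connector's real names.
    public class BufferMetricsSketch {
        public static void main(String[] args) {
            MetricRegistry registry = new MetricRegistry();
            AtomicLong bufferedBytes = new AtomicLong();

            // Counter-as-gauge, as in writeData(): reset to zero, then bump to the latest value.
            Counter sizeCounter = registry.counter("buffer.size.bytes.counter");
            bufferedBytes.set(4096);
            synchronized (sizeCounter) {
                sizeCounter.dec(sizeCounter.getCount());
                sizeCounter.inc(bufferedBytes.get());
            }

            // Gauge alternative: reads the live value on demand, so no reset step is needed.
            registry.register("buffer.size.bytes.gauge", (Gauge<Long>) bufferedBytes::get);

            System.out.println(sizeCounter.getCount()); // 4096
            System.out.println(registry.getGauges().get("buffer.size.bytes.gauge").getValue()); // 4096
        }
    }
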
KustoSinkTask.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -25,6 +26,7 @@
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.WorkloadIdentityCredential;
import com.azure.identity.WorkloadIdentityCredentialBuilder;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
@@ -35,6 +37,8 @@
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.kafka.connect.sink.metrics.KustoKafkaMetricsJmxReporter;
import com.microsoft.azure.kusto.kafka.connect.sink.metrics.KustoKafkaMetricsUtil;

/**
* Kusto sink uses file system to buffer records.
@@ -63,6 +67,8 @@ public class KustoSinkTask extends SinkTask {
private boolean isDlqEnabled;
private String dlqTopicName;
private Producer<byte[], byte[]> dlqProducer;
private MetricRegistry metricRegistry;
private KustoKafkaMetricsJmxReporter jmxReporter;

public KustoSinkTask() {
assignment = new HashSet<>();
@@ -371,7 +377,7 @@ public void open(Collection<TopicPartition> partitions) {
} else {
IngestClient client = ingestionProps.streaming ? streamingIngestClient : kustoIngestClient;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, client, ingestionProps, config, isDlqEnabled,
dlqTopicName, dlqProducer);
dlqTopicName, dlqProducer, metricRegistry);
writer.open();
writers.put(tp, writer);
}
@@ -435,11 +441,32 @@ public void start(Map<String, String> props) {
if (context != null) {
open(context.assignment());
}

// Initialize metricRegistry and JmxReporter
metricRegistry = new MetricRegistry();
jmxReporter = new KustoKafkaMetricsJmxReporter(metricRegistry, "KustoSinkConnector");
jmxReporter.start();
log.info("JmxReporter started for KustoSinkConnector");

// Register metrics
if (context != null) {
for (TopicPartition tp : context.assignment()) {
metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(
tp.topic(), KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.COMMITTED_OFFSET));
metricRegistry.timer(KustoKafkaMetricsUtil.constructMetricName(
tp.topic(), KustoKafkaMetricsUtil.LATENCY_SUB_DOMAIN, KustoKafkaMetricsUtil.EventType.KAFKA_LAG.getMetricName()));
}
}
}

@Override
public void stop() {
log.warn("Stopping KustoSinkTask");
// Unregister metrics
if (jmxReporter != null) {
jmxReporter.removeMetricsFromRegistry("KustoSinkConnector");
jmxReporter.stop();
}
// First stop so that no more ingestions trigger from timer flushes
for (TopicPartitionWriter writer : writers.values()) {
writer.stop();
@@ -455,6 +482,7 @@ public void stop() {
} catch (IOException e) {
log.error("Error closing kusto client", e);
}

}

@Override
@@ -477,6 +505,13 @@ public void put(Collection<SinkRecord> records) {
} else {
writer.writeRecord(sinkRecord);
}
Long timestamp = sinkRecord.timestamp();
if (timestamp != null) {
long kafkaLagValue = System.currentTimeMillis() - timestamp;
metricRegistry.timer(KustoKafkaMetricsUtil.constructMetricName(
sinkRecord.topic(), KustoKafkaMetricsUtil.LATENCY_SUB_DOMAIN, KustoKafkaMetricsUtil.EventType.KAFKA_LAG.getMetricName()))
.update(kafkaLagValue, TimeUnit.MILLISECONDS);
}
}
if (lastRecord != null) {
log.debug("Last record offset: {}", lastRecord.kafkaOffset());
@@ -503,6 +538,8 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
log.debug("Forwarding to framework request to commit offset: {} for {} while the offset is {}", offset,
tp, offsets.get(tp));
offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(
tp.topic(), KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.COMMITTED_OFFSET)).inc();
}
}

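
All metric names in this change are built by KustoKafkaMetricsUtil.constructMetricName(topic, subDomain, metricName), and the latency metrics use a KustoKafkaMetricsUtil.EventType enum; neither is included in the diff. The sketch below only shows the shape such a helper would need for the calls above to compile — the actual constant values, naming scheme, and any prefix are assumptions.

    // Hypothetical sketch of the helper referenced throughout this PR; the real
    // KustoKafkaMetricsUtil is not in the diff, so the values below are assumptions.
    public final class KustoKafkaMetricsUtil {
        public static final String FILE_COUNT_SUB_DOMAIN = "file-counts";
        public static final String BUFFER_SUB_DOMAIN = "buffer";
        public static final String OFFSET_SUB_DOMAIN = "offsets";
        public static final String LATENCY_SUB_DOMAIN = "latencies";
        public static final String DLQ_SUB_DOMAIN = "ingestion";

        public static final String FILE_COUNT_ON_STAGE = "file-count-on-stage";
        public static final String COMMITTED_OFFSET = "committed-offset";
        // ... remaining metric-name constants elided ...

        public enum EventType {
            KAFKA_LAG("kafka-lag"),
            COMMIT_LAG("commit-lag"),
            INGESTION_LAG("ingestion-lag");

            private final String metricName;

            EventType(String metricName) {
                this.metricName = metricName;
            }

            public String getMetricName() {
                return metricName;
            }
        }

        private KustoKafkaMetricsUtil() {
        }

        // Builds a dot-separated metric name such as "<topic>.<sub-domain>.<metric>".
        public static String constructMetricName(String topic, String subDomain, String metricName) {
            return String.join(".", topic, subDomain, metricName);
        }
    }
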
TopicPartitionWriter.java
@@ -23,6 +23,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.microsoft.azure.kusto.data.exceptions.KustoDataExceptionBase;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.ManagedStreamingIngestClient;
@@ -33,6 +36,7 @@
import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
import com.microsoft.azure.kusto.kafka.connect.sink.metrics.KustoKafkaMetricsUtil;

public class TopicPartitionWriter {

@@ -56,9 +60,22 @@ public class TopicPartitionWriter {
long currentOffset;
Long lastCommittedOffset;
private final ReentrantReadWriteLock reentrantReadWriteLock;
private final MetricRegistry metricRegistry;

private Counter fileCountOnIngestion;
private Counter fileCountTableStageIngestionFail;
private Counter dlqRecordCount;
private Counter ingestionErrorCount;
private Counter ingestionSuccessCount;
private Counter processedOffset;
private Counter committedOffset;
private Timer commitLag;
private Timer ingestionLag;
private long writeTime;


TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps,
KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> dlqProducer) {
KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> dlqProducer, MetricRegistry metricRegistry) {
this.tp = tp;
this.client = client;
this.ingestionProps = ingestionProps;
@@ -73,6 +90,20 @@ public class TopicPartitionWriter {
this.isDlqEnabled = isDlqEnabled;
this.dlqTopicName = dlqTopicName;
this.dlqProducer = dlqProducer;
this.metricRegistry = metricRegistry;
initializeMetrics(tp.topic(), metricRegistry);
}

private void initializeMetrics(String topic, MetricRegistry metricRegistry) {
this.fileCountOnIngestion = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(topic, KustoKafkaMetricsUtil.FILE_COUNT_SUB_DOMAIN, KustoKafkaMetricsUtil.FILE_COUNT_ON_INGESTION));
this.fileCountTableStageIngestionFail = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(topic, KustoKafkaMetricsUtil.FILE_COUNT_SUB_DOMAIN, KustoKafkaMetricsUtil.FILE_COUNT_TABLE_STAGE_INGESTION_FAIL));
this.dlqRecordCount = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(topic, KustoKafkaMetricsUtil.DLQ_SUB_DOMAIN, KustoKafkaMetricsUtil.DLQ_RECORD_COUNT));
this.ingestionErrorCount = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(topic, KustoKafkaMetricsUtil.DLQ_SUB_DOMAIN, KustoKafkaMetricsUtil.INGESTION_ERROR_COUNT));
this.ingestionSuccessCount = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(topic, KustoKafkaMetricsUtil.DLQ_SUB_DOMAIN, KustoKafkaMetricsUtil.INGESTION_SUCCESS_COUNT));
this.processedOffset = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(topic, KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.PROCESSED_OFFSET));
this.committedOffset = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(topic, KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.COMMITTED_OFFSET));
this.commitLag = metricRegistry.timer(KustoKafkaMetricsUtil.constructMetricName(topic, KustoKafkaMetricsUtil.LATENCY_SUB_DOMAIN, KustoKafkaMetricsUtil.EventType.COMMIT_LAG.getMetricName()));
this.ingestionLag = metricRegistry.timer(KustoKafkaMetricsUtil.constructMetricName(topic, KustoKafkaMetricsUtil.LATENCY_SUB_DOMAIN, KustoKafkaMetricsUtil.EventType.INGESTION_LAG.getMetricName()));
}

static String getTempDirectoryName(String tempDirPath) {
@@ -83,13 +114,20 @@ static String getTempDirectoryName(String tempDirPath) {

public void handleRollFile(SourceFile fileDescriptor) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);

if (writeTime == 0) {
log.warn("writeTime is not initialized properly before invoking handleRollFile. Setting it to the current time.");
writeTime = System.currentTimeMillis(); // Initialize writeTime if not already set
}
/*
* Since retries can be for a longer duration the Kafka Consumer may leave the group. This will result in a new Consumer reading records from the last
* committed offset leading to duplication of records in KustoDB. Also, if the error persists, it might also result in duplicate records being written
* into DLQ topic. Recommendation is to set the following worker configuration as `connector.client.config.override.policy=All` and set the
* `consumer.override.max.poll.interval.ms` config to a high enough value to avoid consumer leaving the group while the Connector is retrying.
*/
fileCountOnIngestion.inc();
long uploadStartTime = System.currentTimeMillis(); // Record the start time of file upload
commitLag.update(uploadStartTime - writeTime, TimeUnit.MILLISECONDS);

for (int retryAttempts = 0; true; retryAttempts++) {
try {
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProps.ingestionProperties);
@@ -112,8 +150,16 @@ public void handleRollFile(SourceFile fileDescriptor) {
log.info("Kusto ingestion: file ({}) of size ({}) at current offset ({}) with status ({})",
fileDescriptor.path, fileDescriptor.rawBytes, currentOffset, ingestionStatus);
this.lastCommittedOffset = currentOffset;
committedOffset.inc(); // Increment the committed offset counter
fileCountOnIngestion.dec();
long ingestionEndTime = System.currentTimeMillis(); // Record the end time of ingestion
ingestionLag.update(ingestionEndTime - uploadStartTime, TimeUnit.MILLISECONDS); // Update ingestion-lag
ingestionSuccessCount.inc();
return;
} catch (IngestionServiceException exception) {
fileCountTableStageIngestionFail.inc();
ingestionErrorCount.inc();
fileCountOnIngestion.dec();
if (ingestionProps.streaming) {
Throwable innerException = exception.getCause();
if (innerException instanceof KustoDataExceptionBase &&
@@ -125,6 +171,9 @@ public void handleRollFile(SourceFile fileDescriptor) {
// retrying transient exceptions
backOffForRemainingAttempts(retryAttempts, exception, fileDescriptor);
} catch (IngestionClientException | URISyntaxException exception) {
fileCountTableStageIngestionFail.inc();
ingestionErrorCount.inc();
fileCountOnIngestion.dec();
throw new ConnectException(exception);
}
}
@@ -198,6 +247,7 @@ public void sendFailedRecordToDlq(SinkRecord sinkRecord) {
exception);
}
});
dlqRecordCount.inc();
} catch (IllegalStateException e) {
log.error("Failed to write records to miscellaneous dead-letter queue topic, "
+ "kafka producer has already been closed. Exception={0}", e);
@@ -218,6 +268,9 @@ void writeRecord(SinkRecord sinkRecord) throws ConnectException {
try (AutoCloseableLock ignored = new AutoCloseableLock(reentrantReadWriteLock.readLock())) {
this.currentOffset = sinkRecord.kafkaOffset();
fileWriter.writeData(sinkRecord);
processedOffset.inc();
// Record the time when data is written to the file
writeTime = System.currentTimeMillis();
} catch (IOException | DataException ex) {
handleErrors(sinkRecord, ex);
}
@@ -247,7 +300,9 @@ void open() {
reentrantReadWriteLock,
ingestionProps.ingestionProperties.getDataFormat(),
behaviorOnError,
isDlqEnabled);
isDlqEnabled,
tp.topic(),
metricRegistry);
}

void close() {
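
TopicPartitionWriter now records two latency timers: commit-lag measures the time between the last writeData() call (captured in writeTime) and the start of the upload in handleRollFile(), while ingestion-lag measures the time from that upload start until the ingestion result is confirmed. Because everything is registered in the shared MetricRegistry, the values can also be read programmatically without going through JMX; the snippet below is only an illustration of that, using placeholder metric names rather than the connector's real ones.

    import java.util.concurrent.TimeUnit;

    import com.codahale.metrics.MetricRegistry;

    // Illustrative only: shows how the new metrics can be read straight from the registry.
    public class ReadIngestionMetricsSketch {
        public static void main(String[] args) {
            MetricRegistry registry = new MetricRegistry();
            String committedOffsetName = "my-topic.offsets.committed-offset"; // assumed naming scheme
            String ingestionLagName = "my-topic.latencies.ingestion-lag";

            // Simulate what handleRollFile() does after a successful ingestion.
            registry.counter(committedOffsetName).inc();
            registry.timer(ingestionLagName).update(120, TimeUnit.MILLISECONDS);

            // The same names return the same metric instances, so they can be inspected or asserted on.
            System.out.println(registry.counter(committedOffsetName).getCount()); // 1
            System.out.println(registry.timer(ingestionLagName).getSnapshot().getMax()); // ~120 ms in nanoseconds
        }
    }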