-
Notifications
You must be signed in to change notification settings - Fork 299
Open
Description
I am running a Kafka consumer using kafkajs in a Node.js worker to process deployment logs. However, the consumer keeps consuming the same messages infinitely, causing duplicate log entries in my database.
Additionally, when I try to reset offsets, I get the error:
// Initialize Kafka consumer inside the try block
const consumer = kafka.consumer({
groupId: 'deployment-log-group',
sessionTimeout: 30000,
heartbeatInterval: 3000
});
// Define processLogMessage with access to llmInstance
const processLogMessage = async ({ topic, partition, message }) => {
try {
if (!message.value) {
console.warn("Skipping empty message");
return;
}
const logMessage = JSON.parse(message.value.toString());
console.log('Processing message:', { topic, partition, offset: message.offset });
// Extract deployment details safely
const {
PROJECT_ID = null,
DEPLOYMENT_ID = null,
GIT_URI = null,
log = null,
timestamp = null,
logLevel = 'info',
fileName = null,
fileSize = null,
fileSizeInBytes = null,
timeTaken = null
} = logMessage || {};
if (!DEPLOYMENT_ID || !PROJECT_ID) {
console.warn("Missing required fields, skipping message");
return;
}
// Process message logic
const { classification, reasoning } = await classifyLogs(logMessage.log, llmInstance);
console.log("AI REASONING...", reasoning);
if (classification === "CRITICAL_FAILURE") {
console.error("Critical failure detected. Marking deployment as failed.");
await prisma.deployment.update({
where: { id: DEPLOYMENT_ID },
data: { status: 'FAILED' },
});
} else if (classification === "RECOVERABLE_ERROR") {
console.warn("Recoverable error detected. Initiating retry mechanism.");
await failedQueue.add('retryJob', {
deploymentId: DEPLOYMENT_ID,
projectId: PROJECT_ID,
gitUrl: GIT_URI,
error: reasoning,
});
} else if (classification === "SUCCESS") {
await prisma.deployment.update({
where: { id: DEPLOYMENT_ID },
data: { status: 'ACTIVE' },
});
} else {
console.log("Log classified as:", JSON.stringify(classification));
}
console.log(`Log received: ${log} | Deployment: ${DEPLOYMENT_ID} | Project: ${PROJECT_ID}`);
// Insert log into ClickHouse
const logEntry = {
event_id: uuidv4(),
project_id: PROJECT_ID,
deployment_id: DEPLOYMENT_ID,
log_message: log,
log_level: logLevel,
file_name: fileName,
file_size: fileSize,
file_size_in_bytes: fileSizeInBytes,
time_taken: timeTaken,
git_url: GIT_URI,
kafka_offset: message.offset,
kafka_partition: partition
};
const aiEntry = {
deployment_id: DEPLOYMENT_ID,
log_event_id: logMessage.event_id || uuidv4(),
classification: classification,
reasoning: reasoning,
};
const clickHouseClient = await getClickHouseClient();
// Process ClickHouse insertions
// const { query_id: logQueryId } = await clickHouseClient.insert({
// table: 'deployment_logs',
// values: [logEntry],
// format: 'JSONEachRow',
// });
console.log(`Log inserted to ClickHouse with query ID:`);
// const { query_id: aiQueryId } = await clickHouseClient.insert({
// table: 'deployment_ai_analysis',
// values: [aiEntry],
// format: 'JSONEachRow',
// });
console.log(`AI analysis inserted into ClickHouse with query ID:`);
// Remove the resolveOffset call since we're using autoCommit
// resolveOffset(message.offset); <- Remove this line
} catch (error) {
console.error("Failed to parse log message:", error.message);
// Don't throw the error to prevent message processing from stopping
}
};
(async () => {
try {
await consumer.connect();
console.log("Kafka consumer connected successfully");
await consumer.subscribe({
topic: 'builder-logs',
fromBeginning: false
});
// Modify the consumer.run configuration
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
await processLogMessage({ topic, partition, message });
// Manually commit offset after successful processing
await consumer.commitOffsets([{
topic,
partition,
offset: (Number(message.offset) + 1).toString()
}]);
console.log(`Committed offset ${message.offset} for partition ${partition}`);
} catch (error) {
console.error(`Failed to process/commit message ${message.offset}:`, error);
// Consider adding retry logic here for transient errors
}
},
autoCommit: false // Disable auto-commit
});
} catch (error) {
console.error('Failed to initialize consumer:', error);
await consumer.disconnect();
process.exit(1);
}
})();
}
initializeWorker().catch(error => {
console.error('Failed to initialize worker:', error);
process.exit(1);
});
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels