Skip to content

Commit 6ccd8cb

Browse files
committed
Kafka integration
1 parent 4743bfb commit 6ccd8cb

10 files changed

+567
-2
lines changed

.env.sample

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,34 @@
11
POSTGRES_SCHEMA="public"
22
DATABASE_URL="postgresql://johndoe:randompassword@localhost:5432/mydb?schema=${POSTGRES_SCHEMA}"
33

4+
# Kafka Configuration
5+
KAFKA_BROKERS=localhost:9092
6+
KAFKA_CLIENT_ID=tc-review-api
7+
KAFKA_GROUP_ID=tc-review-consumer-group
8+
KAFKA_SSL_ENABLED=false
9+
10+
# SASL Configuration (optional - uncomment if needed)
11+
# KAFKA_SASL_MECHANISM=plain
12+
# KAFKA_SASL_USERNAME=
13+
# KAFKA_SASL_PASSWORD=
14+
15+
# Consumer Configuration
16+
KAFKA_SESSION_TIMEOUT=30000
17+
KAFKA_HEARTBEAT_INTERVAL=3000
18+
KAFKA_MAX_WAIT_TIME=5000
19+
KAFKA_CONNECTION_TIMEOUT=10000
20+
KAFKA_REQUEST_TIMEOUT=30000
21+
22+
# Retry Configuration
23+
KAFKA_RETRY_ATTEMPTS=5
24+
KAFKA_INITIAL_RETRY_TIME=100
25+
KAFKA_MAX_RETRY_TIME=30000
26+
27+
# Dead Letter Queue Configuration
28+
KAFKA_DLQ_ENABLED=true
29+
KAFKA_DLQ_TOPIC_SUFFIX=.dlq
30+
KAFKA_DLQ_MAX_RETRIES=3
31+
432
# API configs
533
BUS_API_URL="https://api.topcoder-dev.com/v5/bus/events"
634
CHALLENGE_API_URL="https://api.topcoder-dev.com/v5/challenges/"

docker-compose.kafka.yml

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
services:
2+
zookeeper:
3+
image: confluentinc/cp-zookeeper:7.4.0
4+
hostname: zookeeper
5+
container_name: zookeeper
6+
ports:
7+
- "2181:2181"
8+
environment:
9+
ZOOKEEPER_CLIENT_PORT: 2181
10+
ZOOKEEPER_TICK_TIME: 2000
11+
networks:
12+
- kafka-network
13+
14+
kafka:
15+
image: confluentinc/cp-kafka:7.4.0
16+
hostname: kafka
17+
container_name: kafka
18+
depends_on:
19+
- zookeeper
20+
ports:
21+
- "9092:9092"
22+
- "9101:9101"
23+
environment:
24+
KAFKA_BROKER_ID: 1
25+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
26+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
27+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
28+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
29+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
30+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
31+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
32+
KAFKA_JMX_PORT: 9101
33+
KAFKA_JMX_HOSTNAME: localhost
34+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
35+
networks:
36+
- kafka-network
37+
38+
kafka-ui:
39+
image: provectuslabs/kafka-ui:latest
40+
container_name: kafka-ui
41+
depends_on:
42+
- kafka
43+
ports:
44+
- "8080:8080"
45+
environment:
46+
KAFKA_CLUSTERS_0_NAME: local
47+
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
48+
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
49+
networks:
50+
- kafka-network
51+
52+
networks:
53+
kafka-network:
54+
driver: bridge

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
"nanoid": "~5.1.2",
4040
"reflect-metadata": "^0.2.2",
4141
"rxjs": "^7.8.1",
42-
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#v3.0.1"
42+
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#v3.0.1",
43+
"kafkajs": "^2.2.4"
4344
},
4445
"devDependencies": {
4546
"@eslint/eslintrc": "^3.2.0",

pnpm-lock.yaml

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/shared/modules/global/globalProviders.module.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ import { M2MService } from './m2m.service';
1010
import { ChallengeApiService } from './challenge.service';
1111
import { EventBusService } from './eventBus.service';
1212
import { MemberService } from './member.service';
13+
import { KafkaModule } from '../kafka/kafka.module';
1314

1415
// Global module for providing global providers
1516
// Add any provider you want to be global here
1617
@Global()
1718
@Module({
18-
imports: [HttpModule],
19+
imports: [HttpModule, KafkaModule.forRoot()],
1920
providers: [
2021
{
2122
provide: APP_GUARD,
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { LoggerService } from '../global/logger.service';
2+
3+
export abstract class BaseEventHandler {
4+
protected logger: LoggerService;
5+
6+
constructor(logger: LoggerService) {
7+
this.logger = logger;
8+
}
9+
10+
abstract handle(message: any): Promise<void>;
11+
abstract getTopic(): string;
12+
13+
protected logMessage(message: any): void {
14+
this.logger.log({
15+
message: 'Processing Kafka message',
16+
topic: this.getTopic(),
17+
payload: message,
18+
});
19+
}
20+
21+
protected validateMessage(message: any): boolean {
22+
return message !== null && message !== undefined;
23+
}
24+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { Injectable, OnModuleInit } from '@nestjs/common';
2+
import { BaseEventHandler } from '../base-event.handler';
3+
import { KafkaHandlerRegistry } from '../kafka-handler.registry';
4+
import { LoggerService } from '../../global/logger.service';
5+
6+
@Injectable()
7+
export class AVScanActionScanHandler
8+
extends BaseEventHandler
9+
implements OnModuleInit
10+
{
11+
private readonly topic = 'avscan.action.scan';
12+
13+
constructor(private readonly handlerRegistry: KafkaHandlerRegistry) {
14+
super(LoggerService.forRoot('AVScanActionScanHandler'));
15+
}
16+
17+
onModuleInit() {
18+
this.handlerRegistry.registerHandler(this.topic, this);
19+
this.logger.log(`Registered handler for topic: ${this.topic}`);
20+
}
21+
22+
getTopic(): string {
23+
return this.topic;
24+
}
25+
26+
async handle(message: any): Promise<void> {
27+
try {
28+
this.logger.log({
29+
message: 'Processing AVScan Action Scan event',
30+
topic: this.topic,
31+
payload: message,
32+
});
33+
34+
if (!this.validateMessage(message)) {
35+
this.logger.warn('Invalid message received');
36+
return;
37+
}
38+
39+
this.logger.log('=== AVScan Action Scan Event ===');
40+
this.logger.log('Topic:', this.topic);
41+
this.logger.log('Payload:', JSON.stringify(message, null, 2));
42+
this.logger.log('==============================');
43+
44+
await Promise.resolve(); // Add await to satisfy linter
45+
46+
this.logger.log('AVScan Action Scan event processed successfully');
47+
} catch (error) {
48+
this.logger.error('Error processing AVScan Action Scan event', error);
49+
throw error;
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)