Skip to content

Commit fd58892

Browse files
committed
Kafka resiliency changes
1 parent 329e7cb commit fd58892

File tree

3 files changed

+434
-30
lines changed

3 files changed

+434
-30
lines changed

src/health/kafka.health.spec.ts

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import { HealthCheckError } from '@nestjs/terminus';
2+
import { KafkaHealthIndicator } from './kafka.health';
3+
import {
4+
KafkaConnectionState,
5+
KafkaService,
6+
} from '../kafka/kafka.service';
7+
8+
jest.mock('../kafka/kafka.service', () => {
9+
const KafkaConnectionStateMock = {
10+
initializing: 'initializing',
11+
ready: 'ready',
12+
reconnecting: 'reconnecting',
13+
failed: 'failed',
14+
disabled: 'disabled',
15+
} as const;
16+
17+
class KafkaServiceMock {
18+
isConnected = jest.fn();
19+
getKafkaStatus = jest.fn();
20+
}
21+
22+
return {
23+
KafkaConnectionState: KafkaConnectionStateMock,
24+
KafkaService: KafkaServiceMock,
25+
};
26+
});
27+
28+
jest.mock('@platformatic/kafka', () => {
29+
class MockConsumer {
30+
consume = jest.fn();
31+
isConnected = jest.fn().mockReturnValue(true);
32+
close = jest.fn();
33+
on = jest.fn();
34+
}
35+
36+
class MockProducer {
37+
metadata = jest.fn();
38+
send = jest.fn();
39+
close = jest.fn();
40+
isConnected = jest.fn().mockReturnValue(true);
41+
}
42+
43+
return {
44+
Consumer: MockConsumer,
45+
Producer: MockProducer,
46+
MessagesStream: class {},
47+
ProduceAcks: { ALL: 'all' },
48+
jsonDeserializer: jest.fn(),
49+
jsonSerializer: jest.fn(),
50+
stringDeserializer: jest.fn(),
51+
stringSerializer: jest.fn(),
52+
};
53+
});
54+
55+
describe('KafkaHealthIndicator', () => {
56+
let kafkaService: jest.Mocked<
57+
Pick<KafkaService, 'isConnected' | 'getKafkaStatus'>
58+
>;
59+
let indicator: KafkaHealthIndicator;
60+
61+
beforeEach(() => {
62+
kafkaService = {
63+
isConnected: jest.fn(),
64+
getKafkaStatus: jest.fn(),
65+
} as unknown as jest.Mocked<
66+
Pick<KafkaService, 'isConnected' | 'getKafkaStatus'>
67+
>;
68+
69+
indicator = new KafkaHealthIndicator(
70+
kafkaService as unknown as KafkaService,
71+
);
72+
});
73+
74+
it('returns a healthy status when Kafka is connected', async () => {
75+
kafkaService.getKafkaStatus.mockReturnValue({
76+
state: KafkaConnectionState.ready,
77+
reconnectAttempts: 0,
78+
});
79+
kafkaService.isConnected.mockResolvedValue(true);
80+
81+
const result = await indicator.isHealthy('kafka');
82+
83+
expect(result.kafka.status).toBe('up');
84+
expect(result.kafka.state).toBe(KafkaConnectionState.ready);
85+
expect(result.kafka.reconnectAttempts).toBe(0);
86+
});
87+
88+
it('throws when Kafka state is failed', async () => {
89+
kafkaService.getKafkaStatus.mockReturnValue({
90+
state: KafkaConnectionState.failed,
91+
reconnectAttempts: 3,
92+
reason: 'Kafka reconnection attempts exhausted',
93+
});
94+
95+
await expect(indicator.isHealthy('kafka')).rejects.toBeInstanceOf(
96+
HealthCheckError,
97+
);
98+
expect(kafkaService.isConnected).not.toHaveBeenCalled();
99+
});
100+
101+
it('throws when Kafka connections are not ready', async () => {
102+
kafkaService.getKafkaStatus.mockReturnValue({
103+
state: KafkaConnectionState.ready,
104+
reconnectAttempts: 1,
105+
reason: 'Kafka is not connected',
106+
});
107+
kafkaService.isConnected.mockResolvedValue(false);
108+
109+
await expect(indicator.isHealthy('kafka')).rejects.toBeInstanceOf(
110+
HealthCheckError,
111+
);
112+
});
113+
});

src/health/kafka.health.ts

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import { Injectable } from '@nestjs/common';
22
import { HealthIndicator, HealthCheckError } from '@nestjs/terminus';
3-
import { KafkaService } from '../kafka/kafka.service';
3+
import {
4+
KafkaConnectionState,
5+
KafkaService,
6+
} from '../kafka/kafka.service';
47
import { LoggerService } from '../common/services/logger.service';
58

69
@Injectable()
@@ -13,22 +16,40 @@ export class KafkaHealthIndicator extends HealthIndicator {
1316

1417
async isHealthy(key: string) {
1518
try {
19+
const status = this.kafkaService.getKafkaStatus();
20+
const timestamp = new Date().toISOString();
21+
22+
if (status.state === KafkaConnectionState.failed) {
23+
throw new HealthCheckError(
24+
'KafkaHealthCheck failed',
25+
this.getStatus(key, false, {
26+
state: status.state,
27+
reconnectAttempts: status.reconnectAttempts,
28+
reason:
29+
status.reason || 'Kafka reconnection attempts exhausted',
30+
timestamp,
31+
}),
32+
);
33+
}
34+
1635
const isConnected = await this.kafkaService.isConnected();
1736

1837
if (!isConnected) {
1938
throw new HealthCheckError(
2039
'KafkaHealthCheck failed',
21-
this.getStatus(key, false, { error: 'Kafka is not connected' }),
40+
this.getStatus(key, false, {
41+
state: status.state,
42+
reconnectAttempts: status.reconnectAttempts,
43+
reason: status.reason || 'Kafka is not connected',
44+
timestamp,
45+
}),
2246
);
2347
}
2448

2549
return this.getStatus(key, true, {
26-
status: 'up',
27-
timestamp: new Date().toISOString(),
28-
details: {
29-
producer: 'connected',
30-
consumers: 'active',
31-
},
50+
state: status.state,
51+
reconnectAttempts: status.reconnectAttempts,
52+
timestamp,
3253
});
3354
} catch (error: unknown) {
3455
const err = error as Error;
@@ -41,7 +62,7 @@ export class KafkaHealthIndicator extends HealthIndicator {
4162
throw new HealthCheckError(
4263
'KafkaHealthCheck failed',
4364
this.getStatus(key, false, {
44-
error: err.message,
65+
reason: err.message,
4566
timestamp: new Date().toISOString(),
4667
}),
4768
);

0 commit comments

Comments
 (0)