Skip to content

Commit d45912a

Browse files
committed
Merge pull request #43 from mustafaiman/heartbeat
heartbeat service
2 parents 4ee55b1 + 0795fa0 commit d45912a

File tree

10 files changed

+260
-33
lines changed

10 files changed

+260
-33
lines changed

src/Config.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ export class ListenerConfig {
4141

4242
export class ClientConfig {
4343
instanceName: string;
44-
properties = {};
44+
properties: any = {
45+
'hazelcast.client.heartbeat.interval': 5000,
46+
'hazelcast.client.heartbeat.timeout': 60000
47+
};
4548
groupConfig: GroupConfig = new GroupConfig();
4649
networkConfig: ClientNetworkConfig = new ClientNetworkConfig();
4750
listenerConfigs: ListenerConfig[];

src/ConnectionHeartbeatListener.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import ClientConnection = require('./invocation/ClientConnection');
2+
export interface ConnectionHeartbeatListener {
3+
onHeartbeatRestored?: (connection?: ClientConnection) => void;
4+
onHeartbeatStopped?: (connection?: ClientConnection) => void;
5+
}

src/ConnectionListener.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import ClientConnection = require('./invocation/ClientConnection');
2+
export interface ConnectionListener {
3+
onConnectionOpened?: (connection: ClientConnection) => void;
4+
onConnectionClosed?: (connection: ClientConnection) => void;
5+
}

src/HazelcastClient.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {IMap} from './IMap';
88
import {JsonSerializationService} from './serialization/SerializationService';
99
import PartitionService = require('./PartitionService');
1010
import ClusterService = require('./invocation/ClusterService');
11+
import Heartbeat = require('./Heartbeat');
1112

1213
class HazelcastClient {
1314

@@ -18,6 +19,7 @@ class HazelcastClient {
1819
private partitionService: PartitionService;
1920
private clusterService: ClusterService;
2021
private proxyManager: ProxyManager;
22+
private heartbeat: Heartbeat;
2123

2224
public static newHazelcastClient(config?: ClientConfig): Q.Promise<HazelcastClient> {
2325
var client: HazelcastClient = new HazelcastClient(config);
@@ -35,16 +37,22 @@ class HazelcastClient {
3537
this.partitionService = new PartitionService(this);
3638
this.connectionManager = new ClientConnectionManager(this);
3739
this.clusterService = new ClusterService(this);
40+
this.heartbeat = new Heartbeat(this);
3841
}
3942

4043
private init(): Q.Promise<HazelcastClient> {
4144
var deferred = Q.defer<HazelcastClient>();
4245

43-
this.clusterService.start().then(() => {
44-
return this.partitionService.initialize();
45-
}).then(() => {
46-
deferred.resolve(this);
47-
}).catch((e) => {
46+
this.clusterService.start()
47+
.then(() => {
48+
return this.partitionService.initialize();
49+
})
50+
.then(() => {
51+
return this.heartbeat.start();
52+
})
53+
.then(() => {
54+
deferred.resolve(this);
55+
}).catch((e) => {
4856
deferred.reject(e);
4957
});
5058

@@ -82,6 +90,10 @@ class HazelcastClient {
8290
public getClusterService(): ClusterService {
8391
return this.clusterService;
8492
}
93+
94+
getHeartbeat(): Heartbeat {
95+
return this.heartbeat;
96+
}
8597
}
8698

8799
export = HazelcastClient;

src/Heartbeat.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import {ClientPingCodec} from './codec/ClientPingCodec';
2+
import HazelcastClient = require('./HazelcastClient');
3+
import ClientConnection = require('./invocation/ClientConnection');
4+
import {ConnectionHeartbeatListener} from './ConnectionHeartbeatListener';
5+
import Q = require('q');
6+
7+
var PROPERTY_HEARTBEAT_INTERVAL: string = 'hazelcast.client.heartbeat.interval';
8+
var PROPERTY_HEARTBEAT_TIMEOUT: string = 'hazelcast.client.heartbeat.timeout';
9+
10+
class Heartbeat {
11+
12+
private client: HazelcastClient;
13+
private heartbeatTimeout: number;
14+
private heartbeatInterval: number;
15+
private listeners: ConnectionHeartbeatListener[] = [];
16+
17+
//Actually it is a NodeJS.Timer. Another typing file that comes with a module we use causes TSD to see
18+
//return type of setTimeout as number. Because of this we defined timer property as `any` type.
19+
private timer: any;
20+
21+
constructor(client: HazelcastClient) {
22+
this.client = client;
23+
this.heartbeatInterval = this.client.getConfig().properties[PROPERTY_HEARTBEAT_INTERVAL];
24+
this.heartbeatTimeout = this.client.getConfig().properties[PROPERTY_HEARTBEAT_TIMEOUT];
25+
}
26+
27+
start() {
28+
this.timer = setTimeout(this.heartbeatFunction.bind(this), this.heartbeatInterval);
29+
}
30+
31+
cancel() {
32+
clearTimeout(this.timer);
33+
}
34+
35+
addListener(heartbeatListener: ConnectionHeartbeatListener) {
36+
this.listeners.push(heartbeatListener);
37+
}
38+
39+
private heartbeatFunction() {
40+
var estConnections = this.client.getConnectionManager().establishedConnections;
41+
for (var address in estConnections) {
42+
if ( estConnections.hasOwnProperty(address)) {
43+
var conn = estConnections[address];
44+
var timeSinceLastRead = new Date().getTime() - conn.lastRead;
45+
if (timeSinceLastRead > this.heartbeatTimeout) {
46+
if (conn.heartbeating) {
47+
setImmediate(this.onHeartbeatStopped.bind(this), conn);
48+
}
49+
}
50+
if (timeSinceLastRead > this.heartbeatInterval) {
51+
var req = ClientPingCodec.encodeRequest();
52+
this.client.getInvocationService().invokeOnConnection(conn, req);
53+
} else {
54+
if (!conn.heartbeating) {
55+
setImmediate(this.onHeartbeatRestored.bind(this), conn);
56+
}
57+
}
58+
}
59+
}
60+
this.timer = setTimeout(this.heartbeatFunction.bind(this), this.heartbeatInterval);
61+
}
62+
63+
private onHeartbeatStopped(connection: ClientConnection) {
64+
connection.heartbeating = false;
65+
console.log('heartbeat stopped on ' + connection.address);
66+
this.listeners.forEach((listener) => {
67+
if (listener.hasOwnProperty('onHeartbeatStopped')) {
68+
setImmediate(listener.onHeartbeatStopped.bind(this), connection);
69+
}
70+
});
71+
}
72+
73+
private onHeartbeatRestored(connection: ClientConnection) {
74+
connection.heartbeating = true;
75+
console.log('heartbeat restored on ' + connection.address);
76+
this.listeners.forEach((listener) => {
77+
if (listener.hasOwnProperty('onHeartbeatRestored')) {
78+
setImmediate(listener.onHeartbeatRestored.bind(this), connection);
79+
}
80+
});
81+
}
82+
83+
}
84+
85+
export = Heartbeat;

src/codec/ClientPingCodec.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/* tslint:disable */
2+
import ClientMessage = require('../ClientMessage');
3+
var REQUEST_TYPE = 0x000f;
4+
var RESPONSE_TYPE = 100;
5+
var RETRYABLE = true;
6+
7+
8+
export class ClientPingCodec{
9+
static calculateSize(){
10+
return 0;
11+
}
12+
13+
static encodeRequest() {
14+
15+
var clientMessage = ClientMessage.newClientMessage(this.calculateSize());
16+
clientMessage.setMessageType(REQUEST_TYPE);
17+
clientMessage.setRetryable(RETRYABLE)
18+
clientMessage.updateFrameLength();
19+
return clientMessage;
20+
}
21+
22+
static decodeResponse(clientMessage: ClientMessage) {
23+
return;
24+
}
25+
}

src/invocation/ClientConnection.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,27 @@ import Address = require('../Address');
44
import {BitsUtil} from '../BitsUtil';
55

66
class ClientConnection {
7-
public address: Address;
8-
public socket: net.Socket;
7+
address: Address;
8+
socket: net.Socket;
9+
lastRead: number;
10+
heartbeating = true;
11+
912
private readBuffer: Buffer;
1013

1114
constructor(address: Address) {
1215
this.address = address;
1316
this.readBuffer = new Buffer(0);
17+
this.lastRead = 0;
1418
}
1519

16-
public getAddress(): Address {
20+
getAddress(): Address {
1721
return this.address;
1822
}
1923

2024
connect(): Q.Promise<ClientConnection> {
2125
var ready = Q.defer<ClientConnection>();
2226

2327
this.socket = net.connect(this.address.port, this.address.host, () => {
24-
console.log('Connection established to ' + this.address );
2528

2629
// Send the protocol version
2730
var buffer = new Buffer(3);
@@ -42,8 +45,13 @@ class ClientConnection {
4245
this.socket.write(buffer);
4346
}
4447

48+
close() {
49+
this.socket.destroy();
50+
}
51+
4552
registerResponseCallback(callback: Function) {
4653
this.socket.on('data', (buffer: Buffer) => {
54+
this.lastRead = new Date().getTime();
4755
this.readBuffer = Buffer.concat([this.readBuffer, buffer], this.readBuffer.length + buffer.length);
4856
while (this.readBuffer.length >= BitsUtil.INT_SIZE_IN_BYTES ) {
4957
var frameSize = this.readBuffer.readInt32LE(0);

src/invocation/ClientConnectionManager.ts

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,20 @@ import {GroupConfig, ClientNetworkConfig} from '../Config';
99

1010
import ConnectionAuthenticator = require('./ConnectionAuthenticator');
1111
import HazelcastClient = require('../HazelcastClient');
12+
import {ConnectionListener} from '../ConnectionListener';
1213

1314
class ClientConnectionManager {
1415

1516
private client: HazelcastClient;
16-
17+
private listeners: ConnectionListener[] = [];
1718
private pendingConnections: {[address: string]: Q.Deferred<ClientConnection>} = {};
18-
private establishedConnections: {[address: string]: ClientConnection} = {};
19+
establishedConnections: {[address: string]: ClientConnection} = {};
1920

2021
constructor(client: HazelcastClient) {
2122
this.client = client;
2223
}
2324

24-
public getOrConnect(address: Address): Q.Promise<ClientConnection> {
25+
getOrConnect(address: Address): Q.Promise<ClientConnection> {
2526
var addressIndex = address.toString();
2627
var result: Q.Deferred<ClientConnection> = Q.defer<ClientConnection>();
2728

@@ -43,21 +44,23 @@ class ClientConnectionManager {
4344
var clientConnection = new ClientConnection(address);
4445

4546
clientConnection.connect().then((connection: ClientConnection) => {
46-
4747
connection.registerResponseCallback((data: Buffer) => {
4848
this.client.getInvocationService().processResponse(data);
4949
});
5050

5151
var callback = (authenticated: boolean) => {
5252
if (authenticated) {
5353
result.resolve(connection);
54-
this.establishedConnections[addressIndex] = connection;
54+
this.establishedConnections[connection.address.toString()] = connection;
5555
} else {
5656
result.reject(new Error('Authentication failed'));
5757
}
5858
};
59-
60-
this.authenticate(connection).then(callback).finally(() => {
59+
this.authenticate(connection).then(callback).then(() => {
60+
this.onConnectionOpened(connection);
61+
}).catch((e: any) => {
62+
result.reject(e);
63+
}).finally(() => {
6164
delete this.pendingConnections[addressIndex];
6265
});
6366
}).catch((e: any) => {
@@ -67,6 +70,40 @@ class ClientConnectionManager {
6770
return result.promise;
6871
}
6972

73+
destroyConnection(address: Address): void {
74+
var addressStr = address.toString();
75+
if (this.pendingConnections.hasOwnProperty(addressStr)) {
76+
this.pendingConnections[addressStr].reject(null);
77+
}
78+
if (this.establishedConnections.hasOwnProperty(addressStr)) {
79+
var conn = this.establishedConnections[addressStr];
80+
conn.close();
81+
this.onConnectionClosed(conn);
82+
delete this.establishedConnections[addressStr];
83+
}
84+
}
85+
86+
addListener(listener: ConnectionListener) {
87+
this.listeners.push(listener);
88+
}
89+
90+
private onConnectionClosed(connection: ClientConnection) {
91+
this.listeners.forEach((listener) => {
92+
if (listener.hasOwnProperty('onConnectionClosed')) {
93+
setImmediate(listener.onConnectionClosed.bind(this), connection);
94+
}
95+
});
96+
}
97+
98+
private onConnectionOpened(connection: ClientConnection) {
99+
console.log('Authenticated to ' + connection.address);
100+
this.listeners.forEach((listener) => {
101+
if (listener.hasOwnProperty('onConnectionOpened')) {
102+
setImmediate(listener.onConnectionOpened.bind(this), connection);
103+
}
104+
});
105+
}
106+
70107
private authenticate(connection: ClientConnection): Q.Promise<boolean> {
71108
var name = this.client.getConfig().groupConfig.name;
72109
var password = this.client.getConfig().groupConfig.password;

0 commit comments

Comments
 (0)