2
2
3
3
import java .io .EOFException ;
4
4
import java .io .IOException ;
5
+ import java .nio .ByteBuffer ;
6
+ import java .util .HashMap ;
5
7
import java .util .Set ;
6
8
import java .util .concurrent .ConcurrentHashMap ;
9
+ import java .util .concurrent .Executors ;
10
+ import java .util .concurrent .ScheduledExecutorService ;
11
+ import java .util .concurrent .TimeUnit ;
7
12
8
13
import org .eclipse .jetty .websocket .api .Session ;
9
14
15
+ import info .unterrainer .commons .jreutils .ShutdownHook ;
10
16
import info .unterrainer .oauthtokenmanager .OauthTokenManager ;
11
17
import io .javalin .websocket .WsBinaryMessageContext ;
12
18
import io .javalin .websocket .WsCloseContext ;
@@ -21,6 +27,37 @@ public class WsOauthHandlerBase extends WsHandlerBase {
21
27
protected OauthTokenManager tokenHandler ;
22
28
protected Set <WsConnectContext > clientsConnected = ConcurrentHashMap .newKeySet ();
23
29
protected Set <WsConnectContext > clientsQuarantined = ConcurrentHashMap .newKeySet ();
30
+ protected HashMap <Session , String > tenantIdsBySession = new HashMap <>();
31
+
32
+ protected ScheduledExecutorService hb = Executors .newSingleThreadScheduledExecutor (r -> {
33
+ Thread t = new Thread (r , "ws-heartbeat" );
34
+ t .setDaemon (true );
35
+ return t ;
36
+ });
37
+
38
+ public WsOauthHandlerBase () {
39
+ super ();
40
+ ShutdownHook .register (() -> {
41
+ hb .close ();
42
+ hb = null ;
43
+ });
44
+
45
+ hb .scheduleAtFixedRate (() -> {
46
+ for (WsConnectContext c : clientsConnected ) {
47
+ Session s = c .session ;
48
+ if (s .isOpen ()) {
49
+ try {
50
+ s .getRemote ().sendPing (ByteBuffer .allocate (1 ));
51
+ } catch (Exception e ) {
52
+ try {
53
+ s .close (1000 , "heartbeat failed" );
54
+ } catch (Exception ignore ) {
55
+ }
56
+ }
57
+ }
58
+ }
59
+ }, 30 , 30 , TimeUnit .SECONDS );
60
+ }
24
61
25
62
void setTokenHandler (OauthTokenManager tokenHandler ) {
26
63
this .tokenHandler = tokenHandler ;
@@ -64,7 +101,8 @@ public void onConnect(WsConnectContext ctx) throws Exception {
64
101
}
65
102
log .debug ("New client token: [{}]" , token );
66
103
try {
67
- tokenHandler .checkAccess (token );
104
+ String tenantId = tokenHandler .checkAccess (token );
105
+ tenantIdsBySession .put (ctx .session , tenantId );
68
106
clientsConnected .add (ctx );
69
107
} catch (Exception e ) {
70
108
log .debug ("Token validation failed for client [{}]. Disconnecting." , ctx .session .getRemoteAddress (), e );
@@ -90,7 +128,8 @@ public final void onMessage(WsMessageContext ctx) throws Exception {
90
128
return ;
91
129
}
92
130
try {
93
- tokenHandler .checkAccess (ctx .message ());
131
+ String tenantId = tokenHandler .checkAccess (ctx .message ());
132
+ tenantIdsBySession .put (ctx .session , tenantId );
94
133
WsConnectContext client = getQuarantinedClient (ctx .session );
95
134
log .debug ("Client [{}] passed token validation. Moving from quarantine to connected." ,
96
135
ctx .session .getRemoteAddress ());
0 commit comments