1
- // Copyright (c) 2024 Broadcom. All Rights Reserved.
1
+ // Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
2
2
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3
3
//
4
- // Licensed under the Apache License, Version 2.0 (the "License");
5
- // you may not use this file except in compliance with the License.
6
- // You may obtain a copy of the License at
4
+ // This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5
+ // Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6
+ // For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7
+ // please see LICENSE-APACHE2.
7
8
//
8
- // http://www.apache.org/licenses/LICENSE-2.0
9
- //
10
- // Unless required by applicable law or agreed to in writing, software
11
- // distributed under the License is distributed on an "AS IS" BASIS,
12
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
- // See the License for the specific language governing permissions and
14
- // limitations under the License.
9
+ // This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10
+ // either express or implied. See the LICENSE file for specific language governing
11
+ // rights and limitations of this software.
15
12
//
16
13
// If you have any questions regarding licensing, please contact us at
17
14
@@ -80,16 +77,16 @@ private boolean expiresSoon(Token ignores) {
80
77
}
81
78
82
79
private Token getToken () {
83
- if (LOGGER . isDebugEnabled ()) {
80
+ if (debug ()) {
84
81
LOGGER .debug (
85
82
"Requesting new token ({})..." , registrationSummary (this .registrations .values ()));
86
83
}
87
84
long start = 0L ;
88
- if (LOGGER . isDebugEnabled ()) {
85
+ if (debug ()) {
89
86
start = System .nanoTime ();
90
87
}
91
88
Token token = requester .request ();
92
- if (LOGGER . isDebugEnabled ()) {
89
+ if (debug ()) {
93
90
LOGGER .debug (
94
91
"Got new token in {} ms, token expires on {} ({})" ,
95
92
Duration .ofNanos (System .nanoTime () - start ),
@@ -128,14 +125,14 @@ private void updateRegistrations(Token t) {
128
125
registration .registrationToken = this .token ;
129
126
refreshedCount ++;
130
127
} else {
131
- if (LOGGER . isDebugEnabled ()) {
128
+ if (debug ()) {
132
129
LOGGER .debug (
133
130
"Not updating registration {} (closed or already has the new token)" ,
134
131
registration .name ());
135
132
}
136
133
}
137
134
} else {
138
- if (LOGGER . isDebugEnabled ()) {
135
+ if (debug ()) {
139
136
LOGGER .debug (
140
137
"Not updating registration {} (the token has changed)" , registration .name ());
141
138
}
@@ -160,33 +157,46 @@ private void token(Token t) {
160
157
private void scheduleTokenRefresh (Token t ) {
161
158
if (this .schedulingRefresh .compareAndSet (false , true )) {
162
159
if (this .refreshTask != null ) {
160
+ if (debug ()) {
161
+ LOGGER .debug ("Cancelling refresh task (scheduling a new one)" );
162
+ }
163
163
this .refreshTask .cancel (false );
164
164
}
165
165
Duration delay = this .refreshDelayStrategy .apply (t .expirationTime ());
166
166
if (!this .registrations .isEmpty ()) {
167
- if (LOGGER . isDebugEnabled ()) {
167
+ if (debug ()) {
168
168
LOGGER .debug (
169
- "Scheduling token retrieval in {} ({})" ,
169
+ "Scheduling token update in {} ({})" ,
170
170
delay ,
171
171
registrationSummary (this .registrations .values ()));
172
172
}
173
173
this .refreshTask =
174
174
this .scheduledExecutorService .schedule (
175
175
() -> {
176
+ if (debug ()) {
177
+ LOGGER .debug ("Starting token update task" );
178
+ }
176
179
Token previousToken = this .token ;
177
180
this .lock ();
178
181
try {
179
182
if (this .token .equals (previousToken )) {
180
183
Token newToken = getToken ();
181
184
token (newToken );
182
185
updateRegistrations (newToken );
186
+ } else {
187
+ if (debug ()) {
188
+ LOGGER .debug ("Token has already been updated" );
189
+ }
183
190
}
184
191
} finally {
185
192
unlock ();
186
193
}
187
194
},
188
195
delay .toMillis (),
189
196
TimeUnit .MILLISECONDS );
197
+ if (debug ()) {
198
+ LOGGER .debug ("Task scheduled" );
199
+ }
190
200
} else {
191
201
this .refreshTask = null ;
192
202
}
@@ -214,6 +224,9 @@ private RegistrationImpl(Long id, String name, AuthenticationCallback updateCall
214
224
215
225
@ Override
216
226
public void connect (AuthenticationCallback callback ) {
227
+ if (debug ()) {
228
+ LOGGER .debug ("Connecting registration {}" , this .name );
229
+ }
217
230
boolean shouldRefresh = false ;
218
231
Token tokenToUse ;
219
232
lock ();
@@ -235,6 +248,11 @@ public void connect(AuthenticationCallback callback) {
235
248
} finally {
236
249
unlock ();
237
250
}
251
+ if (debug ()) {
252
+ if (debug ()) {
253
+ LOGGER .debug ("Authenticating registration {}" , this .name );
254
+ }
255
+ }
238
256
callback .authenticate ("" , tokenToUse .value ());
239
257
if (shouldRefresh ) {
240
258
updateRegistrations (tokenToUse );
@@ -244,6 +262,7 @@ public void connect(AuthenticationCallback callback) {
244
262
@ Override
245
263
public void close () {
246
264
if (this .closed .compareAndSet (false , true )) {
265
+ LOGGER .debug ("Closing credentials registration {}" , this .name );
247
266
registrations .remove (this .id );
248
267
ScheduledFuture <?> task = refreshTask ;
249
268
if (registrations .isEmpty () && task != null ) {
@@ -325,4 +344,8 @@ public Duration apply(Instant expirationTime) {
325
344
private static String registrationSummary (Collection <? extends Registration > registrations ) {
326
345
return registrations .stream ().map (Registration ::toString ).collect (Collectors .joining (", " ));
327
346
}
347
+
348
+ private static boolean debug () {
349
+ return LOGGER .isDebugEnabled ();
350
+ }
328
351
}
0 commit comments