Skip to content

Commit 3619ebd

Browse files
committed
Support OAuth2 authentication
This commit introduces OAuth2 settings (token endpoint URI, client secret, etc) in the stream environment builder. When OAuth2 is in use, the library fetches a token from the HTTP(S) endpoint and use it to authenticate against the broker. The library also takes care of refreshing tokens: when a token is about to expire, a new one is fetched, and connections re-authenticate. Fixes #781
1 parent ea127e3 commit 3619ebd

30 files changed

+2486
-41
lines changed

ci/start-broker.sh

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
2020
chmod o+r rabbitmq-configuration/tls/*
2121
chmod g+r rabbitmq-configuration/tls/*
2222

23-
echo "[rabbitmq_stream,rabbitmq_mqtt,rabbitmq_stomp,rabbitmq_amqp1_0,rabbitmq_auth_mechanism_ssl]." >> rabbitmq-configuration/enabled_plugins
23+
echo "[rabbitmq_stream,rabbitmq_mqtt,rabbitmq_stomp,rabbitmq_amqp1_0,rabbitmq_auth_mechanism_ssl,rabbitmq_auth_backend_oauth2]." >> rabbitmq-configuration/enabled_plugins
2424

2525
echo "loopback_users = none
2626
@@ -37,8 +37,25 @@ auth_mechanisms.1 = PLAIN
3737
auth_mechanisms.2 = ANONYMOUS
3838
auth_mechanisms.3 = EXTERNAL
3939
40+
auth_backends.1 = internal
41+
auth_backends.2 = rabbit_auth_backend_oauth2
42+
4043
stream.listeners.ssl.1 = 5551" >> rabbitmq-configuration/rabbitmq.conf
4144

45+
echo "[
46+
{rabbitmq_auth_backend_oauth2, [{key_config,
47+
[{signing_keys,
48+
#{<<\"token-key\">> =>
49+
{map,
50+
#{<<\"alg\">> => <<\"HS256\">>,
51+
<<\"k\">> => <<\"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH\">>,
52+
<<\"kid\">> => <<\"token-key\">>,
53+
<<\"kty\">> => <<\"oct\">>,
54+
<<\"use\">> => <<\"sig\">>,
55+
<<\"value\">> => <<\"token-key\">>}}}}]},
56+
{resource_server_id,<<\"rabbitmq\">>}]}
57+
]." >> rabbitmq-configuration/advanced.config
58+
4259
echo "Running RabbitMQ ${RABBITMQ_IMAGE}"
4360

4461
docker rm -f rabbitmq 2>/dev/null || echo "rabbitmq was not running"

pom.xml

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
<proton-j.version>0.34.1</proton-j.version>
5555
<metrics.version>4.2.33</metrics.version>
5656
<micrometer.version>1.15.3</micrometer.version>
57+
<gson.version>2.13.1</gson.version>
5758
<swiftmq-client.version>13.1.2</swiftmq-client.version>
5859
<picocli.version>4.7.5</picocli.version>
5960
<commons-compress.version>1.28.0</commons-compress.version>
@@ -66,10 +67,10 @@
6667
<amqp-client.version>5.26.0</amqp-client.version>
6768
<commons-lang3.version>3.18.0</commons-lang3.version>
6869
<commons-codec.version>1.19.0</commons-codec.version>
69-
<gson.version>2.13.1</gson.version>
7070
<vavr.version>0.10.7</vavr.version>
7171
<paho.version>1.2.5</paho.version>
7272
<micrometer-tracing-test.version>1.5.3</micrometer-tracing-test.version>
73+
<bouncycastle.version>1.81</bouncycastle.version>
7374
<micrometer-docs-generator.version>1.0.4</micrometer-docs-generator.version>
7475
<netty-tcnative.version>2.0.72.Final</netty-tcnative.version>
7576
<maven.compiler.plugin.version>3.14.0</maven.compiler.plugin.version>
@@ -105,6 +106,7 @@
105106
<central-publishing-maven-plugin.version>0.8.0</central-publishing-maven-plugin.version>
106107
<maven.javadoc.skip>true</maven.javadoc.skip>
107108
<gpg.skip>true</gpg.skip>
109+
<jose4j.version>0.9.6</jose4j.version>
108110
</properties>
109111

110112
<dependencies>
@@ -185,6 +187,13 @@
185187
<optional>true</optional>
186188
</dependency>
187189

190+
<dependency>
191+
<groupId>com.google.code.gson</groupId>
192+
<artifactId>gson</artifactId>
193+
<version>${gson.version}</version>
194+
<optional>true</optional>
195+
</dependency>
196+
188197
<dependency>
189198
<groupId>org.junit.jupiter</groupId>
190199
<artifactId>junit-jupiter-engine</artifactId>
@@ -285,13 +294,6 @@
285294
<scope>test</scope>
286295
</dependency>
287296

288-
<dependency>
289-
<groupId>com.google.code.gson</groupId>
290-
<artifactId>gson</artifactId>
291-
<version>${gson.version}</version>
292-
<scope>test</scope>
293-
</dependency>
294-
295297
<dependency>
296298
<groupId>io.vavr</groupId>
297299
<artifactId>vavr</artifactId>
@@ -313,6 +315,20 @@
313315
<scope>test</scope>
314316
</dependency>
315317

318+
<dependency>
319+
<groupId>org.bouncycastle</groupId>
320+
<artifactId>bcpkix-jdk18on</artifactId>
321+
<version>${bouncycastle.version}</version>
322+
<scope>test</scope>
323+
</dependency>
324+
325+
<dependency>
326+
<groupId>org.bitbucket.b_c</groupId>
327+
<artifactId>jose4j</artifactId>
328+
<version>${jose4j.version}</version>
329+
<scope>test</scope>
330+
</dependency>
331+
316332
<dependency>
317333
<groupId>org.openjdk.jmh</groupId>
318334
<artifactId>jmh-core</artifactId>

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Map;
3232
import java.util.concurrent.ScheduledExecutorService;
3333
import java.util.function.Consumer;
34+
import javax.net.ssl.SSLContext;
3435

3536
/**
3637
* API to configure and create an {@link Environment}.
@@ -517,4 +518,90 @@ interface NettyConfiguration {
517518
*/
518519
EnvironmentBuilder environmentBuilder();
519520
}
521+
522+
/**
523+
* OAuth 2 settings.
524+
*
525+
* @return OAuth 2 settings
526+
* @see OAuth2Configuration
527+
* @since 1.3.0
528+
*/
529+
OAuth2Configuration oauth2();
530+
531+
/**
532+
* Configuration to retrieve a token using the <a
533+
* href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2 Client Credentials flow</a>.
534+
*
535+
* @since 1.3.0
536+
*/
537+
interface OAuth2Configuration {
538+
539+
/**
540+
* Set the URI to access to get the token.
541+
*
542+
* <p>TLS is supported by providing a <code>HTTPS</code> URI and setting a {@link
543+
* javax.net.ssl.SSLContext}. See {@link #tls()} for more information. <em>Applications in
544+
* production should always use HTTPS to retrieve tokens.</em>
545+
*
546+
* @param uri access URI
547+
* @return OAuth 2 configuration
548+
* @see #sslContext(javax.net.ssl.SSLContext)
549+
*/
550+
OAuth2Configuration tokenEndpointUri(String uri);
551+
552+
/**
553+
* Set the OAuth 2 client ID
554+
*
555+
* <p>The client ID usually identifies the application that requests a token.
556+
*
557+
* @param clientId client ID
558+
* @return OAuth 2 configuration
559+
*/
560+
OAuth2Configuration clientId(String clientId);
561+
562+
/**
563+
* Set the secret (password) to use to get a token.
564+
*
565+
* @param clientSecret client secret
566+
* @return OAuth 2 configuration
567+
*/
568+
OAuth2Configuration clientSecret(String clientSecret);
569+
570+
/**
571+
* Set the grant type to use when requesting the token.
572+
*
573+
* <p>The default is <code>client_credentials</code>, but some OAuth 2 servers can use
574+
* non-standard grant types to request tokens with extra-information.
575+
*
576+
* @param grantType grant type
577+
* @return OAuth 2 configuration
578+
*/
579+
OAuth2Configuration grantType(String grantType);
580+
581+
/**
582+
* Set a parameter to pass in the request.
583+
*
584+
* <p>The OAuth 2 server may require extra parameters to narrow down the identity of the user.
585+
*
586+
* @param name name of the parameter
587+
* @param value value of the parameter
588+
* @return OAuth 2 configuration
589+
*/
590+
OAuth2Configuration parameter(String name, String value);
591+
592+
/**
593+
* {@link javax.net.ssl.SSLContext} for HTTPS requests.
594+
*
595+
* @param sslContext the SSL context
596+
* @return OAuth 2 configuration
597+
*/
598+
OAuth2Configuration sslContext(SSLContext sslContext);
599+
600+
/**
601+
* Go back to the environment builder
602+
*
603+
* @return the environment builder
604+
*/
605+
EnvironmentBuilder environmentBuilder();
606+
}
520607
}

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
4949
import com.rabbitmq.stream.metrics.MetricsCollector;
5050
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
51+
import com.rabbitmq.stream.oauth2.CredentialsManager;
52+
import com.rabbitmq.stream.oauth2.CredentialsManager.Registration;
5153
import com.rabbitmq.stream.sasl.CredentialsProvider;
5254
import com.rabbitmq.stream.sasl.DefaultSaslConfiguration;
5355
import com.rabbitmq.stream.sasl.DefaultUsernamePasswordCredentialsProvider;
@@ -115,6 +117,7 @@
115117
*/
116118
public class Client implements AutoCloseable {
117119

120+
private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
118121
private static final Charset CHARSET = StandardCharsets.UTF_8;
119122
public static final int DEFAULT_PORT = 5552;
120123
public static final int DEFAULT_TLS_PORT = 5551;
@@ -170,7 +173,6 @@ public long applyAsLong(Object value) {
170173
};
171174
private final AtomicInteger correlationSequence = new AtomicInteger(0);
172175
private final SaslConfiguration saslConfiguration;
173-
private final CredentialsProvider credentialsProvider;
174176
private final Runnable nettyClosing;
175177
private final int maxFrameSize;
176178
private final boolean frameSizeCapped;
@@ -190,6 +192,7 @@ public long applyAsLong(Object value) {
190192
private final Runnable streamStatsCommandVersionsCheck;
191193
private final boolean filteringSupported;
192194
private final Runnable superStreamManagementCommandVersionsCheck;
195+
private final Registration credentialsRegistration;
193196

194197
@SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
195198
public Client() {
@@ -206,7 +209,6 @@ public Client(ClientParameters parameters) {
206209
this.creditNotification = parameters.creditNotification;
207210
this.codec = parameters.codec == null ? Codecs.DEFAULT : parameters.codec;
208211
this.saslConfiguration = parameters.saslConfiguration;
209-
this.credentialsProvider = parameters.credentialsProvider;
210212
this.chunkChecksum = parameters.chunkChecksum;
211213
this.metricsCollector = parameters.metricsCollector;
212214
this.metadataListener = parameters.metadataListener;
@@ -381,8 +383,36 @@ public void initChannel(SocketChannel ch) {
381383
debug(() -> "starting SASL handshake");
382384
this.saslMechanisms = getSaslMechanisms();
383385
debug(() -> "SASL mechanisms supported by server ({})", this.saslMechanisms);
386+
387+
CredentialsProvider credentialsProvider = parameters.credentialsProvider;
388+
CredentialsManager credentialsManager = parameters.credentialsManager;
389+
CredentialsManager.AuthenticationCallback authCallback, renewCallback;
390+
String regName =
391+
clientConnectionName.isBlank()
392+
? String.valueOf(ID_SEQUENCE.getAndIncrement())
393+
: clientConnectionName + "-" + ID_SEQUENCE.getAndIncrement();
394+
if (credentialsManager == null) {
395+
this.credentialsRegistration = CredentialsManagerFactory.get();
396+
authCallback = (u, p) -> this.authenticate(credentialsProvider);
397+
} else {
398+
renewCallback =
399+
authCallback =
400+
(u, p) -> {
401+
if (u == null && p == null) {
402+
// no username/password provided by the credentials manager
403+
// using the credentials manager
404+
this.authenticate(credentialsProvider);
405+
} else {
406+
// the credentials manager provides username/password (e.g. after token
407+
// retrieval)
408+
// we use them with a one-time credentials provider
409+
this.authenticate(new DefaultUsernamePasswordCredentialsProvider(u, p));
410+
}
411+
};
412+
this.credentialsRegistration = credentialsManager.register(regName, renewCallback);
413+
}
384414
debug(() -> "starting authentication");
385-
authenticate(this.credentialsProvider);
415+
this.credentialsRegistration.connect(authCallback);
386416
debug(() -> "authenticated");
387417
this.tuneState.await(Duration.ofSeconds(10));
388418
this.maxFrameSize = this.tuneState.getMaxFrameSize();
@@ -1454,6 +1484,9 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
14541484
if (reason != null) {
14551485
this.shutdownListenerCallback.accept(reason);
14561486
}
1487+
if (this.credentialsRegistration != null) {
1488+
this.credentialsRegistration.close();
1489+
}
14571490
this.nettyClosing.run();
14581491
this.failOutstandingRequests();
14591492
if (this.closeDispatchingExecutorService != null) {
@@ -2378,6 +2411,10 @@ String label() {
23782411
}
23792412
}
23802413

2414+
static ClientParameters cp() {
2415+
return new ClientParameters();
2416+
}
2417+
23812418
public static class ClientParameters {
23822419

23832420
private final Map<String, String> clientProperties = new ConcurrentHashMap<>();
@@ -2410,6 +2447,7 @@ public static class ClientParameters {
24102447
private SaslConfiguration saslConfiguration = DefaultSaslConfiguration.PLAIN;
24112448
private CredentialsProvider credentialsProvider =
24122449
new DefaultUsernamePasswordCredentialsProvider(DEFAULT_USERNAME, "guest");
2450+
private CredentialsManager credentialsManager;
24132451
private ChunkChecksum chunkChecksum = JdkChunkChecksum.CRC32_SINGLETON;
24142452
private MetricsCollector metricsCollector = NoOpMetricsCollector.SINGLETON;
24152453
private SslContext sslContext;
@@ -2492,6 +2530,11 @@ public ClientParameters credentialsProvider(CredentialsProvider credentialsProvi
24922530
return this;
24932531
}
24942532

2533+
public ClientParameters credentialsManager(CredentialsManager credentialsManager) {
2534+
this.credentialsManager = credentialsManager;
2535+
return this;
2536+
}
2537+
24952538
public ClientParameters username(String username) {
24962539
if (this.credentialsProvider instanceof UsernamePasswordCredentialsProvider) {
24972540
this.credentialsProvider =

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,18 @@ SubscriptionState state() {
506506
return this.state.get();
507507
}
508508

509+
private void markConsuming() {
510+
if (this.consumer != null) {
511+
this.consumer.consuming();
512+
}
513+
}
514+
515+
private void markNotConsuming() {
516+
if (this.consumer != null) {
517+
this.consumer.notConsuming();
518+
}
519+
}
520+
509521
String label() {
510522
return String.format(
511523
"[id=%d, stream=%s, name=%s, consumer=%d]",
@@ -700,6 +712,7 @@ private ClientSubscriptionsManager(
700712
"Subscription connection has {} consumer(s) over {} stream(s) to recover",
701713
this.subscriptionTrackers.stream().filter(Objects::nonNull).count(),
702714
this.streamToStreamSubscriptions.size());
715+
iterate(this.subscriptionTrackers, SubscriptionTracker::markNotConsuming);
703716
environment
704717
.scheduledExecutorService()
705718
.execute(
@@ -774,6 +787,7 @@ private ClientSubscriptionsManager(
774787
}
775788

776789
if (affectedSubscriptions != null && !affectedSubscriptions.isEmpty()) {
790+
iterate(affectedSubscriptions, SubscriptionTracker::markNotConsuming);
777791
environment
778792
.scheduledExecutorService()
779793
.execute(
@@ -1132,6 +1146,7 @@ void add(
11321146
throw e;
11331147
}
11341148
subscriptionTracker.state(SubscriptionState.ACTIVE);
1149+
subscriptionTracker.markConsuming();
11351150
LOGGER.debug("Subscribed to '{}'", subscriptionTracker.stream);
11361151
} finally {
11371152
this.subscriptionManagerLock.unlock();
@@ -1397,4 +1412,13 @@ static Broker pickBroker(
13971412
Function<List<Broker>, Broker> picker, Collection<BrokerWrapper> candidates) {
13981413
return picker.apply(keepReplicasIfPossible(candidates));
13991414
}
1415+
1416+
private static void iterate(
1417+
Collection<SubscriptionTracker> l, java.util.function.Consumer<SubscriptionTracker> c) {
1418+
for (SubscriptionTracker tracker : l) {
1419+
if (tracker != null) {
1420+
c.accept(tracker);
1421+
}
1422+
}
1423+
}
14001424
}

0 commit comments

Comments
 (0)