Skip to content

Commit 80fa1fa

Browse files
authored
Merge pull request #809 from rabbitmq/oauth2
Support OAuth2 authentication
2 parents ea127e3 + 83bf238 commit 80fa1fa

32 files changed

+2529
-42
lines changed

ci/start-broker.sh

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
2020
chmod o+r rabbitmq-configuration/tls/*
2121
chmod g+r rabbitmq-configuration/tls/*
2222

23-
echo "[rabbitmq_stream,rabbitmq_mqtt,rabbitmq_stomp,rabbitmq_amqp1_0,rabbitmq_auth_mechanism_ssl]." >> rabbitmq-configuration/enabled_plugins
23+
echo "[rabbitmq_stream,rabbitmq_mqtt,rabbitmq_stomp,rabbitmq_amqp1_0,rabbitmq_auth_mechanism_ssl,rabbitmq_auth_backend_oauth2]." >> rabbitmq-configuration/enabled_plugins
2424

2525
echo "loopback_users = none
2626
@@ -37,8 +37,25 @@ auth_mechanisms.1 = PLAIN
3737
auth_mechanisms.2 = ANONYMOUS
3838
auth_mechanisms.3 = EXTERNAL
3939
40+
auth_backends.1 = internal
41+
auth_backends.2 = rabbit_auth_backend_oauth2
42+
4043
stream.listeners.ssl.1 = 5551" >> rabbitmq-configuration/rabbitmq.conf
4144

45+
echo "[
46+
{rabbitmq_auth_backend_oauth2, [{key_config,
47+
[{signing_keys,
48+
#{<<\"token-key\">> =>
49+
{map,
50+
#{<<\"alg\">> => <<\"HS256\">>,
51+
<<\"k\">> => <<\"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH\">>,
52+
<<\"kid\">> => <<\"token-key\">>,
53+
<<\"kty\">> => <<\"oct\">>,
54+
<<\"use\">> => <<\"sig\">>,
55+
<<\"value\">> => <<\"token-key\">>}}}}]},
56+
{resource_server_id,<<\"rabbitmq\">>}]}
57+
]." >> rabbitmq-configuration/advanced.config
58+
4259
echo "Running RabbitMQ ${RABBITMQ_IMAGE}"
4360

4461
docker rm -f rabbitmq 2>/dev/null || echo "rabbitmq was not running"

pom.xml

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
<proton-j.version>0.34.1</proton-j.version>
5555
<metrics.version>4.2.33</metrics.version>
5656
<micrometer.version>1.15.3</micrometer.version>
57+
<gson.version>2.13.1</gson.version>
5758
<swiftmq-client.version>13.1.2</swiftmq-client.version>
5859
<picocli.version>4.7.5</picocli.version>
5960
<commons-compress.version>1.28.0</commons-compress.version>
@@ -66,10 +67,10 @@
6667
<amqp-client.version>5.26.0</amqp-client.version>
6768
<commons-lang3.version>3.18.0</commons-lang3.version>
6869
<commons-codec.version>1.19.0</commons-codec.version>
69-
<gson.version>2.13.1</gson.version>
7070
<vavr.version>0.10.7</vavr.version>
7171
<paho.version>1.2.5</paho.version>
7272
<micrometer-tracing-test.version>1.5.3</micrometer-tracing-test.version>
73+
<bouncycastle.version>1.81</bouncycastle.version>
7374
<micrometer-docs-generator.version>1.0.4</micrometer-docs-generator.version>
7475
<netty-tcnative.version>2.0.72.Final</netty-tcnative.version>
7576
<maven.compiler.plugin.version>3.14.0</maven.compiler.plugin.version>
@@ -105,6 +106,7 @@
105106
<central-publishing-maven-plugin.version>0.8.0</central-publishing-maven-plugin.version>
106107
<maven.javadoc.skip>true</maven.javadoc.skip>
107108
<gpg.skip>true</gpg.skip>
109+
<jose4j.version>0.9.6</jose4j.version>
108110
</properties>
109111

110112
<dependencies>
@@ -185,6 +187,13 @@
185187
<optional>true</optional>
186188
</dependency>
187189

190+
<dependency>
191+
<groupId>com.google.code.gson</groupId>
192+
<artifactId>gson</artifactId>
193+
<version>${gson.version}</version>
194+
<optional>true</optional>
195+
</dependency>
196+
188197
<dependency>
189198
<groupId>org.junit.jupiter</groupId>
190199
<artifactId>junit-jupiter-engine</artifactId>
@@ -285,13 +294,6 @@
285294
<scope>test</scope>
286295
</dependency>
287296

288-
<dependency>
289-
<groupId>com.google.code.gson</groupId>
290-
<artifactId>gson</artifactId>
291-
<version>${gson.version}</version>
292-
<scope>test</scope>
293-
</dependency>
294-
295297
<dependency>
296298
<groupId>io.vavr</groupId>
297299
<artifactId>vavr</artifactId>
@@ -313,6 +315,20 @@
313315
<scope>test</scope>
314316
</dependency>
315317

318+
<dependency>
319+
<groupId>org.bouncycastle</groupId>
320+
<artifactId>bcpkix-jdk18on</artifactId>
321+
<version>${bouncycastle.version}</version>
322+
<scope>test</scope>
323+
</dependency>
324+
325+
<dependency>
326+
<groupId>org.bitbucket.b_c</groupId>
327+
<artifactId>jose4j</artifactId>
328+
<version>${jose4j.version}</version>
329+
<scope>test</scope>
330+
</dependency>
331+
316332
<dependency>
317333
<groupId>org.openjdk.jmh</groupId>
318334
<artifactId>jmh-core</artifactId>

src/docs/asciidoc/advanced-topics.adoc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,31 @@ A defined set of values shared across the messages is a good candidate: geograph
8686
Cardinality of filter values can be from a few to a few thousands.
8787
Extreme cardinality (a couple or dozens of thousands) can make filtering less efficient.
8888

89+
=== OAuth 2 Support
90+
91+
The client can authenticate against an OAuth 2 server like https://github.com/cloudfoundry/uaa[UAA].
92+
It uses the https://tools.ietf.org/html/rfc6749#section-4.4[OAuth 2 Client Credentials flow].
93+
The https://www.rabbitmq.com/docs/oauth2[OAuth 2 plugin] must be enabled on the server side and configured to use the same OAuth 2 server as the client.
94+
95+
How to retrieve the OAuth 2 token is configured at the environment level:
96+
97+
.Configuring OAuth 2 token retrieval
98+
[source,java,indent=0]
99+
--------
100+
include::{test-examples}/EnvironmentUsage.java[tag=oauth2]
101+
--------
102+
<1> Access the OAuth 2 configuration
103+
<2> Set the token endpoint URI
104+
<3> Authenticate the client application
105+
<4> Set the grant type
106+
<5> Set optional parameters (depends on the OAuth 2 server)
107+
<6> Set the SSL context (e.g. to verify and trust the identity of the OAuth 2 server)
108+
109+
The environment retrieves tokens and uses them to create stream connections.
110+
It also takes care of refreshing the tokens before they expire and of re-authenticating existing connections so the broker does not close them when their token expires.
111+
112+
The environment uses the same token for all the connections it maintains.
113+
89114
=== Using Native `epoll`
90115

91116
The stream Java client uses the https://netty.io/[Netty] network framework and its Java NIO transport implementation by default.

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Map;
3232
import java.util.concurrent.ScheduledExecutorService;
3333
import java.util.function.Consumer;
34+
import javax.net.ssl.SSLContext;
3435

3536
/**
3637
* API to configure and create an {@link Environment}.
@@ -517,4 +518,90 @@ interface NettyConfiguration {
517518
*/
518519
EnvironmentBuilder environmentBuilder();
519520
}
521+
522+
/**
523+
* OAuth 2 settings.
524+
*
525+
* @return OAuth 2 settings
526+
* @see OAuth2Configuration
527+
* @since 1.3.0
528+
*/
529+
OAuth2Configuration oauth2();
530+
531+
/**
532+
* Configuration to retrieve a token using the <a
533+
* href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2 Client Credentials flow</a>.
534+
*
535+
* @since 1.3.0
536+
*/
537+
interface OAuth2Configuration {
538+
539+
/**
540+
* Set the URI to access to get the token.
541+
*
542+
* <p>TLS is supported by providing a <code>HTTPS</code> URI and setting a {@link
543+
* javax.net.ssl.SSLContext}. See {@link #tls()} for more information. <em>Applications in
544+
* production should always use HTTPS to retrieve tokens.</em>
545+
*
546+
* @param uri access URI
547+
* @return OAuth 2 configuration
548+
* @see #sslContext(javax.net.ssl.SSLContext)
549+
*/
550+
OAuth2Configuration tokenEndpointUri(String uri);
551+
552+
/**
553+
* Set the OAuth 2 client ID
554+
*
555+
* <p>The client ID usually identifies the application that requests a token.
556+
*
557+
* @param clientId client ID
558+
* @return OAuth 2 configuration
559+
*/
560+
OAuth2Configuration clientId(String clientId);
561+
562+
/**
563+
* Set the secret (password) to use to get a token.
564+
*
565+
* @param clientSecret client secret
566+
* @return OAuth 2 configuration
567+
*/
568+
OAuth2Configuration clientSecret(String clientSecret);
569+
570+
/**
571+
* Set the grant type to use when requesting the token.
572+
*
573+
* <p>The default is <code>client_credentials</code>, but some OAuth 2 servers can use
574+
* non-standard grant types to request tokens with extra-information.
575+
*
576+
* @param grantType grant type
577+
* @return OAuth 2 configuration
578+
*/
579+
OAuth2Configuration grantType(String grantType);
580+
581+
/**
582+
* Set a parameter to pass in the request.
583+
*
584+
* <p>The OAuth 2 server may require extra parameters to narrow down the identity of the user.
585+
*
586+
* @param name name of the parameter
587+
* @param value value of the parameter
588+
* @return OAuth 2 configuration
589+
*/
590+
OAuth2Configuration parameter(String name, String value);
591+
592+
/**
593+
* {@link javax.net.ssl.SSLContext} for HTTPS requests.
594+
*
595+
* @param sslContext the SSL context
596+
* @return OAuth 2 configuration
597+
*/
598+
OAuth2Configuration sslContext(SSLContext sslContext);
599+
600+
/**
601+
* Go back to the environment builder
602+
*
603+
* @return the environment builder
604+
*/
605+
EnvironmentBuilder environmentBuilder();
606+
}
520607
}

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
4949
import com.rabbitmq.stream.metrics.MetricsCollector;
5050
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
51+
import com.rabbitmq.stream.oauth2.CredentialsManager;
52+
import com.rabbitmq.stream.oauth2.CredentialsManager.Registration;
5153
import com.rabbitmq.stream.sasl.CredentialsProvider;
5254
import com.rabbitmq.stream.sasl.DefaultSaslConfiguration;
5355
import com.rabbitmq.stream.sasl.DefaultUsernamePasswordCredentialsProvider;
@@ -115,6 +117,7 @@
115117
*/
116118
public class Client implements AutoCloseable {
117119

120+
private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
118121
private static final Charset CHARSET = StandardCharsets.UTF_8;
119122
public static final int DEFAULT_PORT = 5552;
120123
public static final int DEFAULT_TLS_PORT = 5551;
@@ -170,7 +173,6 @@ public long applyAsLong(Object value) {
170173
};
171174
private final AtomicInteger correlationSequence = new AtomicInteger(0);
172175
private final SaslConfiguration saslConfiguration;
173-
private final CredentialsProvider credentialsProvider;
174176
private final Runnable nettyClosing;
175177
private final int maxFrameSize;
176178
private final boolean frameSizeCapped;
@@ -190,6 +192,7 @@ public long applyAsLong(Object value) {
190192
private final Runnable streamStatsCommandVersionsCheck;
191193
private final boolean filteringSupported;
192194
private final Runnable superStreamManagementCommandVersionsCheck;
195+
private final Registration credentialsRegistration;
193196

194197
@SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
195198
public Client() {
@@ -206,7 +209,6 @@ public Client(ClientParameters parameters) {
206209
this.creditNotification = parameters.creditNotification;
207210
this.codec = parameters.codec == null ? Codecs.DEFAULT : parameters.codec;
208211
this.saslConfiguration = parameters.saslConfiguration;
209-
this.credentialsProvider = parameters.credentialsProvider;
210212
this.chunkChecksum = parameters.chunkChecksum;
211213
this.metricsCollector = parameters.metricsCollector;
212214
this.metadataListener = parameters.metadataListener;
@@ -381,8 +383,36 @@ public void initChannel(SocketChannel ch) {
381383
debug(() -> "starting SASL handshake");
382384
this.saslMechanisms = getSaslMechanisms();
383385
debug(() -> "SASL mechanisms supported by server ({})", this.saslMechanisms);
386+
387+
CredentialsProvider credentialsProvider = parameters.credentialsProvider;
388+
CredentialsManager credentialsManager = parameters.credentialsManager;
389+
CredentialsManager.AuthenticationCallback authCallback, renewCallback;
390+
String regName =
391+
clientConnectionName.isBlank()
392+
? String.valueOf(ID_SEQUENCE.getAndIncrement())
393+
: clientConnectionName + "-" + ID_SEQUENCE.getAndIncrement();
394+
if (credentialsManager == null) {
395+
this.credentialsRegistration = CredentialsManagerFactory.get();
396+
authCallback = (u, p) -> this.authenticate(credentialsProvider);
397+
} else {
398+
renewCallback =
399+
authCallback =
400+
(u, p) -> {
401+
if (u == null && p == null) {
402+
// no username/password provided by the credentials manager
403+
// using the credentials manager
404+
this.authenticate(credentialsProvider);
405+
} else {
406+
// the credentials manager provides username/password (e.g. after token
407+
// retrieval)
408+
// we use them with a one-time credentials provider
409+
this.authenticate(new DefaultUsernamePasswordCredentialsProvider(u, p));
410+
}
411+
};
412+
this.credentialsRegistration = credentialsManager.register(regName, renewCallback);
413+
}
384414
debug(() -> "starting authentication");
385-
authenticate(this.credentialsProvider);
415+
this.credentialsRegistration.connect(authCallback);
386416
debug(() -> "authenticated");
387417
this.tuneState.await(Duration.ofSeconds(10));
388418
this.maxFrameSize = this.tuneState.getMaxFrameSize();
@@ -1454,6 +1484,9 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
14541484
if (reason != null) {
14551485
this.shutdownListenerCallback.accept(reason);
14561486
}
1487+
if (this.credentialsRegistration != null) {
1488+
this.credentialsRegistration.close();
1489+
}
14571490
this.nettyClosing.run();
14581491
this.failOutstandingRequests();
14591492
if (this.closeDispatchingExecutorService != null) {
@@ -2378,6 +2411,10 @@ String label() {
23782411
}
23792412
}
23802413

2414+
static ClientParameters cp() {
2415+
return new ClientParameters();
2416+
}
2417+
23812418
public static class ClientParameters {
23822419

23832420
private final Map<String, String> clientProperties = new ConcurrentHashMap<>();
@@ -2410,6 +2447,7 @@ public static class ClientParameters {
24102447
private SaslConfiguration saslConfiguration = DefaultSaslConfiguration.PLAIN;
24112448
private CredentialsProvider credentialsProvider =
24122449
new DefaultUsernamePasswordCredentialsProvider(DEFAULT_USERNAME, "guest");
2450+
private CredentialsManager credentialsManager;
24132451
private ChunkChecksum chunkChecksum = JdkChunkChecksum.CRC32_SINGLETON;
24142452
private MetricsCollector metricsCollector = NoOpMetricsCollector.SINGLETON;
24152453
private SslContext sslContext;
@@ -2492,6 +2530,11 @@ public ClientParameters credentialsProvider(CredentialsProvider credentialsProvi
24922530
return this;
24932531
}
24942532

2533+
public ClientParameters credentialsManager(CredentialsManager credentialsManager) {
2534+
this.credentialsManager = credentialsManager;
2535+
return this;
2536+
}
2537+
24952538
public ClientParameters username(String username) {
24962539
if (this.credentialsProvider instanceof UsernamePasswordCredentialsProvider) {
24972540
this.credentialsProvider =

0 commit comments

Comments
 (0)