Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion ci/start-broker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
chmod o+r rabbitmq-configuration/tls/*
chmod g+r rabbitmq-configuration/tls/*

echo "[rabbitmq_stream,rabbitmq_mqtt,rabbitmq_stomp,rabbitmq_amqp1_0,rabbitmq_auth_mechanism_ssl]." >> rabbitmq-configuration/enabled_plugins
echo "[rabbitmq_stream,rabbitmq_mqtt,rabbitmq_stomp,rabbitmq_amqp1_0,rabbitmq_auth_mechanism_ssl,rabbitmq_auth_backend_oauth2]." >> rabbitmq-configuration/enabled_plugins

echo "loopback_users = none
Expand All @@ -37,8 +37,25 @@ auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = ANONYMOUS
auth_mechanisms.3 = EXTERNAL
auth_backends.1 = internal
auth_backends.2 = rabbit_auth_backend_oauth2
stream.listeners.ssl.1 = 5551" >> rabbitmq-configuration/rabbitmq.conf

echo "[
{rabbitmq_auth_backend_oauth2, [{key_config,
[{signing_keys,
#{<<\"token-key\">> =>
{map,
#{<<\"alg\">> => <<\"HS256\">>,
<<\"k\">> => <<\"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH\">>,
<<\"kid\">> => <<\"token-key\">>,
<<\"kty\">> => <<\"oct\">>,
<<\"use\">> => <<\"sig\">>,
<<\"value\">> => <<\"token-key\">>}}}}]},
{resource_server_id,<<\"rabbitmq\">>}]}
]." >> rabbitmq-configuration/advanced.config

echo "Running RabbitMQ ${RABBITMQ_IMAGE}"

docker rm -f rabbitmq 2>/dev/null || echo "rabbitmq was not running"
Expand Down
32 changes: 24 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
<proton-j.version>0.34.1</proton-j.version>
<metrics.version>4.2.33</metrics.version>
<micrometer.version>1.15.3</micrometer.version>
<gson.version>2.13.1</gson.version>
<swiftmq-client.version>13.1.2</swiftmq-client.version>
<picocli.version>4.7.5</picocli.version>
<commons-compress.version>1.28.0</commons-compress.version>
Expand All @@ -66,10 +67,10 @@
<amqp-client.version>5.26.0</amqp-client.version>
<commons-lang3.version>3.18.0</commons-lang3.version>
<commons-codec.version>1.19.0</commons-codec.version>
<gson.version>2.13.1</gson.version>
<vavr.version>0.10.7</vavr.version>
<paho.version>1.2.5</paho.version>
<micrometer-tracing-test.version>1.5.3</micrometer-tracing-test.version>
<bouncycastle.version>1.81</bouncycastle.version>
<micrometer-docs-generator.version>1.0.4</micrometer-docs-generator.version>
<netty-tcnative.version>2.0.72.Final</netty-tcnative.version>
<maven.compiler.plugin.version>3.14.0</maven.compiler.plugin.version>
Expand Down Expand Up @@ -105,6 +106,7 @@
<central-publishing-maven-plugin.version>0.8.0</central-publishing-maven-plugin.version>
<maven.javadoc.skip>true</maven.javadoc.skip>
<gpg.skip>true</gpg.skip>
<jose4j.version>0.9.6</jose4j.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -185,6 +187,13 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down Expand Up @@ -285,13 +294,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.vavr</groupId>
<artifactId>vavr</artifactId>
Expand All @@ -313,6 +315,20 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
<version>${bouncycastle.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bitbucket.b_c</groupId>
<artifactId>jose4j</artifactId>
<version>${jose4j.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
Expand Down
25 changes: 25 additions & 0 deletions src/docs/asciidoc/advanced-topics.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,31 @@ A defined set of values shared across the messages is a good candidate: geograph
Cardinality of filter values can be from a few to a few thousands.
Extreme cardinality (a couple or dozens of thousands) can make filtering less efficient.

=== OAuth 2 Support

The client can authenticate against an OAuth 2 server like https://github.com/cloudfoundry/uaa[UAA].
It uses the https://tools.ietf.org/html/rfc6749#section-4.4[OAuth 2 Client Credentials flow].
The https://www.rabbitmq.com/docs/oauth2[OAuth 2 plugin] must be enabled on the server side and configured to use the same OAuth 2 server as the client.

How to retrieve the OAuth 2 token is configured at the environment level:

.Configuring OAuth 2 token retrieval
[source,java,indent=0]
--------
include::{test-examples}/EnvironmentUsage.java[tag=oauth2]
--------
<1> Access the OAuth 2 configuration
<2> Set the token endpoint URI
<3> Authenticate the client application
<4> Set the grant type
<5> Set optional parameters (depends on the OAuth 2 server)
<6> Set the SSL context (e.g. to verify and trust the identity of the OAuth 2 server)

The environment retrieves tokens and uses them to create stream connections.
It also takes care of refreshing the tokens before they expire and of re-authenticating existing connections so the broker does not close them when their token expires.

The environment uses the same token for all the connections it maintains.

=== Using Native `epoll`

The stream Java client uses the https://netty.io/[Netty] network framework and its Java NIO transport implementation by default.
Expand Down
87 changes: 87 additions & 0 deletions src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import javax.net.ssl.SSLContext;

/**
* API to configure and create an {@link Environment}.
Expand Down Expand Up @@ -517,4 +518,90 @@ interface NettyConfiguration {
*/
EnvironmentBuilder environmentBuilder();
}

/**
* OAuth 2 settings.
*
* @return OAuth 2 settings
* @see OAuth2Configuration
* @since 1.3.0
*/
OAuth2Configuration oauth2();

/**
* Configuration to retrieve a token using the <a
* href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2 Client Credentials flow</a>.
*
* @since 1.3.0
*/
interface OAuth2Configuration {

/**
* Set the URI to access to get the token.
*
* <p>TLS is supported by providing a <code>HTTPS</code> URI and setting a {@link
* javax.net.ssl.SSLContext}. See {@link #tls()} for more information. <em>Applications in
* production should always use HTTPS to retrieve tokens.</em>
*
* @param uri access URI
* @return OAuth 2 configuration
* @see #sslContext(javax.net.ssl.SSLContext)
*/
OAuth2Configuration tokenEndpointUri(String uri);

/**
* Set the OAuth 2 client ID
*
* <p>The client ID usually identifies the application that requests a token.
*
* @param clientId client ID
* @return OAuth 2 configuration
*/
OAuth2Configuration clientId(String clientId);

/**
* Set the secret (password) to use to get a token.
*
* @param clientSecret client secret
* @return OAuth 2 configuration
*/
OAuth2Configuration clientSecret(String clientSecret);

/**
* Set the grant type to use when requesting the token.
*
* <p>The default is <code>client_credentials</code>, but some OAuth 2 servers can use
* non-standard grant types to request tokens with extra-information.
*
* @param grantType grant type
* @return OAuth 2 configuration
*/
OAuth2Configuration grantType(String grantType);

/**
* Set a parameter to pass in the request.
*
* <p>The OAuth 2 server may require extra parameters to narrow down the identity of the user.
*
* @param name name of the parameter
* @param value value of the parameter
* @return OAuth 2 configuration
*/
OAuth2Configuration parameter(String name, String value);

/**
* {@link javax.net.ssl.SSLContext} for HTTPS requests.
*
* @param sslContext the SSL context
* @return OAuth 2 configuration
*/
OAuth2Configuration sslContext(SSLContext sslContext);

/**
* Go back to the environment builder
*
* @return the environment builder
*/
EnvironmentBuilder environmentBuilder();
}
}
49 changes: 46 additions & 3 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
import com.rabbitmq.stream.metrics.MetricsCollector;
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
import com.rabbitmq.stream.oauth2.CredentialsManager;
import com.rabbitmq.stream.oauth2.CredentialsManager.Registration;
import com.rabbitmq.stream.sasl.CredentialsProvider;
import com.rabbitmq.stream.sasl.DefaultSaslConfiguration;
import com.rabbitmq.stream.sasl.DefaultUsernamePasswordCredentialsProvider;
Expand Down Expand Up @@ -115,6 +117,7 @@
*/
public class Client implements AutoCloseable {

private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
private static final Charset CHARSET = StandardCharsets.UTF_8;
public static final int DEFAULT_PORT = 5552;
public static final int DEFAULT_TLS_PORT = 5551;
Expand Down Expand Up @@ -170,7 +173,6 @@ public long applyAsLong(Object value) {
};
private final AtomicInteger correlationSequence = new AtomicInteger(0);
private final SaslConfiguration saslConfiguration;
private final CredentialsProvider credentialsProvider;
private final Runnable nettyClosing;
private final int maxFrameSize;
private final boolean frameSizeCapped;
Expand All @@ -190,6 +192,7 @@ public long applyAsLong(Object value) {
private final Runnable streamStatsCommandVersionsCheck;
private final boolean filteringSupported;
private final Runnable superStreamManagementCommandVersionsCheck;
private final Registration credentialsRegistration;

@SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
public Client() {
Expand All @@ -206,7 +209,6 @@ public Client(ClientParameters parameters) {
this.creditNotification = parameters.creditNotification;
this.codec = parameters.codec == null ? Codecs.DEFAULT : parameters.codec;
this.saslConfiguration = parameters.saslConfiguration;
this.credentialsProvider = parameters.credentialsProvider;
this.chunkChecksum = parameters.chunkChecksum;
this.metricsCollector = parameters.metricsCollector;
this.metadataListener = parameters.metadataListener;
Expand Down Expand Up @@ -381,8 +383,36 @@ public void initChannel(SocketChannel ch) {
debug(() -> "starting SASL handshake");
this.saslMechanisms = getSaslMechanisms();
debug(() -> "SASL mechanisms supported by server ({})", this.saslMechanisms);

CredentialsProvider credentialsProvider = parameters.credentialsProvider;
CredentialsManager credentialsManager = parameters.credentialsManager;
CredentialsManager.AuthenticationCallback authCallback, renewCallback;
String regName =
clientConnectionName.isBlank()
? String.valueOf(ID_SEQUENCE.getAndIncrement())
: clientConnectionName + "-" + ID_SEQUENCE.getAndIncrement();
if (credentialsManager == null) {
this.credentialsRegistration = CredentialsManagerFactory.get();
authCallback = (u, p) -> this.authenticate(credentialsProvider);
} else {
renewCallback =
authCallback =
(u, p) -> {
if (u == null && p == null) {
// no username/password provided by the credentials manager
// using the credentials manager
this.authenticate(credentialsProvider);
} else {
// the credentials manager provides username/password (e.g. after token
// retrieval)
// we use them with a one-time credentials provider
this.authenticate(new DefaultUsernamePasswordCredentialsProvider(u, p));
}
};
this.credentialsRegistration = credentialsManager.register(regName, renewCallback);
}
debug(() -> "starting authentication");
authenticate(this.credentialsProvider);
this.credentialsRegistration.connect(authCallback);
debug(() -> "authenticated");
this.tuneState.await(Duration.ofSeconds(10));
this.maxFrameSize = this.tuneState.getMaxFrameSize();
Expand Down Expand Up @@ -1454,6 +1484,9 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
if (reason != null) {
this.shutdownListenerCallback.accept(reason);
}
if (this.credentialsRegistration != null) {
this.credentialsRegistration.close();
}
this.nettyClosing.run();
this.failOutstandingRequests();
if (this.closeDispatchingExecutorService != null) {
Expand Down Expand Up @@ -2378,6 +2411,10 @@ String label() {
}
}

static ClientParameters cp() {
return new ClientParameters();
}

public static class ClientParameters {

private final Map<String, String> clientProperties = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -2410,6 +2447,7 @@ public static class ClientParameters {
private SaslConfiguration saslConfiguration = DefaultSaslConfiguration.PLAIN;
private CredentialsProvider credentialsProvider =
new DefaultUsernamePasswordCredentialsProvider(DEFAULT_USERNAME, "guest");
private CredentialsManager credentialsManager;
private ChunkChecksum chunkChecksum = JdkChunkChecksum.CRC32_SINGLETON;
private MetricsCollector metricsCollector = NoOpMetricsCollector.SINGLETON;
private SslContext sslContext;
Expand Down Expand Up @@ -2492,6 +2530,11 @@ public ClientParameters credentialsProvider(CredentialsProvider credentialsProvi
return this;
}

public ClientParameters credentialsManager(CredentialsManager credentialsManager) {
this.credentialsManager = credentialsManager;
return this;
}

public ClientParameters username(String username) {
if (this.credentialsProvider instanceof UsernamePasswordCredentialsProvider) {
this.credentialsProvider =
Expand Down
Loading