Skip to content

Commit 5bdd701

Browse files
committed
Add tests with kqueue and native TLS
1 parent f0640af commit 5bdd701

File tree

3 files changed

+97
-59
lines changed

3 files changed

+97
-59
lines changed

pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
<paho.version>1.2.5</paho.version>
7272
<micrometer-tracing-test.version>1.5.2</micrometer-tracing-test.version>
7373
<micrometer-docs-generator.version>1.0.4</micrometer-docs-generator.version>
74+
<netty-tcnative.version>2.0.72.Final</netty-tcnative.version>
7475
<maven.compiler.plugin.version>3.14.0</maven.compiler.plugin.version>
7576
<maven-surefire-plugin.version>3.5.3</maven-surefire-plugin.version>
7677
<maven-dependency-plugin.version>3.8.1</maven-dependency-plugin.version>
@@ -217,6 +218,30 @@
217218
<scope>test</scope>
218219
</dependency>
219220

221+
<dependency>
222+
<groupId>io.netty</groupId>
223+
<artifactId>netty-transport-native-kqueue</artifactId>
224+
<version>${netty.version}</version>
225+
<classifier>osx-aarch_64</classifier>
226+
<scope>test</scope>
227+
</dependency>
228+
229+
<dependency>
230+
<groupId>io.netty</groupId>
231+
<artifactId>netty-tcnative-boringssl-static</artifactId>
232+
<version>${netty-tcnative.version}</version>
233+
<classifier>linux-x86_64</classifier>
234+
<scope>test</scope>
235+
</dependency>
236+
237+
<dependency>
238+
<groupId>io.netty</groupId>
239+
<artifactId>netty-tcnative-boringssl-static</artifactId>
240+
<version>${netty-tcnative.version}</version>
241+
<classifier>osx-aarch_64</classifier>
242+
<scope>test</scope>
243+
</dependency>
244+
220245
<dependency>
221246
<groupId>org.assertj</groupId>
222247
<artifactId>assertj-core</artifactId>

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 48 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,12 @@
5252
import com.rabbitmq.stream.impl.TestUtils.DisabledIfTlsNotEnabled;
5353
import io.netty.channel.Channel;
5454
import io.netty.channel.EventLoopGroup;
55+
import io.netty.channel.IoHandlerFactory;
5556
import io.netty.channel.MultiThreadIoEventLoopGroup;
5657
import io.netty.channel.epoll.EpollIoHandler;
5758
import io.netty.channel.epoll.EpollSocketChannel;
59+
import io.netty.channel.kqueue.KQueueIoHandler;
60+
import io.netty.channel.kqueue.KQueueSocketChannel;
5861
import io.netty.handler.ssl.SslHandler;
5962
import java.net.ConnectException;
6063
import java.nio.charset.StandardCharsets;
@@ -743,41 +746,14 @@ void nettyInitializersAreCalled() {
743746
@EnabledOnOs(OS.LINUX)
744747
@EnabledIfSystemProperty(named = "os.arch", matches = "amd64")
745748
void nativeEpollWorksOnLinux() {
746-
int messageCount = 10_000;
747-
EventLoopGroup epollEventLoopGroup =
748-
new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory());
749-
try {
750-
Set<Channel> channels = ConcurrentHashMap.newKeySet();
751-
try (Environment env =
752-
environmentBuilder
753-
.netty()
754-
.eventLoopGroup(epollEventLoopGroup)
755-
.bootstrapCustomizer(b -> b.channel(EpollSocketChannel.class))
756-
.channelCustomizer(ch -> channels.add(ch))
757-
.environmentBuilder()
758-
.build()) {
759-
Producer producer = env.producerBuilder().stream(this.stream).build();
760-
ConfirmationHandler handler = confirmationStatus -> {};
761-
IntStream.range(0, messageCount)
762-
.forEach(
763-
i ->
764-
producer.send(
765-
producer
766-
.messageBuilder()
767-
.addData("hello".getBytes(StandardCharsets.UTF_8))
768-
.build(),
769-
handler));
770-
CountDownLatch latch = new CountDownLatch(messageCount);
771-
env.consumerBuilder().stream(this.stream)
772-
.offset(OffsetSpecification.first())
773-
.messageHandler((context, message) -> latch.countDown())
774-
.build();
775-
assertThat(latchAssert(latch)).completes();
776-
}
777-
assertThat(channels).isNotEmpty().allMatch(ch -> ch instanceof EpollSocketChannel);
778-
} finally {
779-
epollEventLoopGroup.shutdownGracefully(0, 0, SECONDS);
780-
}
749+
nativeIo(EpollIoHandler.newFactory(), EpollSocketChannel.class);
750+
}
751+
752+
@Test
753+
@EnabledOnOs(OS.MAC)
754+
@EnabledIfSystemProperty(named = "os.arch", matches = "aarch64")
755+
void nativeKqueueWorksOnMac() {
756+
nativeIo(KQueueIoHandler.newFactory(), KQueueSocketChannel.class);
781757
}
782758

783759
@Test
@@ -847,4 +823,41 @@ void storeOffset() {
847823
assertThat(client.queryOffset(ref, stream).getOffset()).isEqualTo(0);
848824
}
849825
}
826+
827+
private void nativeIo(IoHandlerFactory ioHandlerFactory, Class<? extends Channel> chClass) {
828+
int messageCount = 10_000;
829+
EventLoopGroup epollEventLoopGroup = new MultiThreadIoEventLoopGroup(ioHandlerFactory);
830+
try {
831+
Set<Channel> channels = ConcurrentHashMap.newKeySet();
832+
try (Environment env =
833+
environmentBuilder
834+
.netty()
835+
.eventLoopGroup(epollEventLoopGroup)
836+
.bootstrapCustomizer(b -> b.channel(chClass))
837+
.channelCustomizer(channels::add)
838+
.environmentBuilder()
839+
.build()) {
840+
Producer producer = env.producerBuilder().stream(this.stream).build();
841+
ConfirmationHandler handler = confirmationStatus -> {};
842+
IntStream.range(0, messageCount)
843+
.forEach(
844+
i ->
845+
producer.send(
846+
producer
847+
.messageBuilder()
848+
.addData("hello".getBytes(StandardCharsets.UTF_8))
849+
.build(),
850+
handler));
851+
CountDownLatch latch = new CountDownLatch(messageCount);
852+
env.consumerBuilder().stream(this.stream)
853+
.offset(OffsetSpecification.first())
854+
.messageHandler((context, message) -> latch.countDown())
855+
.build();
856+
assertThat(latchAssert(latch)).completes();
857+
}
858+
assertThat(channels).isNotEmpty().allMatch(ch -> ch.getClass().isAssignableFrom(chClass));
859+
} finally {
860+
epollEventLoopGroup.shutdownGracefully(0, 0, SECONDS);
861+
}
862+
}
850863
}

src/test/java/com/rabbitmq/stream/impl/TlsTest.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.rabbitmq.stream.sasl.DefaultSaslConfiguration;
3333
import io.netty.handler.ssl.SslContext;
3434
import io.netty.handler.ssl.SslContextBuilder;
35+
import io.netty.handler.ssl.SslProvider;
3536
import java.io.File;
3637
import java.io.FileInputStream;
3738
import java.net.InetAddress;
@@ -54,19 +55,26 @@
5455
import javax.net.ssl.SSLHandshakeException;
5556
import org.junit.jupiter.api.Test;
5657
import org.junit.jupiter.api.extension.ExtendWith;
58+
import org.junit.jupiter.params.Parameter;
59+
import org.junit.jupiter.params.ParameterizedClass;
60+
import org.junit.jupiter.params.provider.EnumSource;
5761

5862
@DisabledIfTlsNotEnabled
5963
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
64+
@ParameterizedClass
65+
@EnumSource(names = {"JDK", "OPENSSL"})
6066
public class TlsTest {
6167

68+
@Parameter SslProvider sslProvider;
69+
6270
String stream;
6371

6472
TestUtils.ClientFactory cf;
6573
int credit = 10;
6674

67-
static SslContext alwaysTrustSslContext() {
75+
SslContext alwaysTrustSslContext() {
6876
try {
69-
return SslContextBuilder.forClient().trustManager(TRUST_EVERYTHING_TRUST_MANAGER).build();
77+
return builder().trustManager(TRUST_EVERYTHING_TRUST_MANAGER).build();
7078
} catch (SSLException e) {
7179
throw new RuntimeException(e);
7280
}
@@ -191,7 +199,7 @@ void unverifiedConnection() {
191199
void verifiedConnectionWithCorrectServerCertificate() throws Exception {
192200
// in server certificate SAN
193201
String hostname = "localhost";
194-
SslContext context = SslContextBuilder.forClient().trustManager(caCertificate()).build();
202+
SslContext context = builder().trustManager(caCertificate()).build();
195203
cf.get(new ClientParameters().host(hostname).sslContext(context));
196204
}
197205

@@ -200,25 +208,22 @@ void verifiedConnectionWithCorrectServerCertificateWithSni() throws Exception {
200208
// not in server certificate SAN, but setting SNI makes it work
201209
String hostname = "127.0.0.1";
202210
SslContext context =
203-
SslContextBuilder.forClient()
204-
.trustManager(caCertificate())
205-
.serverName(new SNIHostName("localhost"))
206-
.build();
211+
builder().trustManager(caCertificate()).serverName(new SNIHostName("localhost")).build();
207212
cf.get(new ClientParameters().host(hostname).sslContext(context));
208213
}
209214

210215
@Test
211216
void verifiedConnectionWithCorrectServerCertificateFailsIfHostnameNotInSan() throws Exception {
212217
// not in server certificate SAN
213218
String hostname = "127.0.0.1";
214-
SslContext context = SslContextBuilder.forClient().trustManager(caCertificate()).build();
219+
SslContext context = builder().trustManager(caCertificate()).build();
215220
assertThatThrownBy(() -> cf.get(new ClientParameters().host(hostname).sslContext(context)))
216221
.hasCauseInstanceOf(SSLHandshakeException.class);
217222
}
218223

219224
@Test
220225
void verifiedConnectionWithWrongServerCertificate() throws Exception {
221-
SslContext context = SslContextBuilder.forClient().trustManager(clientCertificate()).build();
226+
SslContext context = builder().trustManager(clientCertificate()).build();
222227
assertThatThrownBy(() -> cf.get(new ClientParameters().sslContext(context)))
223228
.isInstanceOf(StreamException.class)
224229
.hasCauseInstanceOf(SSLHandshakeException.class);
@@ -227,7 +232,7 @@ void verifiedConnectionWithWrongServerCertificate() throws Exception {
227232
@Test
228233
void verifiedConnectionWithCorrectClientPrivateKey() throws Exception {
229234
SslContext context =
230-
SslContextBuilder.forClient()
235+
builder()
231236
.trustManager(caCertificate())
232237
.keyManager(clientKey(), clientCertificate())
233238
.build();
@@ -241,10 +246,7 @@ void verifiedConnectionWithCorrectClientPrivateKey() throws Exception {
241246
void saslExternalShouldSucceedWithUserForClientCertificate() throws Exception {
242247
X509Certificate clientCertificate = clientCertificate();
243248
SslContext context =
244-
SslContextBuilder.forClient()
245-
.trustManager(caCertificate())
246-
.keyManager(clientKey(), clientCertificate)
247-
.build();
249+
builder().trustManager(caCertificate()).keyManager(clientKey(), clientCertificate).build();
248250

249251
String username = clientCertificate.getSubjectX500Principal().getName();
250252
Cli.rabbitmqctlIgnoreError(format("delete_user %s", username));
@@ -268,10 +270,7 @@ void saslExternalShouldSucceedWithUserForClientCertificate() throws Exception {
268270
void saslExternalShouldFailIfNoUserForClientCertificate() throws Exception {
269271
X509Certificate clientCertificate = clientCertificate();
270272
SslContext context =
271-
SslContextBuilder.forClient()
272-
.trustManager(caCertificate())
273-
.keyManager(clientKey(), clientCertificate)
274-
.build();
273+
builder().trustManager(caCertificate()).keyManager(clientKey(), clientCertificate).build();
275274

276275
String username = clientCertificate.getSubjectX500Principal().getName();
277276
Cli.rabbitmqctlIgnoreError(format("delete_user %s", username));
@@ -288,7 +287,7 @@ void saslExternalShouldFailIfNoUserForClientCertificate() throws Exception {
288287

289288
@Test
290289
void hostnameVerificationShouldFailWhenSettingHostToLoopbackInterface() throws Exception {
291-
SslContext context = SslContextBuilder.forClient().trustManager(caCertificate()).build();
290+
SslContext context = builder().trustManager(caCertificate()).build();
292291
assertThatThrownBy(() -> cf.get(new ClientParameters().sslContext(context).host("127.0.0.1")))
293292
.isInstanceOf(StreamException.class)
294293
.hasCauseInstanceOf(SSLHandshakeException.class);
@@ -298,10 +297,7 @@ void hostnameVerificationShouldFailWhenSettingHostToLoopbackInterface() throws E
298297
void shouldConnectWhenSettingHostToLoopbackInterfaceAndDisablingHostnameVerification()
299298
throws Exception {
300299
SslContext context =
301-
SslContextBuilder.forClient()
302-
.endpointIdentificationAlgorithm(null)
303-
.trustManager(caCertificate())
304-
.build();
300+
builder().endpointIdentificationAlgorithm(null).trustManager(caCertificate()).build();
305301
cf.get(new ClientParameters().sslContext(context).host("127.0.0.1"));
306302
}
307303

@@ -325,7 +321,7 @@ void environmentPublisherConsumer() throws Exception {
325321
.uri("rabbitmq-stream+tls://localhost")
326322
.addressResolver(addr -> new Address("localhost", Client.DEFAULT_TLS_PORT))
327323
.tls()
328-
.sslContext(SslContextBuilder.forClient().trustManager(caCertificate()).build())
324+
.sslContext(builder().trustManager(caCertificate()).build())
329325
.environmentBuilder()
330326
.build()) {
331327

@@ -371,4 +367,8 @@ private static String hostname() {
371367
private static String tlsArtefactPath(String in) {
372368
return in.replace("$(hostname)", hostname()).replace("$(hostname -s)", hostname());
373369
}
370+
371+
private SslContextBuilder builder() {
372+
return SslContextBuilder.forClient().sslProvider(sslProvider);
373+
}
374374
}

0 commit comments

Comments
 (0)