Skip to content

Commit 191670c

Browse files
committed
Sync ProtonJ2 with upstream
1 parent 5653115 commit 191670c

File tree

10 files changed

+178
-43
lines changed

10 files changed

+178
-43
lines changed

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientConnection.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,8 +620,6 @@ void flush() {
620620
//----- Private implementation events handlers and utility methods
621621

622622
private void handleLocalOpen(org.apache.qpid.protonj2.engine.Connection connection) {
623-
connection.tickAuto(getScheduler());
624-
625623
if (options.openTimeout() > 0) {
626624
executor.schedule(() -> {
627625
if (!openFuture.isDone()) {
@@ -921,6 +919,7 @@ private void initializeProtonResources(ReconnectLocation location) throws Client
921919
.localCloseHandler(this::handleLocalClose)
922920
.openHandler(this::handleRemoteOpen)
923921
.closeHandler(this::handleRemoteClose);
922+
protonConnection.tickAuto(getScheduler());
924923

925924
configureEngineSaslSupport();
926925
}

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.netty.channel.MultiThreadIoEventLoopGroup;
2222
import io.netty.channel.epoll.EpollIoHandler;
23-
import io.netty.channel.nio.NioIoHandler;
2423
import org.apache.qpid.protonj2.client.TransportOptions;
2524
import org.slf4j.Logger;
2625
import org.slf4j.LoggerFactory;
@@ -37,19 +36,33 @@ public final class EpollSupport {
3736
public static final String NAME = "EPOLL";
3837

3938
public static boolean isAvailable(TransportOptions transportOptions) {
39+
return transportOptions.allowNativeIO() && isAvailable();
40+
}
41+
42+
public static boolean isAvailable() {
4043
try {
41-
return transportOptions.allowNativeIO() && Epoll.isAvailable();
44+
return Epoll.isAvailable();
4245
} catch (NoClassDefFoundError ncdfe) {
4346
LOG.debug("Unable to check for Epoll support due to missing class definition", ncdfe);
4447
return false;
4548
}
4649
}
4750

4851
public static EventLoopGroup createGroup(int nThreads, ThreadFactory ioThreadFactory) {
52+
ensureAvailability();
4953
return new MultiThreadIoEventLoopGroup(nThreads, ioThreadFactory, EpollIoHandler.newFactory());
5054
}
5155

5256
public static Class<? extends Channel> getChannelClass() {
57+
ensureAvailability();
58+
5359
return EpollSocketChannel.class;
5460
}
61+
62+
public static void ensureAvailability() {
63+
if (!isAvailable()) {
64+
throw new UnsupportedOperationException(
65+
"Netty Epoll support is not enabled because the Netty library indicates it is not present or disabled");
66+
}
67+
}
5568
}

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.qpid.protonj2.client.transport.netty4;
1818

19+
import java.lang.reflect.Method;
1920
import java.util.concurrent.ThreadFactory;
2021

2122
import io.netty.channel.MultiThreadIoEventLoopGroup;
@@ -26,29 +27,73 @@
2627

2728
import io.netty.channel.Channel;
2829
import io.netty.channel.EventLoopGroup;
29-
import io.netty.channel.uring.IoUring;
30-
import io.netty.channel.uring.IoUringSocketChannel;
3130

31+
@SuppressWarnings("unchecked")
3232
public final class IOUringSupport {
3333

3434
private static final Logger LOG = LoggerFactory.getLogger(IOUringSupport.class);
3535

3636
public static final String NAME = "IO_URING";
3737

38-
public static boolean isAvailable(TransportOptions transportOptions) {
38+
private static final boolean AVAILABLE;
39+
private static final Class<? extends Channel> SOCKET_CHANNEL_CLASS;
40+
41+
static {
42+
boolean available = false;
43+
Class<? extends Channel> socketChannelClass = null;
44+
45+
// Try for new Netty built in IoUring before falling back to incubator checks
3946
try {
40-
return transportOptions.allowNativeIO() && IoUring.isAvailable();
41-
} catch (NoClassDefFoundError ncdfe) {
42-
LOG.debug("Unable to check for IO_Uring support due to missing class definition", ncdfe);
43-
return false;
47+
final Class<?> ioUring = Class.forName("io.netty.channel.uring.IoUring");
48+
final Method isAvailable = ioUring.getDeclaredMethod("isAvailable", (Class<?>[])null);
49+
final Class<?> eventLoopGroup = Class.forName("io.netty.channel.MultiThreadIoEventLoopGroup");
50+
final Class<?> ioUringHandler = Class.forName("io.netty.channel.uring.IoUringIoHandler");
51+
final Class<?> ioUringHandlerFactory = Class.forName("io.netty.channel.IoHandlerFactory");
52+
53+
socketChannelClass = (Class<? extends Channel>) Class.forName("io.netty.channel.uring.IoUringSocketChannel");
54+
available = (boolean) isAvailable.invoke(null);
55+
} catch (Exception e) {
56+
LOG.debug("Unable to enable netty io_uring support due to error", e);
4457
}
58+
59+
if (!available) {
60+
try {
61+
final Class<?> ioUring = Class.forName("io.netty.incubator.channel.uring.IOUring");
62+
final Method isAvailable = ioUring.getDeclaredMethod("isAvailable");
63+
final Class<?> eventLoopGroup = Class.forName("io.netty.incubator.channel.uring.IOUringEventLoopGroup");
64+
65+
socketChannelClass = (Class<? extends Channel>) Class.forName("io.netty.incubator.channel.uring.IOUringSocketChannel");
66+
available = (boolean) isAvailable.invoke(null);
67+
} catch (Exception e) {
68+
LOG.debug("Unable to enable netty incubator io_uring support due to error", e);
69+
}
70+
}
71+
72+
AVAILABLE = available;
73+
SOCKET_CHANNEL_CLASS = socketChannelClass;
74+
}
75+
76+
public static boolean isAvailable(TransportOptions transportOptions) {
77+
return transportOptions.allowNativeIO() && AVAILABLE;
78+
}
79+
80+
public static boolean isAvailable() {
81+
return AVAILABLE;
4582
}
4683

4784
public static EventLoopGroup createGroup(int nThreads, ThreadFactory ioThreadFactory) {
4885
return new MultiThreadIoEventLoopGroup(nThreads, ioThreadFactory, IoUringIoHandler.newFactory());
4986
}
5087

5188
public static Class<? extends Channel> getChannelClass() {
52-
return IoUringSocketChannel.class;
89+
ensureAvailability();
90+
return SOCKET_CHANNEL_CLASS;
91+
}
92+
93+
public static void ensureAvailability() {
94+
if (!AVAILABLE) {
95+
throw new UnsupportedOperationException(
96+
"Netty io_ring support is not enabled because the Netty library indicates it is not present or disabled");
97+
}
5398
}
5499
}

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,33 @@ public final class KQueueSupport {
3636
public static final String NAME = "KQUEUE";
3737

3838
public static boolean isAvailable(TransportOptions transportOptions) {
39+
return transportOptions.allowNativeIO() && isAvailable();
40+
}
41+
42+
public static boolean isAvailable() {
3943
try {
40-
return transportOptions.allowNativeIO() && KQueue.isAvailable();
44+
return KQueue.isAvailable();
4145
} catch (NoClassDefFoundError ncdfe) {
4246
LOG.debug("Unable to check for KQueue support due to missing class definition", ncdfe);
4347
return false;
4448
}
4549
}
4650

4751
public static EventLoopGroup createGroup(int nThreads, ThreadFactory ioThreadFactory) {
48-
return new MultiThreadIoEventLoopGroup(nThreads, KQueueIoHandler.newFactory());
52+
ensureAvailability();
53+
return new MultiThreadIoEventLoopGroup(nThreads, ioThreadFactory, KQueueIoHandler.newFactory());
4954
}
5055

5156
public static Class<? extends Channel> getChannelClass() {
57+
ensureAvailability();
58+
5259
return KQueueSocketChannel.class;
5360
}
61+
62+
public static void ensureAvailability() {
63+
if (!isAvailable()) {
64+
throw new UnsupportedOperationException(
65+
"Netty KQueue support is not enabled because the Netty library indicates it is not present or disabled");
66+
}
67+
}
5468
}

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/SslSupport.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,7 @@ public static SSLEngine createJdkSslEngine(String host, int port, SSLContext con
187187
engine.setEnabledProtocols(buildEnabledProtocols(engine, options));
188188
engine.setEnabledCipherSuites(buildEnabledCipherSuites(engine, options));
189189
engine.setUseClientMode(true);
190-
191-
if (options.verifyHost()) {
192-
SSLParameters sslParameters = engine.getSSLParameters();
193-
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
194-
engine.setSSLParameters(sslParameters);
195-
}
190+
engine.setSSLParameters(createSSLParameters(engine, options));
196191

197192
return engine;
198193
}
@@ -274,12 +269,25 @@ public static SSLEngine createOpenSslEngine(ByteBufAllocator allocator, String h
274269
engine.setEnabledProtocols(buildEnabledProtocols(engine, options));
275270
engine.setEnabledCipherSuites(buildEnabledCipherSuites(engine, options));
276271
engine.setUseClientMode(true);
272+
engine.setSSLParameters(createSSLParameters(engine, options));
277273

278274
return engine;
279275
}
280276

281277
//----- Internal support methods -----------------------------------------//
282278

279+
private static SSLParameters createSSLParameters(SSLEngine engine, SslOptions options) {
280+
final SSLParameters sslParameters = engine.getSSLParameters();
281+
282+
if (options.verifyHost()) {
283+
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
284+
} else {
285+
sslParameters.setEndpointIdentificationAlgorithm(null);
286+
}
287+
288+
return sslParameters;
289+
}
290+
283291
private static String[] buildEnabledProtocols(SSLEngine engine, SslOptions options) {
284292
List<String> enabledProtocols = new ArrayList<String>();
285293

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import io.netty.bootstrap.Bootstrap;
4242
import io.netty.buffer.ByteBuf;
4343
import io.netty.channel.Channel;
44+
import io.netty.channel.ChannelFuture;
4445
import io.netty.channel.ChannelFutureListener;
4546
import io.netty.channel.ChannelHandlerContext;
4647
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -155,7 +156,15 @@ public void initChannel(Channel transportChannel) throws Exception {
155156

156157
configureNetty(bootstrap, options);
157158

158-
bootstrap.connect(getHost(), getPort()).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
159+
bootstrap.connect(getHost(), getPort()).addListener(new ChannelFutureListener() {
160+
161+
@Override
162+
public void operationComplete(ChannelFuture future) throws Exception {
163+
if (!future.isSuccess()) {
164+
handleTransportFailure(future.channel(), future.cause());
165+
}
166+
}
167+
});
159168

160169
return this;
161170
}

src/main/qpid/org/apache/qpid/protonj2/codec/Encoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ public interface Encoder {
591591
void writeBinary(ProtonBuffer buffer, EncoderState state, ProtonBuffer value) throws EncodeException;
592592

593593
/**
594-
* Writes the contents of the given {@link byte[]} value into the provided {@link ProtonBuffer}
594+
* Writes the contents of the given <code>byte[]</code> value into the provided {@link ProtonBuffer}
595595
* instance as an AMQP Binary type.
596596
* <p>
597597
* If the provided value to write is null an AMQP null type is encoded into the target buffer.

src/main/qpid/org/apache/qpid/protonj2/engine/Engine.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,16 @@ default void accept(ProtonBuffer input) throws EngineStateException {
187187
* Allows the engine to manage idle timeout processing by providing it the single threaded executor
188188
* context where all transport work is done which ensures singled threaded access while removing the
189189
* need for the client library or server application to manage calls to the {@link Engine#tick} methods.
190+
* <p>
191+
* The API should allow for configuring auto idle timeout handling before the connection has opened and
192+
* should react to both local and remote open performatives passing through the engine to configure read
193+
* and write checks under the constraints of the local and remote idle timeout configurations.
190194
*
191195
* @param executor
192196
* The single threaded execution context where all engine work takes place.
193197
*
194198
* @throws IllegalStateException if the {@link Engine} is already performing auto tick handling.
195-
* @throws EngineStateException if the Engine state precludes accepting new input.
199+
* @throws EngineStateException if the Engine state precludes accepting new input (shutdown or failed).
196200
*
197201
* @return this {@link Engine}
198202
*/
@@ -202,12 +206,16 @@ default void accept(ProtonBuffer input) throws EngineStateException {
202206
* Allows the engine to manage idle timeout processing by providing it the single threaded executor
203207
* context where all transport work is done which ensures singled threaded access while removing the
204208
* need for the client library or server application to manage calls to the {@link Engine#tick} methods.
209+
* <p>
210+
* The API should allow for configuring auto idle timeout handling before the connection has opened and
211+
* should react to both local and remote open performatives passing through the engine to configure read
212+
* and write checks under the constraints of the local and remote idle timeout configurations.
205213
*
206214
* @param scheduler
207215
* The single threaded execution context where all engine work takes place.
208216
*
209217
* @throws IllegalStateException if the {@link Engine} is already performing auto tick handling.
210-
* @throws EngineStateException if the Engine state precludes accepting new input.
218+
* @throws EngineStateException if the Engine state precludes accepting new input (shutdown or failed).
211219
*
212220
* @return this {@link Engine}
213221
*/

src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public class ProtonConnection extends ProtonEndpoint<Connection> implements Conn
9494
* Create a new unbound Connection instance.
9595
*
9696
* @param engine
97-
* Parent engine that created and owns this {@link Connection} insatnce.
97+
* Parent engine that created and owns this {@link Connection} instance.
9898
*/
9999
ProtonConnection(ProtonEngine engine) {
100100
super(engine);
@@ -128,6 +128,7 @@ public ProtonConnection open() throws EngineStateException {
128128
syncLocalStateWithRemote();
129129
} finally {
130130
fireLocalOpen();
131+
engine.handleLocalOpen(this);
131132
}
132133
}
133134

@@ -446,6 +447,7 @@ public void handleOpen(Open open, ProtonBuffer payload, int channel, ProtonEngin
446447
remoteOpen = open;
447448

448449
fireRemoteOpen();
450+
engine.handleRemoteOpen(this);
449451
}
450452

451453
@Override

0 commit comments

Comments
 (0)