Skip to content

Commit d9cd311

Browse files
committed
Sync ProtonJ2
1 parent 44f6857 commit d9cd311

25 files changed

+229
-212
lines changed

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,9 @@
134134
<classifier>osx-x86_64</classifier>
135135
</dependency>
136136
<dependency>
137-
<groupId>io.netty.incubator</groupId>
138-
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
139-
<version>${netty4.iouring.version}</version>
137+
<groupId>io.netty</groupId>
138+
<artifactId>netty-transport-native-io_uring</artifactId>
139+
<version>${netty4.version}</version>
140140
<classifier>linux-x86_64</classifier>
141141
</dependency>
142142
<!-- End of QPid dependencies -->

src/main/qpid/org/apache/qpid/protonj2/buffer/ProtonBufferAccessors.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@
2424
*/
2525
public interface ProtonBufferAccessors {
2626

27+
/**
28+
* Look ahead an return the next byte that would be read from a call to readByte or
29+
* a call to getByte at the current read offset.
30+
*
31+
* @return the next readable byte without advancing the read offset.
32+
*
33+
* @throws IndexOutOfBoundsException if there is no readable bytes left in the buffer.
34+
*/
35+
byte peekByte();
36+
2737
/**
2838
* Reads a single byte at the given index and returns it without modification to the target
2939
* buffer read offset.

src/main/qpid/org/apache/qpid/protonj2/buffer/impl/ProtonByteArrayBuffer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,12 @@ public int hashCode() {
351351

352352
//----- Indexed Get operations
353353

354+
@Override
355+
public byte peekByte() {
356+
checkPeek();
357+
return ProtonBufferUtils.readByte(array, offset(readOffset));
358+
}
359+
354360
@Override
355361
public byte getByte(int index) {
356362
checkGet(index, Byte.BYTES);
@@ -965,6 +971,16 @@ private void checkWrite(int index, int size, boolean allowExpansion) {
965971
}
966972
}
967973

974+
private void checkPeek() {
975+
if (readOffset == writeOffset) {
976+
if (closed) {
977+
throw ProtonBufferUtils.genericBufferIsClosed(this);
978+
} else {
979+
throw ProtonBufferUtils.genericOutOfBounds(this, readOffset);
980+
}
981+
}
982+
}
983+
968984
private void checkRead(int index, int size) {
969985
if (index < 0 || writeOffset < index + size || closed) {
970986
if (closed) {

src/main/qpid/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,12 @@ public ProtonCompositeBuffer splitComponentsCeil(int splitOffset) {
758758

759759
//----- Offset based get operations
760760

761+
@Override
762+
public byte peekByte() {
763+
checkPeek();
764+
return findIndexedAccessor(readOffset, Byte.BYTES).getByte(readOffset);
765+
}
766+
761767
@Override
762768
public byte getByte(int index) {
763769
checkGetBounds(index, Byte.BYTES);
@@ -1426,6 +1432,12 @@ private static RuntimeException closeBuffers(ProtonBuffer[] source, ProtonBuffer
14261432

14271433
//----- Internal API for composite buffer
14281434

1435+
private void checkPeek() {
1436+
if (readOffset == writeOffset) {
1437+
throw generateIndexOutOfBounds(readOffset, false);
1438+
}
1439+
}
1440+
14291441
private void checkGetBounds(int index, int size) {
14301442
if (index < 0 || capacity < index + size) {
14311443
throw generateIndexOutOfBounds(index, false);
@@ -1701,6 +1713,11 @@ private int offset(int index) {
17011713
// fall within the available portion of the buffer in this chunk or
17021714
// an IOOBE will again be thrown.
17031715

1716+
@Override
1717+
public byte peekByte() {
1718+
return parent.buffers[chunkIndex].peekByte();
1719+
}
1720+
17041721
@Override
17051722
public byte getByte(int index) {
17061723
return parent.buffers[chunkIndex].getByte(offset(index));
@@ -1817,6 +1834,11 @@ public ProtonBufferAccessors prepare(int chunk) {
18171834
return this;
18181835
}
18191836

1837+
@Override
1838+
public byte peekByte() {
1839+
throw new UnsupportedOperationException();
1840+
}
1841+
18201842
@Override
18211843
public byte getByte(int index) {
18221844
throw new UnsupportedOperationException();

src/main/qpid/org/apache/qpid/protonj2/buffer/netty/Netty4ToProtonBufferAdapter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,12 @@ public int hashCode() {
516516

517517
//----- Primitive Get Methods
518518

519+
@Override
520+
public byte peekByte() {
521+
checkPeek();
522+
return resource.getByte(readOffset);
523+
}
524+
519525
@Override
520526
public byte getByte(int index) {
521527
checkGet(index, Byte.BYTES);
@@ -1035,6 +1041,16 @@ protected RuntimeException resourceIsClosedException() {
10351041

10361042
//----- Internal utilities for mapping to netty
10371043

1044+
private void checkPeek() {
1045+
if (readOffset == writeOffset) {
1046+
if (closed) {
1047+
throw ProtonBufferUtils.genericBufferIsClosed(this);
1048+
} else {
1049+
throw ProtonBufferUtils.genericOutOfBounds(this, readOffset);
1050+
}
1051+
}
1052+
}
1053+
10381054
private void checkRead(int index, int size) {
10391055
if (index < 0 || writeOffset < index + size || closed) {
10401056
if (closed) {

src/main/qpid/org/apache/qpid/protonj2/client/SaslOptions.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
/**
2525
* Connection options that are applied to the SASL layer.
2626
*/
27-
public class SaslOptions {
27+
public class SaslOptions implements Cloneable {
2828

29-
/**
30-
* The client default configuration value for SASL enabled state (default is true)
31-
*/
29+
/**
30+
* The client default configuration value for SASL enabled state (default is true)
31+
*/
3232
public static final boolean DEFAULT_SASL_ENABLED = true;
3333

3434
private boolean saslEnabled = DEFAULT_SASL_ENABLED;

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ public static class ClientAccepted extends ClientDeliveryState {
128128

129129
private static final ClientAccepted INSTANCE = new ClientAccepted();
130130

131+
private ClientAccepted() {
132+
// Singleton
133+
}
134+
131135
@Override
132136
public Type getType() {
133137
return Type.ACCEPTED;
@@ -153,6 +157,10 @@ public static class ClientReleased extends ClientDeliveryState {
153157

154158
private static final ClientReleased INSTANCE = new ClientReleased();
155159

160+
private ClientReleased() {
161+
// Singleton
162+
}
163+
156164
@Override
157165
public Type getType() {
158166
return Type.RELEASED;

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@ private ClientReceiver selectNextAvailable() {
162162
return result != null ? result : selectFirstAvailable();
163163
}
164164

165-
@SuppressWarnings("resource")
166165
private ClientReceiver selectFirstAvailable() {
167166
return session.getProtonSession().receivers().stream()
168167
.filter((r) -> r.getLinkedResource() instanceof ClientReceiver &&
@@ -172,7 +171,6 @@ private ClientReceiver selectFirstAvailable() {
172171
.orElse(null);
173172
}
174173

175-
@SuppressWarnings("resource")
176174
private ClientReceiver selectLargestBacklog() {
177175
return session.getProtonSession().receivers().stream()
178176
.filter((r) -> r.getLinkedResource() instanceof ClientReceiver &&
@@ -182,7 +180,6 @@ private ClientReceiver selectLargestBacklog() {
182180
.orElse(null);
183181
}
184182

185-
@SuppressWarnings("resource")
186183
private ClientReceiver selectSmallestBacklog() {
187184
return session.getProtonSession().receivers().stream()
188185
.filter((r) -> r.getLinkedResource() instanceof ClientReceiver &&

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,16 @@
1818

1919
import java.util.concurrent.ThreadFactory;
2020

21+
import io.netty.channel.MultiThreadIoEventLoopGroup;
22+
import io.netty.channel.uring.IoUringIoHandler;
2123
import org.apache.qpid.protonj2.client.TransportOptions;
2224
import org.slf4j.Logger;
2325
import org.slf4j.LoggerFactory;
2426

2527
import io.netty.channel.Channel;
2628
import io.netty.channel.EventLoopGroup;
27-
import io.netty.incubator.channel.uring.IOUring;
28-
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
29-
import io.netty.incubator.channel.uring.IOUringSocketChannel;
29+
import io.netty.channel.uring.IoUring;
30+
import io.netty.channel.uring.IoUringSocketChannel;
3031

3132
public final class IOUringSupport {
3233

@@ -36,18 +37,18 @@ public final class IOUringSupport {
3637

3738
public static boolean isAvailable(TransportOptions transportOptions) {
3839
try {
39-
return transportOptions.allowNativeIO() && IOUring.isAvailable();
40+
return transportOptions.allowNativeIO() && IoUring.isAvailable();
4041
} catch (NoClassDefFoundError ncdfe) {
4142
LOG.debug("Unable to check for IO_Uring support due to missing class definition", ncdfe);
4243
return false;
4344
}
4445
}
4546

4647
public static EventLoopGroup createGroup(int nThreads, ThreadFactory ioThreadFactory) {
47-
return new IOUringEventLoopGroup(nThreads, ioThreadFactory);
48+
return new MultiThreadIoEventLoopGroup(nThreads, ioThreadFactory, IoUringIoHandler.newFactory());
4849
}
4950

5051
public static Class<? extends Channel> getChannelClass() {
51-
return IOUringSocketChannel.class;
52+
return IoUringSocketChannel.class;
5253
}
5354
}

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ protected void addAdditionalHandlers(ChannelPipeline pipeline) {
153153
pipeline.addLast(new HttpClientCodec());
154154
pipeline.addLast(new HttpObjectAggregator(8192));
155155
if (options.webSocketCompression()) {
156-
pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
156+
pipeline.addLast(new WebSocketClientCompressionHandler(0));
157157
}
158158
}
159159

0 commit comments

Comments
 (0)