3131import java .nio .BufferOverflowException ;
3232import java .nio .ByteBuffer ;
3333import java .nio .channels .SelectionKey ;
34+ import java .nio .charset .StandardCharsets ;
3435import java .util .Deque ;
3536import java .util .Iterator ;
3637import java .util .List ;
38+ import java .util .Map ;
3739import java .util .Queue ;
40+ import java .util .concurrent .ConcurrentHashMap ;
3841import java .util .concurrent .ConcurrentLinkedDeque ;
3942import java .util .concurrent .ConcurrentLinkedQueue ;
4043import java .util .concurrent .atomic .AtomicInteger ;
6871import org .apache .hc .core5 .http .nio .command .StaleCheckCommand ;
6972import org .apache .hc .core5 .http .protocol .HttpContext ;
7073import org .apache .hc .core5 .http .protocol .HttpProcessor ;
74+ import org .apache .hc .core5 .http .HttpHeaders ;
7175import org .apache .hc .core5 .http2 .H2ConnectionException ;
7276import org .apache .hc .core5 .http2 .H2Error ;
7377import org .apache .hc .core5 .http2 .H2StreamResetException ;
8589import org .apache .hc .core5 .http2 .nio .AsyncPingHandler ;
8690import org .apache .hc .core5 .http2 .nio .command .PingCommand ;
8791import org .apache .hc .core5 .http2 .nio .command .PushResponseCommand ;
92+ import org .apache .hc .core5 .http2 .priority .PriorityParamsParser ;
93+ import org .apache .hc .core5 .http2 .priority .PriorityValue ;
94+ import org .apache .hc .core5 .http2 .priority .PriorityFormatter ;
8895import org .apache .hc .core5 .io .CloseMode ;
8996import org .apache .hc .core5 .reactor .Command ;
9097import org .apache .hc .core5 .reactor .ProtocolIOSession ;
96103
97104abstract class AbstractH2StreamMultiplexer implements Identifiable , HttpConnection {
98105
99- private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024 ; // 10 MiB
106+ private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024 ;
100107
101108 enum ConnectionHandshake { READY , ACTIVE , GRACEFUL_SHUTDOWN , SHUTDOWN }
102109 enum SettingsHandshake { READY , TRANSMITTED , ACKED }
@@ -135,6 +142,9 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
135142 private EndpointDetails endpointDetails ;
136143 private boolean goAwayReceived ;
137144
145+ private final Map <Integer , PriorityValue > priorities = new ConcurrentHashMap <>();
146+ private volatile boolean peerNoRfc7540Priorities ;
147+
138148 AbstractH2StreamMultiplexer (
139149 final ProtocolIOSession ioSession ,
140150 final FrameFactory frameFactory ,
@@ -902,15 +912,13 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
902912 consumeSettingsFrame (payload );
903913 remoteSettingState = SettingsHandshake .TRANSMITTED ;
904914 }
905- // Send ACK
906915 final RawFrame response = frameFactory .createSettingsAck ();
907916 commitFrame (response );
908917 remoteSettingState = SettingsHandshake .ACKED ;
909918 }
910919 }
911920 break ;
912921 case PRIORITY :
913- // Stream priority not supported
914922 break ;
915923 case PUSH_PROMISE : {
916924 acceptPushFrame ();
@@ -995,6 +1003,29 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
9951003 }
9961004 ioSession .setEvent (SelectionKey .OP_WRITE );
9971005 break ;
1006+ case PRIORITY_UPDATE : {
1007+ if (streamId != 0 ) {
1008+ throw new H2ConnectionException (H2Error .PROTOCOL_ERROR , "PRIORITY_UPDATE must be on stream 0" );
1009+ }
1010+ final ByteBuffer payload = frame .getPayload ();
1011+ if (payload == null || payload .remaining () < 4 ) {
1012+ throw new H2ConnectionException (H2Error .FRAME_SIZE_ERROR , "Invalid PRIORITY_UPDATE payload" );
1013+ }
1014+ final int prioritizedId = payload .getInt () & 0x7fffffff ;
1015+ final int len = payload .remaining ();
1016+ final String field ;
1017+ if (len > 0 ) {
1018+ final byte [] b = new byte [len ];
1019+ payload .get (b );
1020+ field = new String (b , StandardCharsets .US_ASCII );
1021+ } else {
1022+ field = "" ;
1023+ }
1024+ final PriorityValue pv = PriorityParamsParser .parse (field ).toValueWithDefaults ();
1025+ priorities .put (prioritizedId , pv );
1026+ requestSessionOutput ();
1027+ }
1028+ break ;
9981029 }
9991030 }
10001031
@@ -1059,7 +1090,6 @@ private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) thr
10591090 }
10601091 final ByteBuffer payload = frame .getPayloadContent ();
10611092 if (frame .isFlagSet (FrameFlag .PRIORITY )) {
1062- // Priority not supported
10631093 payload .getInt ();
10641094 payload .get ();
10651095 }
@@ -1068,6 +1098,7 @@ private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) thr
10681098 if (streamListener != null ) {
10691099 streamListener .onHeaderInput (this , streamId , headers );
10701100 }
1101+ recordPriorityFromHeaders (streamId , headers );
10711102 stream .consumeHeader (headers , frame .isFlagSet (FrameFlag .END_STREAM ));
10721103 } else {
10731104 continuation .copyPayload (payload );
@@ -1086,6 +1117,7 @@ private void consumeContinuationFrame(final RawFrame frame, final H2Stream strea
10861117 if (streamListener != null ) {
10871118 streamListener .onHeaderInput (this , streamId , headers );
10881119 }
1120+ recordPriorityFromHeaders (streamId , headers );
10891121 if (continuation .type == FrameType .PUSH_PROMISE .getValue ()) {
10901122 stream .consumePromise (headers );
10911123 } else {
@@ -1142,6 +1174,9 @@ private void consumeSettingsFrame(final ByteBuffer payload) throws IOException {
11421174 throw new H2ConnectionException (H2Error .PROTOCOL_ERROR , ex .getMessage ());
11431175 }
11441176 break ;
1177+ case SETTINGS_NO_RFC7540_PRIORITIES :
1178+ peerNoRfc7540Priorities = value == 1 ;
1179+ break ;
11451180 }
11461181 }
11471182 }
@@ -1334,6 +1369,38 @@ H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler strea
13341369 return streams .createActive (channel , streamHandler );
13351370 }
13361371
1372+ public final void sendPriorityUpdate (final int prioritizedStreamId , final PriorityValue value ) throws IOException {
1373+ if (value == null ) {
1374+ return ;
1375+ }
1376+ final String field = PriorityFormatter .format (value );
1377+ if (field == null ) {
1378+ return ;
1379+ }
1380+ final byte [] ascii = field .getBytes (StandardCharsets .US_ASCII );
1381+ final ByteArrayBuffer buf = new ByteArrayBuffer (4 + ascii .length );
1382+ buf .append ((byte ) (prioritizedStreamId >> 24 ));
1383+ buf .append ((byte ) (prioritizedStreamId >> 16 ));
1384+ buf .append ((byte ) (prioritizedStreamId >> 8 ));
1385+ buf .append ((byte ) prioritizedStreamId );
1386+ buf .append (ascii , 0 , ascii .length );
1387+ final RawFrame frame = frameFactory .createPriorityUpdate (ByteBuffer .wrap (buf .array (), 0 , buf .length ()));
1388+ commitFrame (frame );
1389+ }
1390+
1391+ private void recordPriorityFromHeaders (final int streamId , final List <? extends Header > headers ) {
1392+ if (headers == null || headers .isEmpty ()) {
1393+ return ;
1394+ }
1395+ for (final Header h : headers ) {
1396+ if (HttpHeaders .PRIORITY .equalsIgnoreCase (h .getName ())) {
1397+ final PriorityValue pv = PriorityParamsParser .parse (h .getValue ()).toValueWithDefaults ();
1398+ priorities .put (streamId , pv );
1399+ break ;
1400+ }
1401+ }
1402+ }
1403+
13371404 class H2StreamChannelImpl implements H2StreamChannel {
13381405
13391406 private final int id ;
@@ -1381,6 +1448,25 @@ public void submit(final List<Header> headers, final boolean endStream) throws I
13811448 return ;
13821449 }
13831450 ensureNotClosed ();
1451+ if (peerNoRfc7540Priorities && streams .isSameSide (id )) {
1452+ for (final Header h : headers ) {
1453+ if (HttpHeaders .PRIORITY .equalsIgnoreCase (h .getName ())) {
1454+ final byte [] ascii = h .getValue () != null
1455+ ? h .getValue ().getBytes (StandardCharsets .US_ASCII )
1456+ : new byte [0 ];
1457+ final ByteArrayBuffer b = new ByteArrayBuffer (4 + ascii .length );
1458+ b .append ((byte ) (id >> 24 ));
1459+ b .append ((byte ) (id >> 16 ));
1460+ b .append ((byte ) (id >> 8 ));
1461+ b .append ((byte ) id );
1462+ b .append (ascii , 0 , ascii .length );
1463+ final ByteBuffer pl = ByteBuffer .wrap (b .array (), 0 , b .length ());
1464+ final RawFrame priUpd = new RawFrame (FrameType .PRIORITY_UPDATE .getValue (), 0 , 0 , pl );
1465+ commitFrameInternal (priUpd );
1466+ break ;
1467+ }
1468+ }
1469+ }
13841470 commitHeaders (id , headers , endStream );
13851471 if (endStream ) {
13861472 localClosed = true ;
@@ -1518,4 +1604,4 @@ public String toString() {
15181604
15191605 }
15201606
1521- }
1607+ }
0 commit comments