3131import java .nio .BufferOverflowException ;
3232import java .nio .ByteBuffer ;
3333import java .nio .channels .SelectionKey ;
34+ import java .nio .charset .StandardCharsets ;
3435import java .util .Deque ;
3536import java .util .Iterator ;
3637import java .util .List ;
38+ import java .util .Map ;
3739import java .util .Queue ;
40+ import java .util .concurrent .ConcurrentHashMap ;
3841import java .util .concurrent .ConcurrentLinkedDeque ;
3942import java .util .concurrent .ConcurrentLinkedQueue ;
4043import java .util .concurrent .atomic .AtomicInteger ;
6669import org .apache .hc .core5 .http .nio .command .ShutdownCommand ;
6770import org .apache .hc .core5 .http .protocol .HttpContext ;
6871import org .apache .hc .core5 .http .protocol .HttpProcessor ;
72+ import org .apache .hc .core5 .http .HttpHeaders ;
6973import org .apache .hc .core5 .http2 .H2ConnectionException ;
7074import org .apache .hc .core5 .http2 .H2Error ;
7175import org .apache .hc .core5 .http2 .H2StreamResetException ;
8387import org .apache .hc .core5 .http2 .nio .AsyncPingHandler ;
8488import org .apache .hc .core5 .http2 .nio .command .PingCommand ;
8589import org .apache .hc .core5 .http2 .nio .command .PushResponseCommand ;
90+ import org .apache .hc .core5 .http2 .priority .PriorityParamsParser ;
91+ import org .apache .hc .core5 .http2 .priority .PriorityValue ;
92+ import org .apache .hc .core5 .http2 .priority .PriorityFormatter ;
8693import org .apache .hc .core5 .io .CloseMode ;
8794import org .apache .hc .core5 .reactor .Command ;
8895import org .apache .hc .core5 .reactor .ProtocolIOSession ;
94101
95102abstract class AbstractH2StreamMultiplexer implements Identifiable , HttpConnection {
96103
97- private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024 ; // 10 MiB
104+ private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024 ;
98105
99106 enum ConnectionHandshake { READY , ACTIVE , GRACEFUL_SHUTDOWN , SHUTDOWN }
100107 enum SettingsHandshake { READY , TRANSMITTED , ACKED }
@@ -133,6 +140,9 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
133140 private EndpointDetails endpointDetails ;
134141 private boolean goAwayReceived ;
135142
143+ private final Map <Integer , PriorityValue > priorities = new ConcurrentHashMap <>();
144+ private volatile boolean peerNoRfc7540Priorities ;
145+
136146 AbstractH2StreamMultiplexer (
137147 final ProtocolIOSession ioSession ,
138148 final FrameFactory frameFactory ,
@@ -892,15 +902,13 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
892902 consumeSettingsFrame (payload );
893903 remoteSettingState = SettingsHandshake .TRANSMITTED ;
894904 }
895- // Send ACK
896905 final RawFrame response = frameFactory .createSettingsAck ();
897906 commitFrame (response );
898907 remoteSettingState = SettingsHandshake .ACKED ;
899908 }
900909 }
901910 break ;
902911 case PRIORITY :
903- // Stream priority not supported
904912 break ;
905913 case PUSH_PROMISE : {
906914 acceptPushFrame ();
@@ -985,6 +993,29 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
985993 }
986994 ioSession .setEvent (SelectionKey .OP_WRITE );
987995 break ;
996+ case PRIORITY_UPDATE : {
997+ if (streamId != 0 ) {
998+ throw new H2ConnectionException (H2Error .PROTOCOL_ERROR , "PRIORITY_UPDATE must be on stream 0" );
999+ }
1000+ final ByteBuffer payload = frame .getPayload ();
1001+ if (payload == null || payload .remaining () < 4 ) {
1002+ throw new H2ConnectionException (H2Error .FRAME_SIZE_ERROR , "Invalid PRIORITY_UPDATE payload" );
1003+ }
1004+ final int prioritizedId = payload .getInt () & 0x7fffffff ;
1005+ final int len = payload .remaining ();
1006+ final String field ;
1007+ if (len > 0 ) {
1008+ final byte [] b = new byte [len ];
1009+ payload .get (b );
1010+ field = new String (b , StandardCharsets .US_ASCII );
1011+ } else {
1012+ field = "" ;
1013+ }
1014+ final PriorityValue pv = PriorityParamsParser .parse (field ).toValueWithDefaults ();
1015+ priorities .put (prioritizedId , pv );
1016+ requestSessionOutput ();
1017+ }
1018+ break ;
9881019 }
9891020 }
9901021
@@ -1049,7 +1080,6 @@ private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) thr
10491080 }
10501081 final ByteBuffer payload = frame .getPayloadContent ();
10511082 if (frame .isFlagSet (FrameFlag .PRIORITY )) {
1052- // Priority not supported
10531083 payload .getInt ();
10541084 payload .get ();
10551085 }
@@ -1058,6 +1088,7 @@ private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) thr
10581088 if (streamListener != null ) {
10591089 streamListener .onHeaderInput (this , streamId , headers );
10601090 }
1091+ recordPriorityFromHeaders (streamId , headers );
10611092 stream .consumeHeader (headers , frame .isFlagSet (FrameFlag .END_STREAM ));
10621093 } else {
10631094 continuation .copyPayload (payload );
@@ -1076,6 +1107,7 @@ private void consumeContinuationFrame(final RawFrame frame, final H2Stream strea
10761107 if (streamListener != null ) {
10771108 streamListener .onHeaderInput (this , streamId , headers );
10781109 }
1110+ recordPriorityFromHeaders (streamId , headers );
10791111 if (continuation .type == FrameType .PUSH_PROMISE .getValue ()) {
10801112 stream .consumePromise (headers );
10811113 } else {
@@ -1132,6 +1164,9 @@ private void consumeSettingsFrame(final ByteBuffer payload) throws IOException {
11321164 throw new H2ConnectionException (H2Error .PROTOCOL_ERROR , ex .getMessage ());
11331165 }
11341166 break ;
1167+ case SETTINGS_NO_RFC7540_PRIORITIES :
1168+ peerNoRfc7540Priorities = value == 1 ;
1169+ break ;
11351170 }
11361171 }
11371172 }
@@ -1324,6 +1359,38 @@ H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler strea
13241359 return streams .createActive (channel , streamHandler );
13251360 }
13261361
1362+ public final void sendPriorityUpdate (final int prioritizedStreamId , final PriorityValue value ) throws IOException {
1363+ if (value == null ) {
1364+ return ;
1365+ }
1366+ final String field = PriorityFormatter .format (value );
1367+ if (field == null ) {
1368+ return ;
1369+ }
1370+ final byte [] ascii = field .getBytes (StandardCharsets .US_ASCII );
1371+ final ByteArrayBuffer buf = new ByteArrayBuffer (4 + ascii .length );
1372+ buf .append ((byte ) (prioritizedStreamId >> 24 ));
1373+ buf .append ((byte ) (prioritizedStreamId >> 16 ));
1374+ buf .append ((byte ) (prioritizedStreamId >> 8 ));
1375+ buf .append ((byte ) prioritizedStreamId );
1376+ buf .append (ascii , 0 , ascii .length );
1377+ final RawFrame frame = frameFactory .createPriorityUpdate (ByteBuffer .wrap (buf .array (), 0 , buf .length ()));
1378+ commitFrame (frame );
1379+ }
1380+
1381+ private void recordPriorityFromHeaders (final int streamId , final List <? extends Header > headers ) {
1382+ if (headers == null || headers .isEmpty ()) {
1383+ return ;
1384+ }
1385+ for (final Header h : headers ) {
1386+ if (HttpHeaders .PRIORITY .equalsIgnoreCase (h .getName ())) {
1387+ final PriorityValue pv = PriorityParamsParser .parse (h .getValue ()).toValueWithDefaults ();
1388+ priorities .put (streamId , pv );
1389+ break ;
1390+ }
1391+ }
1392+ }
1393+
13271394 class H2StreamChannelImpl implements H2StreamChannel {
13281395
13291396 private final int id ;
@@ -1371,6 +1438,25 @@ public void submit(final List<Header> headers, final boolean endStream) throws I
13711438 return ;
13721439 }
13731440 ensureNotClosed ();
1441+ if (peerNoRfc7540Priorities && streams .isSameSide (id )) {
1442+ for (final Header h : headers ) {
1443+ if (HttpHeaders .PRIORITY .equalsIgnoreCase (h .getName ())) {
1444+ final byte [] ascii = h .getValue () != null
1445+ ? h .getValue ().getBytes (StandardCharsets .US_ASCII )
1446+ : new byte [0 ];
1447+ final ByteArrayBuffer b = new ByteArrayBuffer (4 + ascii .length );
1448+ b .append ((byte ) (id >> 24 ));
1449+ b .append ((byte ) (id >> 16 ));
1450+ b .append ((byte ) (id >> 8 ));
1451+ b .append ((byte ) id );
1452+ b .append (ascii , 0 , ascii .length );
1453+ final ByteBuffer pl = ByteBuffer .wrap (b .array (), 0 , b .length ());
1454+ final RawFrame priUpd = new RawFrame (FrameType .PRIORITY_UPDATE .getValue (), 0 , 0 , pl );
1455+ commitFrameInternal (priUpd );
1456+ break ;
1457+ }
1458+ }
1459+ }
13741460 commitHeaders (id , headers , endStream );
13751461 if (endStream ) {
13761462 localClosed = true ;
@@ -1508,4 +1594,4 @@ public String toString() {
15081594
15091595 }
15101596
1511- }
1597+ }
0 commit comments