Skip to content

Commit abec5d8

Browse files
committed
[enh] Broker: make max size of Consumer metadata configurable (#15713)
(cherry picked from commit 3dbf1f5)
1 parent ee43c96 commit abec5d8

File tree

4 files changed

+24
-17
lines changed

4 files changed

+24
-17
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
759759
+ " unacked messages than this percentage limit and subscription will not receive any new messages "
760760
+ " until that subscription acks back `limit/2` messages")
761761
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
762+
@FieldContext(
763+
category = CATEGORY_POLICIES,
764+
doc = "Maximum size of Consumer metadata")
765+
private int maxConsumerMetadataSize = 1024;
762766
@FieldContext(
763767
category = CATEGORY_POLICIES,
764768
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -983,7 +983,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
983983

984984
log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
985985
try {
986-
Metadata.validateMetadata(metadata);
986+
Metadata.validateMetadata(metadata,
987+
service.getPulsar().getConfiguration().getMaxConsumerMetadataSize());
987988
} catch (IllegalArgumentException iae) {
988989
final String msg = iae.getMessage();
989990
consumers.remove(consumerId, consumerFuture);

pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,24 @@
2525
*/
2626
public class Metadata {
2727

28-
private static final int MAX_METADATA_SIZE = 1024; // 1 Kb
29-
3028
private Metadata() {}
3129

32-
public static void validateMetadata(Map<String, String> metadata) throws IllegalArgumentException {
30+
public static void validateMetadata(Map<String, String> metadata,
31+
int maxConsumerMetadataSize) throws IllegalArgumentException {
3332
if (metadata == null) {
3433
return;
3534
}
3635

3736
int size = 0;
3837
for (Map.Entry<String, String> e : metadata.entrySet()) {
3938
size += (e.getKey().length() + e.getValue().length());
40-
if (size > MAX_METADATA_SIZE) {
41-
throw new IllegalArgumentException(getErrorMessage());
39+
if (size > maxConsumerMetadataSize) {
40+
throw new IllegalArgumentException(getErrorMessage(maxConsumerMetadataSize));
4241
}
4342
}
4443
}
4544

46-
private static String getErrorMessage() {
47-
return "metadata has a max size of 1 Kb";
45+
private static String getErrorMessage(int maxConsumerMetadataSize) {
46+
return "metadata has a max size of " + maxConsumerMetadataSize + " bytes";
4847
}
4948
}

pulsar-common/src/test/java/org/apache/pulsar/common/naming/MetadataTests.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,46 +32,49 @@ public void testValidMetadata() {
3232
Map<String, String> metadata = new HashMap<>();
3333

3434
metadata.put(generateKey(1, 512), generateKey(1, 512));
35-
Assert.assertTrue(validateMetadata(metadata));
35+
Assert.assertTrue(validateMetadata(metadata, 1024));
3636

3737
metadata.clear();
3838
metadata.put(generateKey(1, 512), generateKey(1, 511));
39-
Assert.assertTrue(validateMetadata(metadata));
39+
Assert.assertTrue(validateMetadata(metadata, 1024));
4040

4141
metadata.clear();
4242
metadata.put(generateKey(1, 256), generateKey(1, 256));
4343
metadata.put(generateKey(2, 256), generateKey(2, 256));
44-
Assert.assertTrue(validateMetadata(metadata));
44+
Assert.assertTrue(validateMetadata(metadata, 1024));
4545

4646
metadata.clear();
4747
metadata.put(generateKey(1, 256), generateKey(1, 256));
4848
metadata.put(generateKey(2, 256), generateKey(2, 255));
49-
Assert.assertTrue(validateMetadata(metadata));
49+
Assert.assertTrue(validateMetadata(metadata, 1024));
5050
}
5151

5252
@Test
5353
public void testInvalidMetadata() {
5454
Map<String, String> metadata = new HashMap<>();
5555

5656
metadata.put(generateKey(1, 512), generateKey(1, 513));
57-
Assert.assertFalse(validateMetadata(metadata));
57+
Assert.assertFalse(validateMetadata(metadata, 1024));
5858

5959
metadata.clear();
6060
metadata.put(generateKey(1, 256), generateKey(1, 256));
6161
metadata.put(generateKey(2, 256), generateKey(2, 257));
62-
Assert.assertFalse(validateMetadata(metadata));
62+
Assert.assertFalse(validateMetadata(metadata, 1024));
6363

6464

6565
metadata.clear();
6666
metadata.put(generateKey(1, 256), generateKey(1, 256));
6767
metadata.put(generateKey(2, 256), generateKey(2, 256));
6868
metadata.put(generateKey(3, 1), generateKey(3, 1));
69-
Assert.assertFalse(validateMetadata(metadata));
69+
Assert.assertFalse(validateMetadata(metadata, 1024));
70+
71+
// set bigger maxConsumerMetadataSize, now validation should pass
72+
Assert.assertTrue(validateMetadata(metadata, 1024 * 10));
7073
}
7174

72-
private static boolean validateMetadata(Map<String, String> metadata) {
75+
private static boolean validateMetadata(Map<String, String> metadata, int maxSize) {
7376
try {
74-
Metadata.validateMetadata(metadata);
77+
Metadata.validateMetadata(metadata, maxSize);
7578
return true;
7679
} catch (IllegalArgumentException ignore) {
7780
return false;

0 commit comments

Comments
 (0)