Skip to content

Commit 5dab966

Browse files
BewareMyPowerwalkinggo
authored andcommitted
[fix][proxy] Fix proxy OOM by replacing TopicName with a simple conversion method (apache#24465)
1 parent 23f6774 commit 5dab966

File tree

5 files changed

+133
-15
lines changed

5 files changed

+133
-15
lines changed

pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,17 @@ public interface MessageProcessor {
5555
void process(RawMessage message) throws IOException;
5656
}
5757

58+
@Deprecated
59+
public static void parseMessage(TopicName topicName, long ledgerId, long entryId, ByteBuf headersAndPayload,
60+
MessageProcessor processor, int maxMessageSize) throws IOException {
61+
parseMessage(topicName.toString(), ledgerId, entryId, headersAndPayload, processor, maxMessageSize);
62+
}
63+
5864
/**
5965
* Parse a raw Pulsar entry payload and extract all the individual message that may be included in the batch. The
6066
* provided {@link MessageProcessor} will be invoked for each individual message.
6167
*/
62-
public static void parseMessage(TopicName topicName, long ledgerId, long entryId, ByteBuf headersAndPayload,
68+
public static void parseMessage(String topicName, long ledgerId, long entryId, ByteBuf headersAndPayload,
6369
MessageProcessor processor, int maxMessageSize) throws IOException {
6470
ByteBuf payload = headersAndPayload;
6571
ByteBuf uncompressedPayload = null;
@@ -117,7 +123,7 @@ public static void parseMessage(TopicName topicName, long ledgerId, long entryId
117123
}
118124
}
119125

120-
public static boolean verifyChecksum(TopicName topic, ByteBuf headersAndPayload, long ledgerId, long entryId) {
126+
public static boolean verifyChecksum(String topic, ByteBuf headersAndPayload, long ledgerId, long entryId) {
121127
if (hasChecksum(headersAndPayload)) {
122128
int checksum = readChecksum(headersAndPayload);
123129
int computedChecksum = computeChecksum(headersAndPayload);
@@ -132,7 +138,14 @@ public static boolean verifyChecksum(TopicName topic, ByteBuf headersAndPayload,
132138
return true;
133139
}
134140

135-
public static ByteBuf uncompressPayloadIfNeeded(TopicName topic, MessageMetadata msgMetadata,
141+
@Deprecated
142+
public static ByteBuf uncompressPayloadIfNeeded(TopicName topicName, MessageMetadata msgMetadata,
143+
ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) {
144+
return uncompressPayloadIfNeeded(topicName.toString(), msgMetadata, payload, ledgerId, entryId,
145+
maxMessageSize);
146+
}
147+
148+
public static ByteBuf uncompressPayloadIfNeeded(String topic, MessageMetadata msgMetadata,
136149
ByteBuf payload, long ledgerId, long entryId, int maxMessageSize) {
137150
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression());
138151
int uncompressedSize = msgMetadata.getUncompressedSize();

pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.common.base.Splitter;
2222
import com.google.re2j.Pattern;
2323
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24+
import java.util.ArrayList;
2425
import java.util.List;
2526
import java.util.Objects;
2627
import java.util.concurrent.ConcurrentHashMap;
@@ -442,4 +443,78 @@ public boolean includes(TopicName otherTopicName) {
442443
public boolean isV2() {
443444
return cluster == null;
444445
}
446+
447+
/**
448+
* Convert a topic name to a full topic name.
449+
* In Pulsar, a full topic name is "<domain>://<tenant>/<namespace>/<local-topic>" (v2) or
450+
* "<domain>://<tenant>/<cluster>/<namespace>/<local-topic>" (v1). However, for convenient, it's allowed for clients
451+
* to pass a short topic name with v2 format:
452+
* - "<local-topic>", which represents "persistent://public/default/<local-topic>"
453+
* - "<tenant>/<namespace>/<local-topic>, which represents "persistent://<tenant>/<namespace>/<local-topic>"
454+
*
455+
* @param topic the topic name from client
456+
* @return the full topic name.
457+
*/
458+
public static String toFullTopicName(String topic) {
459+
final int index = topic.indexOf("://");
460+
if (index >= 0) {
461+
TopicDomain.getEnum(topic.substring(0, index));
462+
final List<String> parts = splitBySlash(topic.substring(index + "://".length()), 4);
463+
if (parts.size() != 3 && parts.size() != 4) {
464+
throw new IllegalArgumentException(topic + " is invalid");
465+
}
466+
if (parts.size() == 3) {
467+
NamespaceName.validateNamespaceName(parts.get(0), parts.get(1));
468+
if (StringUtils.isBlank(parts.get(2))) {
469+
throw new IllegalArgumentException(topic + " has blank local topic");
470+
}
471+
} else {
472+
NamespaceName.validateNamespaceName(parts.get(0), parts.get(1), parts.get(2));
473+
if (StringUtils.isBlank(parts.get(3))) {
474+
throw new IllegalArgumentException(topic + " has blank local topic");
475+
}
476+
}
477+
return topic; // it's a valid full topic name
478+
} else {
479+
List<String> parts = splitBySlash(topic, 0);
480+
if (parts.size() != 1 && parts.size() != 3) {
481+
throw new IllegalArgumentException(topic + " is invalid");
482+
}
483+
if (parts.size() == 1) {
484+
if (StringUtils.isBlank(parts.get(0))) {
485+
throw new IllegalArgumentException(topic + " has blank local topic");
486+
}
487+
return "persistent://public/default/" + parts.get(0);
488+
} else {
489+
NamespaceName.validateNamespaceName(parts.get(0), parts.get(1));
490+
if (StringUtils.isBlank(parts.get(2))) {
491+
throw new IllegalArgumentException(topic + " has blank local topic");
492+
}
493+
return "persistent://" + topic;
494+
}
495+
}
496+
}
497+
498+
private static List<String> splitBySlash(String topic, int limit) {
499+
final List<String> tokens = new ArrayList<>(3);
500+
final int loopCount = (limit <= 0) ? Integer.MAX_VALUE : limit - 1;
501+
int beginIndex = 0;
502+
for (int i = 0; i < loopCount; i++) {
503+
final int endIndex = topic.indexOf('/', beginIndex);
504+
if (endIndex < 0) {
505+
tokens.add(topic.substring(beginIndex));
506+
return tokens;
507+
} else if (endIndex > beginIndex) {
508+
tokens.add(topic.substring(beginIndex, endIndex));
509+
} else {
510+
throw new IllegalArgumentException("Invalid topic name " + topic);
511+
}
512+
beginIndex = endIndex + 1;
513+
}
514+
if (beginIndex >= topic.length()) {
515+
throw new IllegalArgumentException("Invalid topic name " + topic);
516+
}
517+
tokens.add(topic.substring(beginIndex));
518+
return tokens;
519+
}
445520
}

pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.testng.Assert.assertFalse;
2323
import static org.testng.Assert.assertNotEquals;
2424
import static org.testng.Assert.assertNull;
25+
import static org.testng.Assert.assertThrows;
2526
import static org.testng.Assert.assertTrue;
2627
import static org.testng.Assert.fail;
2728
import org.apache.pulsar.common.util.Codec;
@@ -52,9 +53,8 @@ public void topic() {
5253

5354
assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").toString(),
5455
"persistent://tenant/cluster/namespace/topic");
55-
56-
assertNotEquals(TopicName.get("persistent://tenant/cluster/namespace/topic"),
57-
"persistent://tenant/cluster/namespace/topic");
56+
assertEquals(TopicName.toFullTopicName("persistent://tenant/cluster/namespace/topic"),
57+
"persistent://tenant/cluster/namespace/topic");
5858

5959
assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic").getDomain(),
6060
TopicDomain.persistent);
@@ -103,62 +103,76 @@ public void topic() {
103103
} catch (IllegalArgumentException e) {
104104
// Ok
105105
}
106+
assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace:my-topic"));
106107

107108
try {
108109
TopicName.get("://tenant.namespace");
109110
fail("Should have raised exception");
110111
} catch (IllegalArgumentException e) {
111112
// Ok
112113
}
114+
assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace"));
113115

114116
try {
115117
TopicName.get("invalid://tenant/cluster/namespace/topic");
116118
fail("Should have raied exception");
117119
} catch (IllegalArgumentException e) {
118120
// Ok
119121
}
122+
assertThrows(IllegalArgumentException.class,
123+
() -> TopicName.toFullTopicName("invalid://tenant/cluster/namespace/topic"));
120124

121125
try {
122126
TopicName.get("tenant/cluster/namespace/topic");
123127
fail("Should have raised exception");
124128
} catch (IllegalArgumentException e) {
125129
// Ok
126130
}
131+
assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("tenant/cluster/namespace/topic"));
127132

128133
try {
129134
TopicName.get("persistent:///cluster/namespace/mydest-1");
130135
fail("Should have raised exception");
131136
} catch (IllegalArgumentException e) {
132137
// Ok
133138
}
139+
assertThrows(IllegalArgumentException.class,
140+
() -> TopicName.toFullTopicName("persistent:///cluster/namespace/mydest-1"));
134141

135142
try {
136143
TopicName.get("persistent://pulsar//namespace/mydest-1");
137144
fail("Should have raised exception");
138145
} catch (IllegalArgumentException e) {
139146
// Ok
140147
}
148+
assertThrows(IllegalArgumentException.class,
149+
() -> TopicName.toFullTopicName("persistent://pulsar//namespace/mydest-1"));
141150

142151
try {
143152
TopicName.get("persistent://pulsar/cluster//mydest-1");
144153
fail("Should have raised exception");
145154
} catch (IllegalArgumentException e) {
146155
// Ok
147156
}
157+
assertThrows(IllegalArgumentException.class,
158+
() -> TopicName.toFullTopicName("persistent://pulsar/cluster//mydest-1"));
148159

149160
try {
150161
TopicName.get("persistent://pulsar/cluster/namespace/");
151162
fail("Should have raised exception");
152163
} catch (IllegalArgumentException e) {
153164
// Ok
154165
}
166+
assertThrows(IllegalArgumentException.class,
167+
() -> TopicName.toFullTopicName("persistent://pulsar/cluster/namespace/"));
155168

156169
try {
157170
TopicName.get("://pulsar/cluster/namespace/");
158171
fail("Should have raised exception");
159172
} catch (IllegalArgumentException e) {
160173
// Ok
161174
}
175+
assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://pulsar/cluster/namespace/"));
162176

163177
assertEquals(TopicName.get("persistent://tenant/cluster/namespace/topic")
164178
.getPersistenceNamingEncoding(), "tenant/cluster/namespace/persistent/topic");
@@ -169,20 +183,23 @@ public void topic() {
169183
} catch (IllegalArgumentException e) {
170184
// Ok
171185
}
186+
assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant.namespace"));
172187

173188
try {
174189
TopicName.get("://tenant/cluster/namespace");
175190
fail("Should have raied exception");
176191
} catch (IllegalArgumentException e) {
177192
// Ok
178193
}
194+
assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("://tenant//cluster/namespace"));
179195

180196
try {
181197
TopicName.get(" ");
182198
fail("Should have raised exception");
183199
} catch (IllegalArgumentException e) {
184200
// Ok
185201
}
202+
assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName(" "));
186203

187204
TopicName nameWithSlash = TopicName.get("persistent://tenant/cluster/namespace/ns-abc/table/1");
188205
assertEquals(nameWithSlash.getEncodedLocalName(), Codec.encode("ns-abc/table/1"));
@@ -344,4 +361,15 @@ public void testTwoKeyWordPartition(){
344361
assertNotEquals(tp2.toString(), tp1.toString());
345362
assertEquals(tp2.toString(), "persistent://tenant1/namespace1/tp1-partition-0-DLQ-partition-0");
346363
}
364+
365+
@Test
366+
public void testToFullTopicName() {
367+
// There is no constraint for local topic name
368+
assertEquals("persistent://public/default/tp???xx=", TopicName.toFullTopicName("tp???xx="));
369+
assertEquals("persistent://tenant/ns/tp???xx=", TopicName.toFullTopicName("tenant/ns/tp???xx="));
370+
assertEquals("persistent://tenant/ns/test", TopicName.toFullTopicName("persistent://tenant/ns/test"));
371+
assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("ns/topic"));
372+
// v1 format is not supported when the domain is not included
373+
assertThrows(IllegalArgumentException.class, () -> TopicName.toFullTopicName("tenant/cluster/ns/topic"));
374+
}
347375
}

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part
221221
**/
222222
private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata,
223223
long clientRequestId) {
224-
TopicName topicName = TopicName.get(partitionMetadata.getTopic());
224+
String topicName = TopicName.toFullTopicName(partitionMetadata.getTopic());
225225

226226
String serviceUrl = getBrokerServiceUrl(clientRequestId);
227227
if (serviceUrl == null) {
@@ -235,7 +235,7 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par
235235

236236
if (log.isDebugEnabled()) {
237237
log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr,
238-
topicName.getPartitionedTopicName(), clientRequestId);
238+
topicName, clientRequestId);
239239
}
240240
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
241241
// Connected to backend broker
@@ -245,7 +245,7 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par
245245
partitionMetadata.isMetadataAutoCreationEnabled());
246246
clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
247247
if (t != null) {
248-
log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
248+
log.warn("[{}] failed to get Partitioned metadata : {}", topicName,
249249
t.getMessage(), t);
250250
PulsarClientException pce = PulsarClientException.unwrap(t);
251251
writeAndFlush(Commands.newLookupErrorResponse(clientCnx.revertClientExToErrorCode(pce),

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<R
101101
private final BaseCommand cmd = new BaseCommand();
102102

103103
public void channelRead(ChannelHandlerContext ctx, Object msg) {
104-
TopicName topicName;
104+
String key;
105+
String topicName;
105106
List<RawMessage> messages = new ArrayList<>();
106107
ByteBuf buffer = (ByteBuf) (msg);
107108

@@ -130,16 +131,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
130131
logging(ctx.channel(), cmd.getType(), "", null);
131132
break;
132133
}
133-
topicName = TopicName.get(ParserProxyHandler.producerHashMap.get(cmd.getSend().getProducerId() + ","
134-
+ ctx.channel().id()));
134+
topicName = TopicName.toFullTopicName(ParserProxyHandler.producerHashMap.get(
135+
cmd.getSend().getProducerId() + "," + ctx.channel().id()));
135136
MutableLong msgBytes = new MutableLong(0);
136137
MessageParser.parseMessage(topicName, -1L,
137138
-1L, buffer, (message) -> {
138139
messages.add(message);
139140
msgBytes.add(message.getData().readableBytes());
140141
}, maxMessageSize);
141142
// update topic stats
142-
TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
143+
TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName,
143144
topic -> new TopicStats());
144145
topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
145146
logging(ctx.channel(), cmd.getType(), "", messages);
@@ -158,8 +159,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
158159
logging(ctx.channel(), cmd.getType(), "", null);
159160
break;
160161
}
161-
topicName = TopicName.get(ParserProxyHandler.consumerHashMap.get(cmd.getMessage().getConsumerId()
162-
+ "," + peerChannelId));
162+
topicName = TopicName.toFullTopicName(ParserProxyHandler.consumerHashMap.get(
163+
cmd.getMessage().getConsumerId() + "," + peerChannelId));
164+
163165
msgBytes = new MutableLong(0);
164166
MessageParser.parseMessage(topicName, -1L,
165167
-1L, buffer, (message) -> {

0 commit comments

Comments
 (0)