-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
any released version
Issue Description
When using the REST API to produce messages to a single-partitioned topic (partition count = 1), the topics.produceOnPersistentTopic interface fails with "Unable to add schema" error due to incorrect topic type detection.
Expected Behavior
The REST API should successfully produce messages to single-partitioned topics, just like it works for non-partitioned topics and multi-partitioned topics.
Actual Behavior
The REST API fails to add schema to single-partitioned topics, preventing message production.
Root Cause
In TopicsBase, there's an incorrect condition for determining partitioned topics:
if (!topicName.isPartitioned() && metadata.partitions > 1) {This condition incorrectly treats single-partitioned topics (partition count = 1) as non-partitioned topics. Because metadata.partitions > 1 is also false for single-partitioned topics.
This leads to incorrect topic type detection, causing the schema addition logic to fail.
Error messages
org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Unable to add schema SchemaData(type=STRING, isDeleted=false, timestamp=1750682252591, user=Rest Producer, data=[], props={__charset=UTF-8}) to topic persistent://tenant/namespace/topic
Reproducing the issue
-
Create a partitioned topic with 1 partition:
admin.topics().createPartitionedTopic("persistent://tenant/namespace/topic", 1);
-
Try to produce messages using REST API:
ProducerMessages producerMessages = new ProducerMessages(); producerMessages.setKeySchema(ObjectMapperFactory.getMapper().getObjectMapper(). writeValueAsString(StringSchema.utf8().getSchemaInfo())); producerMessages.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper(). writeValueAsString(StringSchema.utf8().getSchemaInfo())); // ... set messages topics.produceOnPersistentTopic(asyncResponse, tenant, namespace, topicName, false, producerMessages);
-
The operation fails with:
org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Unable to add schema SchemaData(type=STRING, isDeleted=false, timestamp=1750682252591, user=Rest Producer, data=[], props={__charset=UTF-8}) to topic persistent://tenant/namespace/topic
pulsar/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
Line 170 in 25f57ff
| public void testProduceToPartitionedTopic() throws Exception { |
@DataProvider(name = "partitionNumbers")
public Object[][] partitionNumbers() {
return new Object[][] {
// in current version, it always failed
{1},
//in current version, it will success
{5},
};
}
@Test(dataProvider = "partitionNumbers")
public void testProduceToPartitionedTopic(int numPartitions) throws Exception {
admin.topics().createPartitionedTopic("persistent://" + testTenant + "/" + testNamespace
+ "/" + testTopicName + "-p", numPartitions);
AsyncResponse asyncResponse = mock(AsyncResponse.class);
Schema<String> schema = StringSchema.utf8();
ProducerMessages producerMessages = new ProducerMessages();
producerMessages.setKeySchema(ObjectMapperFactory.getMapper().getObjectMapper().
writeValueAsString(schema.getSchemaInfo()));
producerMessages.setValueSchema(ObjectMapperFactory.getMapper().getObjectMapper().
writeValueAsString(schema.getSchemaInfo()));
String message = "[" +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}," +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:3\",\"eventTime\":1603045262772,\"sequenceId\":3}," +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:4\",\"eventTime\":1603045262772,\"sequenceId\":4}," +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:5\",\"eventTime\":1603045262772,\"sequenceId\":5}," +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:6\",\"eventTime\":1603045262772,\"sequenceId\":6}," +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:7\",\"eventTime\":1603045262772,\"sequenceId\":7}," +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:8\",\"eventTime\":1603045262772,\"sequenceId\":8}," +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:9\",\"eventTime\":1603045262772,\"sequenceId\":9}," +
"{\"key\":\"my-key\",\"payload\":\"RestProducer:10\",\"eventTime\":1603045262772,\"sequenceId\":10}]";
producerMessages.setMessages(createMessages(message));
topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName + "-p", false, producerMessages);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
Object responseEntity = responseCaptor.getValue().getEntity();
Assert.assertTrue(responseEntity instanceof ProducerAcks);
ProducerAcks response = (ProducerAcks) responseEntity;
Assert.assertEquals(response.getMessagePublishResults().size(), 10);
Assert.assertEquals(response.getSchemaVersion(), 0);
int[] messagePerPartition = new int[5];
for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
messagePerPartition[Integer.parseInt(response.getMessagePublishResults().get(index)
.getMessageId().split(":")[2])]++;
Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 0);
Assert.assertTrue(response.getMessagePublishResults().get(index).getMessageId().length() > 0);
}
for (int index = 0; index < messagePerPartition.length; index++) {
// We publish to each partition in round robin mode so each partition should get at most 2 message.
Assert.assertTrue(messagePerPartition[index] <= 10 / numPartitions);
}
}Additional information
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!