Skip to content

Commit 5f3d4cb

Browse files
committed
PIP-254: Support configuring client version
### Motivation apache#19705 ### Modifications - Add the `ClientBuilderImpl#description` method to add the description to the original client version string that is set in `CommandConnect` and `CommandAuthResponse`. - Add `testClientVersion` to cover these two cases.
1 parent 42a6969 commit 5f3d4cb

File tree

5 files changed

+110
-10
lines changed

5 files changed

+110
-10
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,26 @@
2626
import java.util.Map;
2727
import java.util.Set;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.stream.Collectors;
2930
import javax.naming.AuthenticationException;
3031
import javax.net.ssl.SSLSession;
32+
import org.apache.pulsar.PulsarVersion;
3133
import org.apache.pulsar.broker.ServiceConfiguration;
3234
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
3335
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
3436
import org.apache.pulsar.broker.authentication.AuthenticationState;
37+
import org.apache.pulsar.client.impl.ClientBuilderImpl;
3538
import org.apache.pulsar.common.api.AuthData;
39+
import org.apache.pulsar.common.policies.data.PublisherStats;
40+
import org.apache.pulsar.common.policies.data.TopicStats;
3641
import org.slf4j.Logger;
3742
import org.slf4j.LoggerFactory;
38-
import org.testng.annotations.AfterMethod;
39-
import org.testng.annotations.BeforeMethod;
43+
import org.testng.annotations.AfterClass;
44+
import org.testng.annotations.BeforeClass;
4045
import org.testng.annotations.Test;
4146

4247
import static java.nio.charset.StandardCharsets.UTF_8;
48+
import static org.testng.Assert.assertEquals;
4349

4450
/**
4551
* Test Mutual Authentication.
@@ -182,7 +188,7 @@ public AuthenticationState newAuthState(AuthData authData,
182188
}
183189
}
184190

185-
@BeforeMethod(alwaysRun = true)
191+
@BeforeClass(alwaysRun = true)
186192
@Override
187193
protected void setup() throws Exception {
188194
mutualAuth = new MutualAuthentication();
@@ -205,7 +211,7 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
205211
clientBuilder.authentication(mutualAuth);
206212
}
207213

208-
@AfterMethod(alwaysRun = true)
214+
@AfterClass(alwaysRun = true)
209215
@Override
210216
protected void cleanup() throws Exception {
211217
internalCleanup();
@@ -214,12 +220,13 @@ protected void cleanup() throws Exception {
214220
@Test
215221
public void testAuthentication() throws Exception {
216222
log.info("-- Starting {} test --", methodName);
223+
String topic = "persistent://my-property/my-ns/test-authentication";
217224

218-
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
225+
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
219226
.subscriptionName("my-subscriber-name")
220227
.subscribe();
221228
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
222-
.topic("persistent://my-property/my-ns/my-topic1")
229+
.topic(topic)
223230
.create();
224231

225232
for (int i = 0; i < 10; i++) {
@@ -239,4 +246,33 @@ public void testAuthentication() throws Exception {
239246

240247
log.info("-- Exiting {} test --", methodName);
241248
}
249+
250+
@Test
251+
public void testClientVersion() throws Exception {
252+
String defaultClientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion();
253+
String topic = "persistent://my-property/my-ns/test-client-version";
254+
255+
Producer<byte[]> producer1 = pulsarClient.newProducer()
256+
.topic(topic)
257+
.create();
258+
TopicStats stats = admin.topics().getStats(topic);
259+
assertEquals(stats.getPublishers().size(), 1);
260+
assertEquals(stats.getPublishers().get(0).getClientVersion(), defaultClientVersion);
261+
262+
PulsarClient client = ((ClientBuilderImpl) PulsarClient.builder())
263+
.description("my-java-client")
264+
.serviceUrl(lookupUrl.toString())
265+
.authentication(mutualAuth)
266+
.build();
267+
Producer<byte[]> producer2 = client.newProducer().topic(topic).create();
268+
stats = admin.topics().getStats(topic);
269+
assertEquals(stats.getPublishers().size(), 2);
270+
271+
assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()),
272+
Sets.newHashSet(defaultClientVersion, defaultClientVersion + "my-java-client"));
273+
274+
producer1.close();
275+
producer2.close();
276+
client.close();
277+
}
242278
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,12 @@
8383
import org.apache.commons.lang3.RandomUtils;
8484
import org.apache.commons.lang3.reflect.FieldUtils;
8585
import org.apache.commons.lang3.tuple.Pair;
86+
import org.apache.pulsar.PulsarVersion;
8687
import org.apache.pulsar.broker.PulsarService;
8788
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
8889
import org.apache.pulsar.client.admin.PulsarAdminException;
8990
import org.apache.pulsar.client.api.schema.GenericRecord;
91+
import org.apache.pulsar.client.impl.ClientBuilderImpl;
9092
import org.apache.pulsar.client.impl.ClientCnx;
9193
import org.apache.pulsar.client.impl.ConsumerBase;
9294
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -105,6 +107,8 @@
105107
import org.apache.pulsar.common.compression.CompressionCodec;
106108
import org.apache.pulsar.common.compression.CompressionCodecProvider;
107109
import org.apache.pulsar.common.naming.TopicName;
110+
import org.apache.pulsar.common.policies.data.PublisherStats;
111+
import org.apache.pulsar.common.policies.data.TopicStats;
108112
import org.apache.pulsar.common.protocol.Commands;
109113
import org.apache.pulsar.common.schema.SchemaType;
110114
import org.apache.pulsar.common.util.FutureUtil;
@@ -4581,4 +4585,32 @@ public void testSendMsgGreaterThanBatchingMaxBytes() throws Exception {
45814585
// sendAsync should complete in time
45824586
assertNotNull(producer.sendAsync(msg).get(timeoutSec, TimeUnit.SECONDS));
45834587
}
4584-
}
4588+
4589+
@Test
4590+
public void testClientVersion() throws Exception {
4591+
String defaultClientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion();
4592+
String topic = "persistent://my-property/my-ns/test-client-version";
4593+
4594+
Producer<byte[]> producer1 = pulsarClient.newProducer()
4595+
.topic(topic)
4596+
.create();
4597+
TopicStats stats = admin.topics().getStats(topic);
4598+
assertEquals(stats.getPublishers().size(), 1);
4599+
assertEquals(stats.getPublishers().get(0).getClientVersion(), defaultClientVersion);
4600+
4601+
PulsarClient client = ((ClientBuilderImpl) PulsarClient.builder())
4602+
.description("my-java-client")
4603+
.serviceUrl(lookupUrl.toString())
4604+
.build();
4605+
Producer<byte[]> producer2 = client.newProducer().topic(topic).create();
4606+
stats = admin.topics().getStats(topic);
4607+
assertEquals(stats.getPublishers().size(), 2);
4608+
4609+
assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()),
4610+
Sets.newHashSet(defaultClientVersion, defaultClientVersion + "my-java-client"));
4611+
4612+
producer1.close();
4613+
producer2.close();
4614+
client.close();
4615+
}
4616+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,4 +410,27 @@ public ClientBuilder socks5ProxyPassword(String socks5ProxyPassword) {
410410
conf.setSocks5ProxyPassword(socks5ProxyPassword);
411411
return this;
412412
}
413+
414+
/**
415+
* Set the description.
416+
*
417+
* <p> By default, when the client connects to the broker, a version string like "Pulsar-Java-v<x.y.z>" will be
418+
* carried and saved by the broker. The client version string could be queried from the topic stats.
419+
*
420+
* <p> This method provides a way to add more description to a specific PulsarClient instance. If it's configured,
421+
* the description will be appended to the original client version string, with '-' as the separator.
422+
*
423+
* <p>For example, if the client version is 3.0.0, and the description is "forked", the final client version string
424+
* will be "Pulsar-Java-v3.0.0-forked".
425+
*
426+
* @param description the description of the current PulsarClient instance
427+
* @throws IllegalArgumentException if the length of description exceeds 64
428+
*/
429+
public ClientBuilder description(String description) {
430+
if (description.length() > 64) {
431+
throw new IllegalArgumentException("description should be at most 64 characters");
432+
}
433+
conf.setDescription(description);
434+
return this;
435+
}
413436
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ public class ClientCnx extends PulsarHandler {
194194
@Getter
195195
private long lastDisconnectedTimestamp;
196196

197+
private final String clientVersion;
198+
197199
protected enum State {
198200
None, SentConnectFrame, Ready, Failed, Connecting
199201
}
@@ -252,6 +254,8 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
252254
this.state = State.None;
253255
this.protocolVersion = protocolVersion;
254256
this.idleState = new ClientCnxIdleState(this);
257+
this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
258+
+ (conf.getDescription() == null ? "" : conf.getDescription());
255259
}
256260

257261
@Override
@@ -293,8 +297,7 @@ protected ByteBuf newConnectCommand() throws Exception {
293297
authenticationDataProvider = authentication.getAuthData(remoteHostName);
294298
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
295299
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
296-
String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()), proxyToTargetBrokerAddress, null, null,
297-
null);
300+
clientVersion, proxyToTargetBrokerAddress, null, null, null);
298301
}
299302

300303
@Override
@@ -411,7 +414,7 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
411414
ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
412415
authData,
413416
this.protocolVersion,
414-
String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
417+
clientVersion);
415418

416419
if (log.isDebugEnabled()) {
417420
log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());

pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,12 @@ public class ClientConfigurationData implements Serializable, Cloneable {
379379
@Secret
380380
private String socks5ProxyPassword;
381381

382+
@ApiModelProperty(
383+
name = "description",
384+
value = "The extra description of the client version."
385+
)
386+
private String description;
387+
382388
/**
383389
* Gets the authentication settings for the client.
384390
*

0 commit comments

Comments
 (0)