Skip to content

Commit 75b2bef

Browse files
Technoboy-srinath-ctds
authored andcommitted
[improve][client] Add startTimestamp and endTimestamp for consuming message in client cli (apache#24521)
(cherry picked from commit e627c2c) (cherry picked from commit faa4c75)
1 parent 9c0274a commit 75b2bef

File tree

1 file changed

+24
-1
lines changed
  • pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli

1 file changed

+24
-1
lines changed

pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ public class CmdConsume extends AbstractCmdConsume {
118118
@Option(names = { "-mp", "--print-metadata" }, description = "Message metadata")
119119
private boolean printMetadata = false;
120120

121+
@Option(names = { "-stp", "--start-timestamp" }, description = "Start timestamp for consuming messages")
122+
private long startTimestamp = 0L;
123+
124+
@Option(names = { "-etp", "--end-timestamp" }, description = "End timestamp for consuming messages")
125+
private long endTimestamp = Long.MAX_VALUE;
126+
121127
public CmdConsume() {
122128
// Do nothing
123129
super();
@@ -139,6 +145,18 @@ public int run() throws IOException {
139145
throw new CommandLine.ParameterException(commandSpec.commandLine(),
140146
"Number of messages should be zero or positive.");
141147
}
148+
if (this.startTimestamp < 0) {
149+
throw new CommandLine.ParameterException(commandSpec.commandLine(),
150+
"start timestamp should be positive.");
151+
}
152+
if (this.endTimestamp < 0) {
153+
throw new CommandLine.ParameterException(commandSpec.commandLine(),
154+
"end timestamp should be positive.");
155+
}
156+
if (this.endTimestamp < startTimestamp) {
157+
throw new CommandLine.ParameterException(commandSpec.commandLine(),
158+
"end timestamp should larger than start timestamp.");
159+
}
142160

143161
if (this.serviceURL.startsWith("ws")) {
144162
return consumeFromWebSocket(topic);
@@ -188,17 +206,22 @@ private int consume(String topic) {
188206
}
189207

190208
try (Consumer<?> consumer = builder.subscribe();) {
209+
if (startTimestamp > 0L) {
210+
consumer.seek(startTimestamp);
211+
}
191212
RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
192213
while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
193214
if (limiter != null) {
194215
limiter.acquire();
195216
}
196-
197217
Message<?> msg = consumer.receive(5, TimeUnit.SECONDS);
198218
if (msg == null) {
199219
LOG.debug("No message to consume after waiting for 5 seconds.");
200220
} else {
201221
try {
222+
if (msg.getPublishTime() > endTimestamp) {
223+
break;
224+
}
202225
numMessagesConsumed += 1;
203226
if (!hideContent) {
204227
System.out.println(MESSAGE_BOUNDARY);

0 commit comments

Comments
 (0)