Skip to content

Commit 40d0b05

Browse files
authored
Merge pull request #779 from rabbitmq/environment-store-offset
Add Environment#storeOffset(String,String,long)
2 parents 4b86707 + 4cf2829 commit 40d0b05

File tree

7 files changed

+320
-109
lines changed

7 files changed

+320
-109
lines changed

src/main/java/com/rabbitmq/stream/Environment.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,22 @@ static EnvironmentBuilder builder() {
8383
*/
8484
StreamStats queryStreamStats(String stream);
8585

86+
/**
87+
* Store the offset for a given reference on the given stream.
88+
*
89+
* <p>This method is useful to store a given offset before a consumer is created.
90+
*
91+
* <p>Prefer the {@link Consumer#store(long)} or {@link MessageHandler.Context#storeOffset()}
92+
* methods to store offsets while consuming messages.
93+
*
94+
* @see Consumer#store(long)
95+
* @see MessageHandler.Context#storeOffset()
96+
* @param reference the reference to store the offset for, e.g. a consumer name
97+
* @param stream the stream
98+
* @param offset the offset to store
99+
*/
100+
void storeOffset(String reference, String stream, long offset);
101+
86102
/**
87103
* Return whether a stream exists or not.
88104
*
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright (c) 2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay;
18+
import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry;
19+
import static java.lang.String.format;
20+
import static java.time.Duration.ofMillis;
21+
22+
import com.rabbitmq.stream.Constants;
23+
import com.rabbitmq.stream.NoOffsetException;
24+
import com.rabbitmq.stream.StreamException;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.TimeoutException;
30+
import java.util.function.LongSupplier;
31+
import java.util.function.Supplier;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
class OffsetTrackingUtils {
36+
37+
private static final Logger LOGGER = LoggerFactory.getLogger(OffsetTrackingUtils.class);
38+
39+
private OffsetTrackingUtils() {}
40+
41+
static long storedOffset(Supplier<Client> clientSupplier, String name, String stream) {
42+
// the client can be null, so we catch any exception
43+
Client.QueryOffsetResponse response;
44+
try {
45+
response = clientSupplier.get().queryOffset(name, stream);
46+
} catch (Exception e) {
47+
throw new IllegalStateException(
48+
format(
49+
"Not possible to query offset for name %s on stream %s for now: %s",
50+
name, stream, e.getMessage()),
51+
e);
52+
}
53+
if (response.isOk()) {
54+
return response.getOffset();
55+
} else if (response.getResponseCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
56+
throw new NoOffsetException(
57+
format(
58+
"No offset stored for name %s on stream %s (%s)",
59+
name, stream, Utils.formatConstant(response.getResponseCode())));
60+
} else {
61+
throw new StreamException(
62+
format(
63+
"QueryOffset for name %s on stream %s returned an error (%s)",
64+
name, stream, Utils.formatConstant(response.getResponseCode())),
65+
response.getResponseCode());
66+
}
67+
}
68+
69+
static void waitForOffsetToBeStored(
70+
String caller,
71+
ScheduledExecutorService scheduledExecutorService,
72+
LongSupplier offsetSupplier,
73+
String name,
74+
String stream,
75+
long expectedStoredOffset) {
76+
String reference = String.format("{stream=%s/name=%s}", stream, name);
77+
CompletableFuture<Boolean> storedTask =
78+
asyncRetry(
79+
() -> {
80+
try {
81+
long lastStoredOffset = offsetSupplier.getAsLong();
82+
boolean stored = lastStoredOffset == expectedStoredOffset;
83+
LOGGER.debug(
84+
"Last stored offset from {} on {} is {}, expecting {}",
85+
caller,
86+
reference,
87+
lastStoredOffset,
88+
expectedStoredOffset);
89+
if (!stored) {
90+
throw new IllegalStateException();
91+
} else {
92+
return true;
93+
}
94+
} catch (StreamException e) {
95+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
96+
LOGGER.debug(
97+
"No stored offset for {} on {}, expecting {}",
98+
caller,
99+
reference,
100+
expectedStoredOffset);
101+
throw new IllegalStateException();
102+
} else {
103+
throw e;
104+
}
105+
}
106+
})
107+
.description(
108+
"Last stored offset for %s on %s must be %d",
109+
caller, reference, expectedStoredOffset)
110+
.delayPolicy(fixedWithInitialDelay(ofMillis(200), ofMillis(200)))
111+
.retry(exception -> exception instanceof IllegalStateException)
112+
.scheduler(scheduledExecutorService)
113+
.build();
114+
115+
try {
116+
storedTask.get(10, TimeUnit.SECONDS);
117+
LOGGER.debug("Offset {} stored ({}, {})", expectedStoredOffset, caller, reference);
118+
} catch (InterruptedException e) {
119+
Thread.currentThread().interrupt();
120+
} catch (ExecutionException | TimeoutException e) {
121+
LOGGER.warn("Error while checking offset has been stored", e);
122+
storedTask.cancel(true);
123+
}
124+
}
125+
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 8 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@
1818
import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry;
1919
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
2020
import static java.lang.String.format;
21-
import static java.time.Duration.ofMillis;
2221

2322
import com.rabbitmq.stream.*;
2423
import com.rabbitmq.stream.MessageHandler.Context;
25-
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
2624
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
2725
import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException;
2826
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
@@ -329,53 +327,13 @@ static long getStoredOffsetSafely(StreamConsumer consumer, StreamEnvironment env
329327
}
330328

331329
void waitForOffsetToBeStored(long expectedStoredOffset) {
332-
CompletableFuture<Boolean> storedTask =
333-
asyncRetry(
334-
() -> {
335-
try {
336-
long lastStoredOffset = storedOffset();
337-
boolean stored = lastStoredOffset == expectedStoredOffset;
338-
LOGGER.debug(
339-
"Last stored offset from consumer {} on {} is {}, expecting {}",
340-
this.id,
341-
this.stream,
342-
lastStoredOffset,
343-
expectedStoredOffset);
344-
if (!stored) {
345-
throw new IllegalStateException();
346-
} else {
347-
return true;
348-
}
349-
} catch (StreamException e) {
350-
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
351-
LOGGER.debug(
352-
"No stored offset for consumer {} on {}, expecting {}",
353-
this.id,
354-
this.stream,
355-
expectedStoredOffset);
356-
throw new IllegalStateException();
357-
} else {
358-
throw e;
359-
}
360-
}
361-
})
362-
.description(
363-
"Last stored offset for consumer %s on stream %s must be %d",
364-
this.name, this.stream, expectedStoredOffset)
365-
.delayPolicy(fixedWithInitialDelay(ofMillis(200), ofMillis(200)))
366-
.retry(exception -> exception instanceof IllegalStateException)
367-
.scheduler(environment.scheduledExecutorService())
368-
.build();
369-
370-
try {
371-
storedTask.get(10, TimeUnit.SECONDS);
372-
LOGGER.debug(
373-
"Offset {} stored (consumer {}, stream {})", expectedStoredOffset, this.id, this.stream);
374-
} catch (InterruptedException e) {
375-
Thread.currentThread().interrupt();
376-
} catch (ExecutionException | TimeoutException e) {
377-
LOGGER.warn("Error while checking offset has been stored", e);
378-
}
330+
OffsetTrackingUtils.waitForOffsetToBeStored(
331+
"consumer " + this.id,
332+
this.environment.scheduledExecutorService(),
333+
this::storedOffset,
334+
this.name,
335+
this.stream,
336+
expectedStoredOffset);
379337
}
380338

381339
void start() {
@@ -563,31 +521,7 @@ void running() {
563521
long storedOffset(Supplier<Client> clientSupplier) {
564522
checkNotClosed();
565523
if (canTrack()) {
566-
// the client can be null by now, so we catch any exception
567-
QueryOffsetResponse response;
568-
try {
569-
response = clientSupplier.get().queryOffset(this.name, this.stream);
570-
} catch (Exception e) {
571-
throw new IllegalStateException(
572-
format(
573-
"Not possible to query offset for consumer %s on stream %s for now: %s",
574-
this.name, this.stream, e.getMessage()),
575-
e);
576-
}
577-
if (response.isOk()) {
578-
return response.getOffset();
579-
} else if (response.getResponseCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
580-
throw new NoOffsetException(
581-
format(
582-
"No offset stored for consumer %s on stream %s (%s)",
583-
this.name, this.stream, Utils.formatConstant(response.getResponseCode())));
584-
} else {
585-
throw new StreamException(
586-
format(
587-
"QueryOffset for consumer %s on stream %s returned an error (%s)",
588-
this.name, this.stream, Utils.formatConstant(response.getResponseCode())),
589-
response.getResponseCode());
590-
}
524+
return OffsetTrackingUtils.storedOffset(clientSupplier, this.name, this.stream);
591525
} else if (this.name == null) {
592526
throw new UnsupportedOperationException(
593527
"Not possible to query stored offset for a consumer without a name");

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,29 @@ public StreamStats queryStreamStats(String stream) {
556556
}
557557
}
558558

559+
@Override
560+
public void storeOffset(String reference, String stream, long offset) {
561+
checkNotClosed();
562+
this.maybeInitializeLocator();
563+
locatorOperation(
564+
Utils.namedFunction(
565+
l -> {
566+
l.storeOffset(reference, stream, offset);
567+
return null;
568+
},
569+
"Store offset %d for stream '%s' with reference '%s'",
570+
offset,
571+
stream,
572+
reference));
573+
OffsetTrackingUtils.waitForOffsetToBeStored(
574+
"env-store-offset",
575+
this.scheduledExecutorService,
576+
() -> OffsetTrackingUtils.storedOffset(() -> locator().client(), reference, stream),
577+
reference,
578+
stream,
579+
offset);
580+
}
581+
559582
@Override
560583
public boolean streamExists(String stream) {
561584
checkNotClosed();

src/test/java/com/rabbitmq/stream/impl/Assertions.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,58 @@
1616

1717
import static org.assertj.core.api.Assertions.fail;
1818

19+
import com.rabbitmq.stream.Constants;
1920
import java.time.Duration;
2021
import org.assertj.core.api.AbstractObjectAssert;
2122

2223
final class Assertions {
2324

2425
private Assertions() {}
2526

27+
static ResponseAssert assertThat(Client.Response response) {
28+
return new ResponseAssert(response);
29+
}
30+
2631
static SyncAssert assertThat(TestUtils.Sync sync) {
2732
return new SyncAssert(sync);
2833
}
2934

35+
static class ResponseAssert extends AbstractObjectAssert<ResponseAssert, Client.Response> {
36+
37+
public ResponseAssert(Client.Response response) {
38+
super(response, ResponseAssert.class);
39+
}
40+
41+
ResponseAssert isOk() {
42+
if (!actual.isOk()) {
43+
fail(
44+
"Response should be successful but was not, response code is: %s",
45+
Utils.formatConstant(actual.getResponseCode()));
46+
}
47+
return this;
48+
}
49+
50+
ResponseAssert isNotOk() {
51+
if (actual.isOk()) {
52+
fail("Response should not be successful but was, response code is: %s", actual);
53+
}
54+
return this;
55+
}
56+
57+
ResponseAssert hasCode(short responseCode) {
58+
if (actual.getResponseCode() != responseCode) {
59+
fail(
60+
"Response code should be %s but was %s",
61+
Utils.formatConstant(responseCode), Utils.formatConstant(actual.getResponseCode()));
62+
}
63+
return this;
64+
}
65+
66+
ResponseAssert hasCodeNoOffset() {
67+
return hasCode(Constants.RESPONSE_CODE_NO_OFFSET);
68+
}
69+
}
70+
3071
static class SyncAssert extends AbstractObjectAssert<SyncAssert, TestUtils.Sync> {
3172

3273
private SyncAssert(TestUtils.Sync sync) {

0 commit comments

Comments
 (0)