Skip to content

Commit ce1029c

Browse files
authored
Merge branch 'main' into dependabot/maven/org.apache.maven.plugins-maven-compiler-plugin-3.14.1
2 parents 8258317 + ea4221b commit ce1029c

22 files changed

+549
-95
lines changed

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
<snappy-java.version>1.1.10.8</snappy-java.version>
6464
<junit.jupiter.version>5.13.4</junit.jupiter.version>
6565
<assertj.version>3.27.5</assertj.version>
66-
<mockito.version>5.19.0</mockito.version>
66+
<mockito.version>5.20.0</mockito.version>
6767
<amqp-client.version>5.26.0</amqp-client.version>
6868
<commons-lang3.version>3.18.0</commons-lang3.version>
6969
<commons-codec.version>1.19.0</commons-codec.version>
@@ -72,7 +72,7 @@
7272
<micrometer-tracing-test.version>1.5.4</micrometer-tracing-test.version>
7373
<bouncycastle.version>1.82</bouncycastle.version>
7474
<micrometer-docs-generator.version>1.0.4</micrometer-docs-generator.version>
75-
<netty-tcnative.version>2.0.73.Final</netty-tcnative.version>
75+
<netty-tcnative.version>2.0.74.Final</netty-tcnative.version>
7676
<maven.compiler.plugin.version>3.14.1</maven.compiler.plugin.version>
7777
<maven-surefire-plugin.version>3.5.4</maven-surefire-plugin.version>
7878
<maven-dependency-plugin.version>3.8.1</maven-dependency-plugin.version>
@@ -82,7 +82,7 @@
8282
<maven-resources-plugin.version>3.3.1</maven-resources-plugin.version>
8383
<maven-clean-plugin.version>3.5.0</maven-clean-plugin.version>
8484
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
85-
<maven-javadoc-plugin.version>3.11.3</maven-javadoc-plugin.version>
85+
<maven-javadoc-plugin.version>3.12.0</maven-javadoc-plugin.version>
8686
<maven.jar.plugin.version>3.4.2</maven.jar.plugin.version>
8787
<build-helper-maven-plugin.version>3.4.0</build-helper-maven-plugin.version>
8888
<maven-site-plugin.version>3.21.0</maven-site-plugin.version>

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,15 @@ public interface ConsumerBuilder {
126126
*/
127127
ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);
128128

129+
/**
130+
* Add {@link Resource.StateListener}s to the consumer.
131+
*
132+
* @param listeners listeners
133+
* @return this builder instance
134+
* @since 1.3.0
135+
*/
136+
ConsumerBuilder listeners(Resource.StateListener... listeners);
137+
129138
/**
130139
* Enable {@link ManualTrackingStrategy}.
131140
*
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) 2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream;
16+
17+
/**
18+
* Exception thrown when a resource is not in an appropriate state.
19+
*
20+
* <p>An example is a connection that is initializing.
21+
*/
22+
public class InvalidStateException extends StreamException {
23+
public InvalidStateException(String format, Object... args) {
24+
super(format, args);
25+
}
26+
}

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,15 @@ public interface ProducerBuilder {
192192
*/
193193
ProducerBuilder filterValue(Function<Message, String> filterValueExtractor);
194194

195+
/**
196+
* Add {@link Resource.StateListener}s to the producer.
197+
*
198+
* @param listeners listeners
199+
* @return this builder instance
200+
* @since 1.3.0
201+
*/
202+
ProducerBuilder listeners(Resource.StateListener... listeners);
203+
195204
/**
196205
* Create the {@link Producer} instance.
197206
*
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream;
16+
17+
/**
18+
* Marker interface for {@link com.rabbitmq.stream.Resource}-like classes.
19+
*
20+
* <p>Instances of these classes have different states during their lifecycle: open, recovering,
21+
* closed, etc. Application can be interested in taking some actions for a given state (e.g.
22+
* stopping publishing when a {@link com.rabbitmq.stream.Producer} is recovering after a connection
23+
* problem and resuming publishing when it is open again).
24+
*
25+
* @see com.rabbitmq.stream.Producer
26+
* @see com.rabbitmq.stream.Consumer
27+
*/
28+
public interface Resource {
29+
30+
/**
31+
* Application listener for a {@link com.rabbitmq.stream.Resource}.
32+
*
33+
* <p>They are registered at creation time.
34+
*
35+
* @see
36+
* com.rabbitmq.stream.ProducerBuilder#listeners(com.rabbitmq.stream.Resource.StateListener...)
37+
* @see
38+
* com.rabbitmq.stream.ConsumerBuilder#listeners(com.rabbitmq.stream.Resource.StateListener...)
39+
*/
40+
@FunctionalInterface
41+
interface StateListener {
42+
43+
/**
44+
* Handle state change.
45+
*
46+
* @param context state change context
47+
*/
48+
void handle(Context context);
49+
}
50+
51+
/** Context of a resource state change. */
52+
interface Context {
53+
54+
/**
55+
* The resource instance.
56+
*
57+
* @return resource instance
58+
*/
59+
Resource resource();
60+
61+
/**
62+
* The previous state of the resource.
63+
*
64+
* @return previous state
65+
*/
66+
State previousState();
67+
68+
/**
69+
* The current (new) state of the resource.
70+
*
71+
* @return current state
72+
*/
73+
State currentState();
74+
}
75+
76+
/** Resource state. */
77+
enum State {
78+
/** The resource is currently opening. */
79+
OPENING,
80+
/** The resource is open and functional. */
81+
OPEN,
82+
/** The resource is recovering. */
83+
RECOVERING,
84+
/** The resource is closing. */
85+
CLOSING,
86+
/** The resource is closed. */
87+
CLOSED
88+
}
89+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright (c) 2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream;
16+
17+
/** Exception thrown when a resource is not usable because it is closed. */
18+
public class ResourceClosedException extends InvalidStateException {
19+
public ResourceClosedException(String format, Object... args) {
20+
super(format, args);
21+
}
22+
}

src/main/java/com/rabbitmq/stream/StreamException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ public class StreamException extends RuntimeException {
2525

2626
private final short code;
2727

28-
public StreamException(String message) {
29-
super(message);
28+
public StreamException(String format, Object... args) {
29+
super(String.format(format, args));
3030
this.code = -1;
3131
}
3232

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -506,15 +506,15 @@ SubscriptionState state() {
506506
return this.state.get();
507507
}
508508

509-
private void markConsuming() {
509+
private void markOpen() {
510510
if (this.consumer != null) {
511-
this.consumer.consuming();
511+
this.consumer.markOpen();
512512
}
513513
}
514514

515-
private void markNotConsuming() {
515+
private void markRecovering() {
516516
if (this.consumer != null) {
517-
this.consumer.notConsuming();
517+
this.consumer.markRecovering();
518518
}
519519
}
520520

@@ -712,7 +712,7 @@ private ClientSubscriptionsManager(
712712
"Subscription connection has {} consumer(s) over {} stream(s) to recover",
713713
this.subscriptionTrackers.stream().filter(Objects::nonNull).count(),
714714
this.streamToStreamSubscriptions.size());
715-
iterate(this.subscriptionTrackers, SubscriptionTracker::markNotConsuming);
715+
iterate(this.subscriptionTrackers, SubscriptionTracker::markRecovering);
716716
environment
717717
.scheduledExecutorService()
718718
.execute(
@@ -787,7 +787,7 @@ private ClientSubscriptionsManager(
787787
}
788788

789789
if (affectedSubscriptions != null && !affectedSubscriptions.isEmpty()) {
790-
iterate(affectedSubscriptions, SubscriptionTracker::markNotConsuming);
790+
iterate(affectedSubscriptions, SubscriptionTracker::markRecovering);
791791
environment
792792
.scheduledExecutorService()
793793
.execute(
@@ -1146,7 +1146,7 @@ void add(
11461146
throw e;
11471147
}
11481148
subscriptionTracker.state(SubscriptionState.ACTIVE);
1149-
subscriptionTracker.markConsuming();
1149+
subscriptionTracker.markOpen();
11501150
LOGGER.debug("Subscribed to '{}'", subscriptionTracker.stream);
11511151
} finally {
11521152
this.subscriptionManagerLock.unlock();
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import static com.rabbitmq.stream.Resource.State.CLOSED;
18+
import static com.rabbitmq.stream.Resource.State.OPEN;
19+
import static com.rabbitmq.stream.Resource.State.OPENING;
20+
21+
import com.rabbitmq.stream.InvalidStateException;
22+
import com.rabbitmq.stream.Resource;
23+
import com.rabbitmq.stream.ResourceClosedException;
24+
import java.util.List;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
27+
abstract class ResourceBase implements Resource {
28+
29+
private final AtomicReference<State> state = new AtomicReference<>();
30+
private final StateEventSupport stateEventSupport;
31+
32+
ResourceBase(List<StateListener> listeners) {
33+
this.stateEventSupport = new StateEventSupport(listeners);
34+
this.state(OPENING);
35+
}
36+
37+
protected void checkOpen() {
38+
State state = this.state.get();
39+
if (state == CLOSED) {
40+
throw new ResourceClosedException("Resource is closed");
41+
} else if (state != OPEN) {
42+
throw new InvalidStateException("Resource is not open, current state is %s", state.name());
43+
}
44+
}
45+
46+
protected State state() {
47+
return this.state.get();
48+
}
49+
50+
protected void state(Resource.State state) {
51+
Resource.State previousState = this.state.getAndSet(state);
52+
if (state != previousState) {
53+
this.dispatch(previousState, state);
54+
}
55+
}
56+
57+
private void dispatch(State previous, State current) {
58+
this.stateEventSupport.dispatch(this, previous, current);
59+
}
60+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import com.rabbitmq.stream.Resource;
18+
import java.util.List;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
final class StateEventSupport {
23+
24+
private static final Logger LOGGER = LoggerFactory.getLogger(StateEventSupport.class);
25+
26+
private final List<Resource.StateListener> listeners;
27+
28+
StateEventSupport(List<Resource.StateListener> listeners) {
29+
this.listeners = List.copyOf(listeners);
30+
}
31+
32+
void dispatch(Resource resource, Resource.State previousState, Resource.State currentState) {
33+
if (!this.listeners.isEmpty()) {
34+
Resource.Context context = new DefaultContext(resource, previousState, currentState);
35+
this.listeners.forEach(
36+
l -> {
37+
try {
38+
l.handle(context);
39+
} catch (Exception e) {
40+
LOGGER.warn("Error in resource listener", e);
41+
}
42+
});
43+
}
44+
}
45+
46+
private static class DefaultContext implements Resource.Context {
47+
48+
private final Resource resource;
49+
private final Resource.State previousState;
50+
private final Resource.State currentState;
51+
52+
private DefaultContext(
53+
Resource resource, Resource.State previousState, Resource.State currentState) {
54+
this.resource = resource;
55+
this.previousState = previousState;
56+
this.currentState = currentState;
57+
}
58+
59+
@Override
60+
public Resource resource() {
61+
return this.resource;
62+
}
63+
64+
@Override
65+
public Resource.State previousState() {
66+
return this.previousState;
67+
}
68+
69+
@Override
70+
public Resource.State currentState() {
71+
return this.currentState;
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)