Skip to content

Commit d1159f9

Browse files
committed
Make explicit that an instance has been closed
Fixes #169
1 parent a610d9a commit d1159f9

File tree

6 files changed

+67
-0
lines changed

6 files changed

+67
-0
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ void start() {
330330

331331
@Override
332332
public void store(long offset) {
333+
checkNotClosed();
333334
trackingCallback.accept(offset);
334335
if (canTrack()) {
335336
if (offsetBefore(this.lastRequestedStoredOffset, offset)
@@ -475,6 +476,7 @@ void running() {
475476

476477
@Override
477478
public long storedOffset() {
479+
checkNotClosed();
478480
if (canTrack()) {
479481
// the client can be null by now, so we catch any exception
480482
QueryOffsetResponse response;
@@ -561,4 +563,10 @@ public String toString() {
561563
+ (trackingClient == null ? "null" : ("\"" + trackingClient.connectionName() + "\""))
562564
+ "}";
563565
}
566+
567+
private void checkNotClosed() {
568+
if (this.status == Status.CLOSED) {
569+
throw new IllegalStateException("This producer instance has been closed");
570+
}
571+
}
564572
}

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,11 +371,13 @@ void maybeInitializeLocator() {
371371

372372
@Override
373373
public StreamCreator streamCreator() {
374+
checkNotClosed();
374375
return new StreamStreamCreator(this);
375376
}
376377

377378
@Override
378379
public void deleteStream(String stream) {
380+
checkNotClosed();
379381
this.maybeInitializeLocator();
380382
Client.Response response = this.locator().delete(stream);
381383
if (!response.isOk()) {
@@ -391,6 +393,7 @@ public void deleteStream(String stream) {
391393

392394
@Override
393395
public StreamStats queryStreamStats(String stream) {
396+
checkNotClosed();
394397
StreamStatsResponse response =
395398
locatorOperation(
396399
client -> {
@@ -454,6 +457,7 @@ public long committedChunkId() {
454457

455458
@Override
456459
public ProducerBuilder producerBuilder() {
460+
checkNotClosed();
457461
return new StreamProducerBuilder(this);
458462
}
459463

@@ -475,6 +479,7 @@ void removeConsumer(StreamConsumer consumer) {
475479

476480
@Override
477481
public ConsumerBuilder consumerBuilder() {
482+
checkNotClosed();
478483
return new StreamConsumerBuilder(this);
479484
}
480485

@@ -734,4 +739,10 @@ public LocatorNotAvailableException() {
734739
super("Locator not available");
735740
}
736741
}
742+
743+
private void checkNotClosed() {
744+
if (this.closed.get()) {
745+
throw new IllegalStateException("This environment instance has been closed");
746+
}
747+
}
737748
}

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ public MessageBuilder messageBuilder() {
268268

269269
@Override
270270
public long getLastPublishingId() {
271+
checkNotClosed();
271272
if (this.name != null && !this.name.isEmpty()) {
272273
if (canSend()) {
273274
try {
@@ -516,4 +517,10 @@ public String toString() {
516517
+ (client == null ? "null" : ("\"" + client.connectionName() + "\""))
517518
+ "}";
518519
}
520+
521+
private void checkNotClosed() {
522+
if (this.closed.get()) {
523+
throw new IllegalStateException("This producer instance has been closed");
524+
}
525+
}
519526
}

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.function.UnaryOperator;
5858
import java.util.stream.IntStream;
5959
import java.util.stream.Stream;
60+
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
6061
import org.junit.jupiter.api.AfterEach;
6162
import org.junit.jupiter.api.BeforeEach;
6263
import org.junit.jupiter.api.Test;
@@ -880,4 +881,17 @@ void offsetZeroShouldBeStored() throws Exception {
880881
consumer.store(0);
881882
waitAtMost(() -> consumer.storedOffset() == 0);
882883
}
884+
885+
@Test
886+
void methodsShouldThrowExceptionWhenConsumerIsClosed() {
887+
Consumer consumer =
888+
environment.consumerBuilder().stream(stream)
889+
.messageHandler((context, message) -> {})
890+
.build();
891+
consumer.close();
892+
ThrowingCallable[] calls =
893+
new ThrowingCallable[] {() -> consumer.store(1), () -> consumer.storedOffset()};
894+
Arrays.stream(calls)
895+
.forEach(call -> assertThatThrownBy(call).isInstanceOf(IllegalStateException.class));
896+
}
883897
}

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.net.ConnectException;
5555
import java.nio.charset.StandardCharsets;
5656
import java.time.Duration;
57+
import java.util.Arrays;
5758
import java.util.Collection;
5859
import java.util.Collections;
5960
import java.util.List;
@@ -67,6 +68,7 @@
6768
import java.util.stream.IntStream;
6869
import javax.net.ssl.SNIHostName;
6970
import javax.net.ssl.SSLParameters;
71+
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
7072
import org.junit.jupiter.api.AfterAll;
7173
import org.junit.jupiter.api.BeforeAll;
7274
import org.junit.jupiter.api.BeforeEach;
@@ -544,4 +546,20 @@ void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() {
544546
.isInstanceOf(StreamDoesNotExistException.class);
545547
}
546548
}
549+
550+
@Test
551+
void methodsShouldThrowExceptionWhenEnvironmentIsClosed() {
552+
Environment env = environmentBuilder.build();
553+
env.close();
554+
ThrowingCallable[] calls =
555+
new ThrowingCallable[] {
556+
() -> env.streamCreator(),
557+
() -> env.producerBuilder(),
558+
() -> env.consumerBuilder(),
559+
() -> env.deleteStream("does not matter"),
560+
() -> env.queryStreamStats("does not matter")
561+
};
562+
Arrays.stream(calls)
563+
.forEach(call -> assertThatThrownBy(call).isInstanceOf(IllegalStateException.class));
564+
}
547565
}

src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static com.rabbitmq.stream.impl.TestUtils.streamName;
1919
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2020
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2122

2223
import ch.qos.logback.classic.Level;
2324
import com.rabbitmq.stream.BackOffDelayPolicy;
@@ -619,4 +620,12 @@ void subEntryBatchesSentCompressedShouldBeConsumedProperly() {
619620
publishedBodies.forEach(
620621
publishBody -> assertThat(consumedBodies.contains(publishBody)).isTrue());
621622
}
623+
624+
@Test
625+
void methodsShouldThrowExceptionWhenProducerIsClosed() {
626+
Producer producer = environment.producerBuilder().stream(stream).build();
627+
producer.close();
628+
assertThatThrownBy(() -> producer.getLastPublishingId())
629+
.isInstanceOf(IllegalStateException.class);
630+
}
622631
}

0 commit comments

Comments
 (0)