Skip to content

Commit a427d64

Browse files
Adding a Flux based subscribeToEvents method (#1598)
* Adding a Flux based subscribeToEvents method Signed-off-by: Artur Ciocanu <[email protected]> * Simplify GRPC stream handling Signed-off-by: Artur Ciocanu <[email protected]> * Simplify Javadoc Signed-off-by: Artur Ciocanu <[email protected]> * Fix unit tests and simplify implementation Signed-off-by: Artur Ciocanu <[email protected]> * Adding event subscriber stream observer to simplify subscription logic Signed-off-by: Artur Ciocanu <[email protected]> * Use start() method to start stream subscription Signed-off-by: Artur Ciocanu <[email protected]> * Add unit test for event suscriber observer Signed-off-by: Artur Ciocanu <[email protected]> * Improve the tests a little bit Signed-off-by: Artur Ciocanu <[email protected]> * Remove the unnecessary method Signed-off-by: Artur Ciocanu <[email protected]> * Improve error handling and use CloudEvent wrapper Signed-off-by: Artur Ciocanu <[email protected]> * Fix unit tests asserts Signed-off-by: Artur Ciocanu <[email protected]> * Adjust Java examples for Subscriber Signed-off-by: Artur Ciocanu <[email protected]> --------- Signed-off-by: Artur Ciocanu <[email protected]>
1 parent fd2606b commit a427d64

File tree

9 files changed

+887
-39
lines changed

9 files changed

+887
-39
lines changed

examples/src/main/java/io/dapr/examples/pubsub/stream/README.md

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ The subscriber uses the `DaprPreviewClient` interface to use a new feature where
4949

5050
The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.
5151

52-
In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription.
52+
In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux<CloudEvent<T>>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.
5353

5454
```java
5555
public class Subscriber {
@@ -59,25 +59,19 @@ public class Subscriber {
5959
public static void main(String[] args) throws Exception {
6060
String topicName = getTopicName(args);
6161
try (var client = new DaprClientBuilder().buildPreviewClient()) {
62-
var subscription = client.subscribeToEvents(
62+
// Subscribe to events using the Flux-based reactive API
63+
// The stream will emit CloudEvent<String> objects as they arrive
64+
client.subscribeToEvents(
6365
PUBSUB_NAME,
6466
topicName,
65-
new SubscriptionListener<>() {
66-
67-
@Override
68-
public Mono<Status> onEvent(CloudEvent<String> event) {
69-
System.out.println("Subscriber got: " + event.getData());
70-
return Mono.just(Status.SUCCESS);
71-
}
72-
73-
@Override
74-
public void onError(RuntimeException exception) {
75-
System.out.println("Subscriber got exception: " + exception.getMessage());
76-
}
77-
},
78-
TypeRef.STRING);
79-
80-
subscription.awaitTermination();
67+
TypeRef.STRING)
68+
.doOnNext(event -> {
69+
System.out.println("Subscriber got: " + event.getData());
70+
})
71+
.doOnError(throwable -> {
72+
System.out.println("Subscriber got exception: " + throwable.getMessage());
73+
})
74+
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
8175
}
8276
}
8377

examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@
1414
package io.dapr.examples.pubsub.stream;
1515

1616
import io.dapr.client.DaprClientBuilder;
17-
import io.dapr.client.SubscriptionListener;
18-
import io.dapr.client.domain.CloudEvent;
1917
import io.dapr.utils.TypeRef;
20-
import reactor.core.publisher.Mono;
2118

2219
/**
2320
* Subscriber using bi-directional gRPC streaming, which does not require an app port.
@@ -44,25 +41,19 @@ public class Subscriber {
4441
public static void main(String[] args) throws Exception {
4542
String topicName = getTopicName(args);
4643
try (var client = new DaprClientBuilder().buildPreviewClient()) {
47-
var subscription = client.subscribeToEvents(
44+
// Subscribe to events using the Flux-based reactive API
45+
// The stream will emit CloudEvent<String> objects as they arrive
46+
client.subscribeToEvents(
4847
PUBSUB_NAME,
4948
topicName,
50-
new SubscriptionListener<>() {
51-
52-
@Override
53-
public Mono<Status> onEvent(CloudEvent<String> event) {
54-
System.out.println("Subscriber got: " + event.getData());
55-
return Mono.just(Status.SUCCESS);
56-
}
57-
58-
@Override
59-
public void onError(RuntimeException exception) {
60-
System.out.println("Subscriber got exception: " + exception.getMessage());
61-
}
62-
},
63-
TypeRef.STRING);
64-
65-
subscription.awaitTermination();
49+
TypeRef.STRING)
50+
.doOnNext(event -> {
51+
System.out.println("Subscriber got: " + event.getData());
52+
})
53+
.doOnError(throwable -> {
54+
System.out.println("Subscriber got exception: " + throwable.getMessage());
55+
})
56+
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
6657
}
6758
}
6859

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
9292
import io.dapr.internal.resiliency.RetryPolicy;
9393
import io.dapr.internal.resiliency.TimeoutPolicy;
94+
import io.dapr.internal.subscription.EventSubscriberStreamObserver;
9495
import io.dapr.serializer.DaprObjectSerializer;
9596
import io.dapr.serializer.DefaultObjectSerializer;
9697
import io.dapr.utils.DefaultContentTypeConverter;
@@ -475,6 +476,42 @@ public <T> Subscription subscribeToEvents(
475476
return buildSubscription(listener, type, request);
476477
}
477478

479+
/**
480+
* {@inheritDoc}
481+
*/
482+
@Override
483+
public <T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type) {
484+
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
485+
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
486+
.setTopic(topic)
487+
.setPubsubName(pubsubName)
488+
.build();
489+
DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
490+
DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
491+
.setInitialRequest(initialRequest)
492+
.build();
493+
494+
return Flux.create(sink -> {
495+
DaprGrpc.DaprStub interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
496+
EventSubscriberStreamObserver<T> eventSubscriber = new EventSubscriberStreamObserver<>(
497+
interceptedStub,
498+
sink,
499+
type,
500+
this.objectSerializer
501+
);
502+
StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1> requestStream = eventSubscriber.start(request);
503+
504+
// Cleanup when Flux is cancelled or completed
505+
sink.onDispose(() -> {
506+
try {
507+
requestStream.onCompleted();
508+
} catch (Exception e) {
509+
logger.debug("Completing the subscription stream resulted in an error: {}", e.getMessage());
510+
}
511+
});
512+
}, FluxSink.OverflowStrategy.BUFFER);
513+
}
514+
478515
@Nonnull
479516
private <T> Subscription<T> buildSubscription(
480517
SubscriptionListener<T> listener,

sdk/src/main/java/io/dapr/client/DaprPreviewClient.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.dapr.client.domain.BulkPublishRequest;
1818
import io.dapr.client.domain.BulkPublishResponse;
1919
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
20+
import io.dapr.client.domain.CloudEvent;
2021
import io.dapr.client.domain.ConversationRequest;
2122
import io.dapr.client.domain.ConversationRequestAlpha2;
2223
import io.dapr.client.domain.ConversationResponse;
@@ -32,6 +33,7 @@
3233
import io.dapr.client.domain.UnlockResponseStatus;
3334
import io.dapr.client.domain.query.Query;
3435
import io.dapr.utils.TypeRef;
36+
import reactor.core.publisher.Flux;
3537
import reactor.core.publisher.Mono;
3638

3739
import java.util.List;
@@ -271,12 +273,24 @@ <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicNa
271273
* @param topic Name of the topic to subscribe to.
272274
* @param listener Callback methods to process events.
273275
* @param type Type for object deserialization.
274-
* @return An active subscription.
275276
* @param <T> Type of object deserialization.
277+
* @return An active subscription.
278+
* @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach.
276279
*/
280+
@Deprecated
277281
<T> Subscription subscribeToEvents(
278282
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type);
279283

284+
/**
285+
* Subscribe to pubsub events via streaming using Project Reactor Flux.
286+
* @param pubsubName Name of the pubsub component.
287+
* @param topic Name of the topic to subscribe to.
288+
* @param type Type for object deserialization.
289+
* @return A Flux of CloudEvents containing deserialized event payloads and metadata.
290+
* @param <T> Type of the event payload.
291+
*/
292+
<T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);
293+
280294
/**
281295
* Schedules a job using the provided job request details.
282296
*

sdk/src/main/java/io/dapr/client/Subscription.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* Streaming subscription of events for Dapr's pubsub.
3636
* @param <T> Application's object type.
3737
*/
38+
@Deprecated
3839
public class Subscription<T> implements Closeable {
3940

4041
private final BlockingQueue<DaprProtos.SubscribeTopicEventsRequestAlpha1> ackQueue = new LinkedBlockingQueue<>(50);

sdk/src/main/java/io/dapr/client/SubscriptionListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* Callback interface to receive events from a streaming subscription of events.
2121
* @param <T> Object type for deserialization.
2222
*/
23+
@Deprecated
2324
public interface SubscriptionListener<T> {
2425

2526
/**

0 commit comments

Comments
 (0)