Skip to content

Commit eb8565c

Browse files
authored
feat(pubsub): add bulk subscribe support (#791)
1 parent b83661d commit eb8565c

26 files changed

+1047
-46
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# IDE generated files and directories
22
*.iml
33
.idea/
4+
.run/
45
.vs/
56
.vscode/
67

daprdocs/content/en/java-sdk-docs/_index.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,13 @@ try (DaprClient client = (new DaprClientBuilder()).build()) {
148148
```java
149149
import com.fasterxml.jackson.databind.ObjectMapper;
150150
import io.dapr.Topic;
151+
import io.dapr.client.domain.BulkSubscribeAppResponse;
152+
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
153+
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
154+
import io.dapr.client.domain.BulkSubscribeMessage;
155+
import io.dapr.client.domain.BulkSubscribeMessageEntry;
151156
import io.dapr.client.domain.CloudEvent;
157+
import io.dapr.springboot.annotations.BulkSubscribe;
152158
import org.springframework.web.bind.annotation.PostMapping;
153159
import org.springframework.web.bind.annotation.RequestBody;
154160
import org.springframework.web.bind.annotation.RestController;
@@ -186,6 +192,33 @@ public class SubscriberController {
186192
});
187193
}
188194

195+
@BulkSubscribe()
196+
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
197+
@PostMapping(path = "/testingtopicbulk")
198+
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
199+
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
200+
return Mono.fromCallable(() -> {
201+
if (bulkMessage.getEntries().size() == 0) {
202+
return new BulkSubscribeAppResponse(new ArrayList<BulkSubscribeAppResponseEntry>());
203+
}
204+
205+
System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");
206+
207+
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
208+
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
209+
try {
210+
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
211+
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
212+
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
213+
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
214+
} catch (Exception e) {
215+
e.printStackTrace();
216+
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
217+
}
218+
}
219+
return new BulkSubscribeAppResponse(entries);
220+
});
221+
}
189222
}
190223
```
191224

examples/src/main/java/io/dapr/examples/pubsub/http/Publisher.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class Publisher {
3737
private static final String MESSAGE_TTL_IN_SECONDS = "1000";
3838

3939
//The title of the topic to be used for publishing
40-
private static final String TOPIC_NAME = "testingtopic";
40+
private static final String DEFAULT_TOPIC_NAME = "testingtopic";
4141

4242
//The name of the pubsub
4343
private static final String PUBSUB_NAME = "messagebus";
@@ -48,15 +48,17 @@ public class Publisher {
4848
* @throws Exception A startup Exception.
4949
*/
5050
public static void main(String[] args) throws Exception {
51+
String topicName = getTopicName(args);
5152
try (DaprClient client = new DaprClientBuilder().build()) {
5253
for (int i = 0; i < NUM_MESSAGES; i++) {
5354
String message = String.format("This is message #%d", i);
54-
//Publishing messages
55+
// Publishing messages
5556
client.publishEvent(
5657
PUBSUB_NAME,
57-
TOPIC_NAME,
58+
topicName,
5859
message,
5960
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
61+
6062
System.out.println("Published message: " + message);
6163

6264
try {
@@ -73,4 +75,17 @@ public static void main(String[] args) throws Exception {
7375
System.out.println("Done.");
7476
}
7577
}
78+
79+
/**
80+
* If a topic is specified in args, use that.
81+
* Else, fallback to the default topic.
82+
* @param args program arguments
83+
* @return name of the topic to publish messages to.
84+
*/
85+
private static String getTopicName(String[] args) {
86+
if (args.length >= 1) {
87+
return args[0];
88+
}
89+
return DEFAULT_TOPIC_NAME;
90+
}
7691
}

examples/src/main/java/io/dapr/examples/pubsub/http/README.md

Lines changed: 93 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Dapr Pub-Sub Sample
22

3-
In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subcribe pattern. The publisher will generate messages of a specific topic, while subscriber will listen for messages of specific topic. See [Why Pub-Sub](#why-pub-sub) to understand when this pattern might be a good choice for your software architecture.
3+
In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages of a specific topic, while subscriber will listen for messages of specific topic. See [Why Pub-Sub](#why-pub-sub) to understand when this pattern might be a good choice for your software architecture.
44

55
Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub.
66

@@ -59,7 +59,7 @@ This Spring Controller handles the message endpoint, printing the message which
5959
The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in
6060
[Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations).
6161

62-
The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`).
62+
The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`).
6363

6464
```java
6565
@RestController
@@ -83,13 +83,49 @@ public class SubscriberController {
8383
}
8484
}
8585
```
86+
87+
The `@BulkSubscribe` annotation can be used with `@Topic` to receive multiple messages at once. See the example on how to handle the bulk messages and respond correctly.
88+
89+
```java
90+
@RestController
91+
public class SubscriberController {
92+
///...
93+
@BulkSubscribe()
94+
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
95+
@PostMapping(path = "/testingtopicbulk")
96+
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
97+
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
98+
return Mono.fromCallable(() -> {
99+
System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");
100+
101+
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
102+
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
103+
try {
104+
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
105+
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
106+
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
107+
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
108+
} catch (Exception e) {
109+
e.printStackTrace();
110+
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
111+
}
112+
}
113+
return new BulkSubscribeAppResponse(entries);
114+
});
115+
}
116+
}
117+
```
118+
86119
Execute the follow script in order to run the Subscriber example:
87120

88121
<!-- STEP
89122
name: Run Subscriber
123+
match_order: none
90124
expected_stdout_lines:
91125
- '== APP == Subscriber got: This is message #1'
92126
- '== APP == Subscriber got: This is message #2'
127+
- '== APP == Bulk Subscriber got: This is message #1'
128+
- '== APP == Bulk Subscriber got: This is message #2'
93129
background: true
94130
sleep: 5
95131
-->
@@ -104,22 +140,28 @@ dapr run --components-path ./components/pubsub --app-id subscriber --app-port 30
104140

105141
The other component is the publisher. It is a simple java application with a main method that uses the Dapr HTTP Client to publish 10 messages to an specific topic.
106142

107-
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
143+
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and received objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
108144
Dapr sidecar will automatically wrap the payload received into a CloudEvent object, which will later on parsed by the subscriber.
109145
```java
110146
public class Publisher {
111147
private static final int NUM_MESSAGES = 10;
112-
private static final String TOPIC_NAME = "testingtopic";
148+
private static final String DEFAULT_TOPIC_NAME = "testingtopic";
113149
private static final String PUBSUB_NAME = "messagebus";
114150

115151
///...
116152
public static void main(String[] args) throws Exception {
117-
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
153+
String topicName = getTopicName(args); // Topic can be configured by args.
154+
// Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
118155
try (DaprClient client = new DaprClientBuilder().build()) {
119156
for (int i = 0; i < NUM_MESSAGES; i++) {
120157
String message = String.format("This is message #%d", i);
121-
//Publishing messages
122-
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
158+
// Publishing messages
159+
client.publishEvent(
160+
PUBSUB_NAME,
161+
topicName,
162+
message,
163+
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
164+
123165
System.out.println("Published message: " + message);
124166
//...
125167
}
@@ -246,6 +288,50 @@ Once running, the Subscriber should print the output as follows:
246288

247289
Messages have been retrieved from the topic.
248290

291+
You can also run the publisher to publish messages to `testingtopicbulk` topic, and receive messages using the bulk subscription.
292+
293+
<!-- STEP
294+
name: Run Publisher on bulk topic
295+
expected_stdout_lines:
296+
- '== APP == Published message: This is message #0'
297+
- '== APP == Published message: This is message #1'
298+
background: true
299+
sleep: 15
300+
-->
301+
302+
```bash
303+
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Publisher testingtopicbulk
304+
```
305+
306+
<!-- END_STEP -->
307+
308+
Once running, the Publisher should print the same output as above. The Subscriber should print the output as follows:
309+
310+
```txt
311+
== APP == Bulk Subscriber got 10 messages.
312+
== APP == Bulk Subscriber message has entry ID: d4d81c57-d75c-4a22-a747-e907099ca135
313+
== APP == Bulk Subscriber got: This is message #0
314+
== APP == Bulk Subscriber message has entry ID: f109c837-f7c8-4839-8d71-2df9c467875c
315+
== APP == Bulk Subscriber got: This is message #1
316+
== APP == Bulk Subscriber message has entry ID: d735044f-1320-43e1-bd41-787ad9d26427
317+
== APP == Bulk Subscriber got: This is message #2
318+
== APP == Bulk Subscriber message has entry ID: afe74e5a-1a2b-498a-beca-7a6383141ccf
319+
== APP == Bulk Subscriber got: This is message #3
320+
== APP == Bulk Subscriber message has entry ID: 1df3fa51-d137-4749-891d-973ce58f1e1c
321+
== APP == Bulk Subscriber got: This is message #4
322+
== APP == Bulk Subscriber message has entry ID: ecab82bd-77be-40a1-8b62-2dbb3388d726
323+
== APP == Bulk Subscriber got: This is message #5
324+
== APP == Bulk Subscriber message has entry ID: 49a63916-ed09-4101-969e-13a860e35c55
325+
== APP == Bulk Subscriber got: This is message #6
326+
== APP == Bulk Subscriber message has entry ID: 897ec32c-ad74-4512-8979-ee0a455433e8
327+
== APP == Bulk Subscriber got: This is message #7
328+
== APP == Bulk Subscriber message has entry ID: 67367edc-27a6-4c8c-9e39-31caa0f74b2d
329+
== APP == Bulk Subscriber got: This is message #8
330+
== APP == Bulk Subscriber message has entry ID: f134d21f-0a05-408d-977c-1397b999e908
331+
== APP == Bulk Subscriber got: This is message #9
332+
333+
```
334+
249335
### Tracing
250336

251337
Dapr handles tracing in PubSub automatically. Open Zipkin on [http://localhost:9411/zipkin](http://localhost:9411/zipkin). You should see a screen like the one below:

examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@
1616
import com.fasterxml.jackson.databind.ObjectMapper;
1717
import io.dapr.Rule;
1818
import io.dapr.Topic;
19+
import io.dapr.client.domain.BulkSubscribeAppResponse;
20+
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
21+
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
22+
import io.dapr.client.domain.BulkSubscribeMessage;
23+
import io.dapr.client.domain.BulkSubscribeMessageEntry;
1924
import io.dapr.client.domain.CloudEvent;
25+
import io.dapr.springboot.annotations.BulkSubscribe;
2026
import org.springframework.web.bind.annotation.PostMapping;
2127
import org.springframework.web.bind.annotation.RequestBody;
2228
import org.springframework.web.bind.annotation.RestController;
2329
import reactor.core.publisher.Mono;
2430

31+
import java.util.ArrayList;
32+
import java.util.List;
33+
2534
/**
2635
* SpringBoot Controller to handle input binding.
2736
*/
@@ -32,6 +41,7 @@ public class SubscriberController {
3241

3342
/**
3443
* Handles a registered publish endpoint on this app.
44+
*
3545
* @param cloudEvent The cloud event received.
3646
* @return A message containing the time.
3747
*/
@@ -49,7 +59,9 @@ public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String
4959
}
5060

5161
/**
52-
* Handles a registered publish endpoint on this app (version 2 of a cloud event).
62+
* Handles a registered publish endpoint on this app (version 2 of a cloud
63+
* event).
64+
*
5365
* @param cloudEvent The cloud event received.
5466
* @return A message containing the time.
5567
*/
@@ -67,4 +79,37 @@ public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent clou
6779
});
6880
}
6981

82+
/**
83+
* Handles a registered subscribe endpoint on this app using bulk subscribe.
84+
*
85+
* @param bulkMessage The bulk pubSub message received.
86+
* @return A list of responses for each event.
87+
*/
88+
@BulkSubscribe()
89+
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
90+
@PostMapping(path = "/testingtopicbulk")
91+
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
92+
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
93+
return Mono.fromCallable(() -> {
94+
if (bulkMessage.getEntries().size() == 0) {
95+
return new BulkSubscribeAppResponse(new ArrayList<BulkSubscribeAppResponseEntry>());
96+
}
97+
98+
System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");
99+
100+
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
101+
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
102+
try {
103+
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
104+
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
105+
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
106+
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
107+
} catch (Exception e) {
108+
e.printStackTrace();
109+
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
110+
}
111+
}
112+
return new BulkSubscribeAppResponse(entries);
113+
});
114+
}
70115
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1717
<grpc.version>1.42.1</grpc.version>
1818
<protobuf.version>3.17.3</protobuf.version>
19-
<dapr.proto.baseurl>https://github.com/raw/dapr/dapr/v1.9.0-rc.3/dapr/proto</dapr.proto.baseurl>
19+
<dapr.proto.baseurl>https://github.com/raw/dapr/dapr/a8c698ad897e42d6624f5fc6ccfd0630e2a8fd00/dapr/proto</dapr.proto.baseurl>
2020
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
2121
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
2222
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>

0 commit comments

Comments
 (0)