Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
429184b
Initial bulk subscribe
shubham1172 Sep 27, 2022
5ee6735
Initial working stage
shubham1172 Sep 27, 2022
c584dfc
Add response
shubham1172 Sep 27, 2022
19cab69
Use a new annotation for BulkSubscribe
shubham1172 Sep 28, 2022
afd2c33
Fix stylecheck issues
shubham1172 Sep 28, 2022
83921a3
Revert refactor
shubham1172 Sep 28, 2022
902faaa
Move BulkSubscribe annotation to springboot
shubham1172 Sep 28, 2022
f492b4c
Fix stylecheck issues
shubham1172 Sep 28, 2022
183e49a
Add unit test
shubham1172 Sep 28, 2022
64e7813
Add another UT scenario
shubham1172 Sep 28, 2022
c2d0447
Refactor
shubham1172 Sep 28, 2022
1a09bda
Fix header licenses
shubham1172 Sep 28, 2022
bba39ac
Fix header licenses
shubham1172 Sep 29, 2022
41c5e26
Fix build checks
shubham1172 Sep 29, 2022
8f41e55
Initial integration tests
shubham1172 Sep 29, 2022
c20c78a
Working ITs
shubham1172 Sep 29, 2022
20d1a84
Fix linting
shubham1172 Sep 29, 2022
4802397
Add docs, fix findbugs
shubham1172 Sep 29, 2022
d77afa2
Fix linting
shubham1172 Sep 29, 2022
aee8dcf
Try FindBugs fix
shubham1172 Sep 29, 2022
4af4c60
Final FindBugs fix
shubham1172 Sep 29, 2022
5740a86
Revert example pubsub name for mechanical-markdown
shubham1172 Sep 29, 2022
f5ada26
Revert IT v3
shubham1172 Sep 29, 2022
eea67e8
Update mm
shubham1172 Sep 29, 2022
9635421
Review comments addressed
shubham1172 Sep 29, 2022
24377f0
Update mm readme
shubham1172 Sep 29, 2022
6ad5fa6
Prevent setting/getting mutable objects
shubham1172 Sep 29, 2022
5d3f107
Fix ITs
shubham1172 Sep 29, 2022
b0e2996
Fix ITs
shubham1172 Sep 29, 2022
5de793a
Make entries not-null
shubham1172 Sep 29, 2022
d4bb05e
Add Dapr ref/
shubham1172 Sep 29, 2022
2815567
Refactor to use List and fix MMg
shubham1172 Sep 29, 2022
ef265a0
Remove MM
shubham1172 Sep 29, 2022
09c8bdb
Remove extra var
shubham1172 Sep 29, 2022
0dd7b83
Fix mm
shubham1172 Sep 29, 2022
8339045
Merge with master
shubham1172 Oct 10, 2022
3f97066
Add MM
shubham1172 Oct 10, 2022
fbe1ea6
Add more UTs
shubham1172 Oct 10, 2022
da4a2d2
Retrigger tests
shubham1172 Oct 10, 2022
ac52e05
Retrigger tests
shubham1172 Oct 10, 2022
58cf265
Retrigger tests
shubham1172 Oct 10, 2022
c177515
Merge branch 'master' into shubham1172/bulk-subscribe-support
shubham1172 Nov 21, 2022
3117af6
Minor changes
shubham1172 Nov 21, 2022
6ace34a
Fix stylecheck issues
shubham1172 Nov 24, 2022
ad71dd9
Merge branch 'master' into shubham1172/bulk-subscribe-support
shubham1172 Nov 28, 2022
f1c4d05
re-trigger pipeline
shubham1172 Nov 28, 2022
f26b425
re-trigger pipeline
shubham1172 Nov 28, 2022
7660469
Update proto baseUrl
shubham1172 Nov 28, 2022
e9394f1
Merge branch 'master' into shubham1172/bulk-subscribe-support
shubham1172 Dec 12, 2022
62a9708
Merge branch 'master' into shubham1172/bulk-subscribe-support
shubham1172 Jan 12, 2023
9baf776
Refactor the latest bulk subscribe
shubham1172 Jan 13, 2023
fbe291c
Merge branch 'shubham1172/bulk-subscribe-support' of github.com:shubh…
shubham1172 Jan 13, 2023
e77fde8
Fix checkstyle
shubham1172 Jan 13, 2023
2df4b6f
Fix integration tests
shubham1172 Jan 13, 2023
396d113
Add new unit tests
shubham1172 Jan 13, 2023
380dffd
Fix issue with MethodInvokeIT
shubham1172 Jan 13, 2023
51303f2
Refactor, remove Dapr-prefix
shubham1172 Jan 13, 2023
a448542
Fix licenses
shubham1172 Jan 13, 2023
b8bb60c
Fix checkstyle
shubham1172 Jan 13, 2023
a635468
Fix IT
shubham1172 Jan 13, 2023
db767b3
Address comments part 1
shubham1172 Jan 16, 2023
5737472
Address comments part 2
shubham1172 Jan 16, 2023
90ba132
Address comments part 3
shubham1172 Jan 16, 2023
d52f339
Fix IT
shubham1172 Jan 16, 2023
60dd546
Fix issue with nullable property and serialization
shubham1172 Jan 16, 2023
5ac914a
Fix IT
shubham1172 Jan 16, 2023
bd0663e
re-trigger pipeline
shubham1172 Jan 16, 2023
b50146a
Fix tests
shubham1172 Jan 16, 2023
07d5772
Merge with master
shubham1172 Jan 18, 2023
af6c880
Refactor
shubham1172 Jan 18, 2023
75111a3
Fix IT
shubham1172 Jan 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# IDE generated files and directories
*.iml
.idea/
.run/
.vs/
.vscode/

Expand Down
33 changes: 33 additions & 0 deletions daprdocs/content/en/java-sdk-docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,13 @@ try (DaprClient client = (new DaprClientBuilder()).build()) {
```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
Expand Down Expand Up @@ -186,6 +192,33 @@ public class SubscriberController {
});
}

@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
if (bulkMessage.getEntries().size() == 0) {
return new BulkSubscribeAppResponse(new ArrayList<BulkSubscribeAppResponseEntry>());
}

System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");

List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class Publisher {
private static final String MESSAGE_TTL_IN_SECONDS = "1000";

//The title of the topic to be used for publishing
private static final String TOPIC_NAME = "testingtopic";
private static final String DEFAULT_TOPIC_NAME = "testingtopic";

//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";
Expand All @@ -48,15 +48,17 @@ public class Publisher {
* @throws Exception A startup Exception.
*/
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
//Publishing messages
// Publishing messages
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
topicName,
message,
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();

System.out.println("Published message: " + message);

try {
Expand All @@ -73,4 +75,17 @@ public static void main(String[] args) throws Exception {
System.out.println("Done.");
}
}

/**
* If a topic is specified in args, use that.
* Else, fallback to the default topic.
* @param args program arguments
* @return name of the topic to publish messages to.
*/
private static String getTopicName(String[] args) {
if (args.length >= 1) {
return args[0];
}
return DEFAULT_TOPIC_NAME;
}
}
100 changes: 93 additions & 7 deletions examples/src/main/java/io/dapr/examples/pubsub/http/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Dapr Pub-Sub Sample

In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subcribe pattern. The publisher will generate messages of a specific topic, while subscriber will listen for messages of specific topic. See [Why Pub-Sub](#why-pub-sub) to understand when this pattern might be a good choice for your software architecture.
In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages of a specific topic, while subscriber will listen for messages of specific topic. See [Why Pub-Sub](#why-pub-sub) to understand when this pattern might be a good choice for your software architecture.

Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub.

Expand Down Expand Up @@ -59,7 +59,7 @@ This Spring Controller handles the message endpoint, printing the message which
The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in
[Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations).

The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`).
The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`).

```java
@RestController
Expand All @@ -83,13 +83,49 @@ public class SubscriberController {
}
}
```

The `@BulkSubscribe` annotation can be used with `@Topic` to receive multiple messages at once. See the example on how to handle the bulk messages and respond correctly.

```java
@RestController
public class SubscriberController {
///...
@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");

List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
```

Execute the follow script in order to run the Subscriber example:

<!-- STEP
name: Run Subscriber
match_order: none
expected_stdout_lines:
- '== APP == Subscriber got: This is message #1'
- '== APP == Subscriber got: This is message #2'
- '== APP == Bulk Subscriber got: This is message #1'
- '== APP == Bulk Subscriber got: This is message #2'
background: true
sleep: 5
-->
Expand All @@ -104,22 +140,28 @@ dapr run --components-path ./components/pubsub --app-id subscriber --app-port 30

The other component is the publisher. It is a simple java application with a main method that uses the Dapr HTTP Client to publish 10 messages to an specific topic.

In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and received objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
Dapr sidecar will automatically wrap the payload received into a CloudEvent object, which will later on parsed by the subscriber.
```java
public class Publisher {
private static final int NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "testingtopic";
private static final String DEFAULT_TOPIC_NAME = "testingtopic";
private static final String PUBSUB_NAME = "messagebus";

///...
public static void main(String[] args) throws Exception {
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
String topicName = getTopicName(args); // Topic can be configured by args.
// Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
//Publishing messages
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
// Publishing messages
client.publishEvent(
PUBSUB_NAME,
topicName,
message,
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();

System.out.println("Published message: " + message);
//...
}
Expand Down Expand Up @@ -246,6 +288,50 @@ Once running, the Subscriber should print the output as follows:

Messages have been retrieved from the topic.

You can also run the publisher to publish messages to `testingtopicbulk` topic, and receive messages using the bulk subscription.

<!-- STEP
name: Run Publisher on bulk topic
expected_stdout_lines:
- '== APP == Published message: This is message #0'
- '== APP == Published message: This is message #1'
background: true
sleep: 15
-->

```bash
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Publisher testingtopicbulk
```

<!-- END_STEP -->

Once running, the Publisher should print the same output as above. The Subscriber should print the output as follows:

```txt
== APP == Bulk Subscriber got 10 messages.
== APP == Bulk Subscriber message has entry ID: d4d81c57-d75c-4a22-a747-e907099ca135
== APP == Bulk Subscriber got: This is message #0
== APP == Bulk Subscriber message has entry ID: f109c837-f7c8-4839-8d71-2df9c467875c
== APP == Bulk Subscriber got: This is message #1
== APP == Bulk Subscriber message has entry ID: d735044f-1320-43e1-bd41-787ad9d26427
== APP == Bulk Subscriber got: This is message #2
== APP == Bulk Subscriber message has entry ID: afe74e5a-1a2b-498a-beca-7a6383141ccf
== APP == Bulk Subscriber got: This is message #3
== APP == Bulk Subscriber message has entry ID: 1df3fa51-d137-4749-891d-973ce58f1e1c
== APP == Bulk Subscriber got: This is message #4
== APP == Bulk Subscriber message has entry ID: ecab82bd-77be-40a1-8b62-2dbb3388d726
== APP == Bulk Subscriber got: This is message #5
== APP == Bulk Subscriber message has entry ID: 49a63916-ed09-4101-969e-13a860e35c55
== APP == Bulk Subscriber got: This is message #6
== APP == Bulk Subscriber message has entry ID: 897ec32c-ad74-4512-8979-ee0a455433e8
== APP == Bulk Subscriber got: This is message #7
== APP == Bulk Subscriber message has entry ID: 67367edc-27a6-4c8c-9e39-31caa0f74b2d
== APP == Bulk Subscriber got: This is message #8
== APP == Bulk Subscriber message has entry ID: f134d21f-0a05-408d-977c-1397b999e908
== APP == Bulk Subscriber got: This is message #9

```

### Tracing

Dapr handles tracing in PubSub automatically. Open Zipkin on [http://localhost:9411/zipkin](http://localhost:9411/zipkin). You should see a screen like the one below:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

/**
* SpringBoot Controller to handle input binding.
*/
Expand All @@ -32,6 +41,7 @@ public class SubscriberController {

/**
* Handles a registered publish endpoint on this app.
*
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
Expand All @@ -49,7 +59,9 @@ public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String
}

/**
* Handles a registered publish endpoint on this app (version 2 of a cloud event).
* Handles a registered publish endpoint on this app (version 2 of a cloud
* event).
*
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
Expand All @@ -67,4 +79,37 @@ public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent clou
});
}

/**
* Handles a registered subscribe endpoint on this app using bulk subscribe.
*
* @param bulkMessage The bulk pubSub message received.
* @return A list of responses for each event.
*/
@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
if (bulkMessage.getEntries().size() == 0) {
return new BulkSubscribeAppResponse(new ArrayList<BulkSubscribeAppResponseEntry>());
}

System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");

List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.42.1</grpc.version>
<protobuf.version>3.17.3</protobuf.version>
<dapr.proto.baseurl>https://github.com/raw/dapr/dapr/v1.9.0-rc.3/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://github.com/raw/dapr/dapr/a8c698ad897e42d6624f5fc6ccfd0630e2a8fd00/dapr/proto</dapr.proto.baseurl>
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
Expand Down
Loading