messageData = serializer
.serialize(messages, serializationContext)
.iterator();
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java
index 9f4e19eb..80b34ee0 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClientBase.java
@@ -16,7 +16,6 @@
public class KurrentDBClientBase {
final Logger logger = LoggerFactory.getLogger(KurrentDBClientBase.class);
final private GrpcClient client;
- final protected MessageSerializer serializer;
KurrentDBClientBase(KurrentDBClientSettings settings) {
Discovery discovery;
@@ -33,8 +32,6 @@ public class KurrentDBClientBase {
this.client = service.getHandle();
CompletableFuture.runAsync(service, createConnectionLoopExecutor());
-
- serializer = MessageSerializerBuilder.get(settings.getSerializationSettings());
}
private Executor createConnectionLoopExecutor() {
return Executors.newSingleThreadExecutor(r -> {
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/Message.java b/db-client-java/src/main/java/io/kurrent/dbclient/Message.java
index 85b8aebb..0895e7b3 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/Message.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/Message.java
@@ -34,7 +34,7 @@ public Message(Object data, Object metadata, UUID messageId) {
*
* Example:
*
- * // Create a message with a specific ID
+ * Create a message with a specific ID
* UserRegistered userRegistered = new UserRegistered("123", "Alice");
* Message message = Message.from(userRegistered);
*
@@ -53,7 +53,7 @@ public static Message from(Object data) {
*
* Example:
*
- * // Create a message with a specific ID
+ * Create a message with a specific ID
* UserRegistered userRegistered = new UserRegistered("123", "Alice");
* UUID messageId = UUID.randomUUID();
* Message message = Message.from(userRegistered, messageId);
@@ -74,7 +74,7 @@ public static Message from(Object data, UUID messageId) {
*
* Example:
*
- * // Create a message with data and metadata
+ * Create a message with data and metadata
* OrderPlaced orderPlaced = new OrderPlaced("ORD-123", 99.99);
* EventMetadata metadata = new EventMetadata(
* "user-456",
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java
index f7199a88..bb50b6d0 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsBase.java
@@ -8,7 +8,7 @@ class OptionsBase {
private final OperationKind kind;
private UserCredentials credentials;
private boolean requiresLeader;
- private Map headers = new HashMap<>();
+ private final Map headers = new HashMap<>();
protected OptionsBase() {
this(OperationKind.Regular);
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithBackPressureAndSerialization.java b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithBackPressureAndSerialization.java
new file mode 100644
index 00000000..eb9b790c
--- /dev/null
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithBackPressureAndSerialization.java
@@ -0,0 +1,29 @@
+package io.kurrent.dbclient;
+
+import java.util.Optional;
+
+import io.kurrent.dbclient.serialization.OperationSerializationSettings;
+
+class OptionsWithBackPressureAndSerialization extends OptionsWithBackPressure {
+ public OperationSerializationSettings serializationSettings;
+
+ protected OptionsWithBackPressureAndSerialization(OperationKind kind) {
+ super(kind);
+ }
+
+ /**
+ * Allows to customize or disable the automatic deserialization.
+ */
+ public Optional serializationSettings() {
+ return Optional.ofNullable(serializationSettings);
+ }
+
+ /**
+ * Customize or disable the automatic deserialization.
+ */
+ @SuppressWarnings("unchecked")
+ public T serializationSettings(OperationSerializationSettings serializationSettings) {
+ this.serializationSettings = serializationSettings;
+ return (T)this;
+ }
+}
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java
index 543126af..dc777fa5 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithPositionAndResolveLinkTosBase.java
@@ -1,6 +1,6 @@
package io.kurrent.dbclient;
-class OptionsWithPositionAndResolveLinkTosBase extends OptionsWithBackPressure {
+class OptionsWithPositionAndResolveLinkTosBase extends OptionsWithBackPressureAndSerialization {
private StreamPosition position;
protected OptionsWithPositionAndResolveLinkTosBase(OperationKind kind) {
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java
index 9b978972..5afd050a 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStartRevisionAndResolveLinkTosBase.java
@@ -1,6 +1,6 @@
package io.kurrent.dbclient;
-class OptionsWithStartRevisionAndResolveLinkTosBase extends OptionsWithBackPressure {
+class OptionsWithStartRevisionAndResolveLinkTosBase extends OptionsWithBackPressureAndSerialization {
private StreamPosition startRevision;
protected OptionsWithStartRevisionAndResolveLinkTosBase(OperationKind kind) {
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java b/db-client-java/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java
index da6408c8..5f3b182b 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/ReadResponseObserver.java
@@ -7,6 +7,7 @@
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
+import io.kurrent.dbclient.serialization.MessageSerializer;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,11 +25,16 @@ class ReadResponseObserver implements ClientResponseObserver requestStream;
private int outstandingRequests;
private WorkItemArgs args;
-
-
- public ReadResponseObserver(OptionsWithBackPressure> options, StreamConsumer consumer) {
+ private final MessageSerializer messageSerializer;
+
+ public ReadResponseObserver(
+ OptionsWithBackPressure> options,
+ StreamConsumer consumer,
+ MessageSerializer messageSerializer
+ ) {
this.options = options;
this.consumer = consumer;
+ this.messageSerializer = messageSerializer;
}
public Subscription getSubscription() {
@@ -105,7 +111,7 @@ public void onNext(StreamsOuterClass.ReadResp value) {
}
if (value.hasEvent())
- consumer.onEvent(ResolvedEvent.fromWire(value.getEvent()));
+ consumer.onEvent(ResolvedEvent.fromWire(value.getEvent(), messageSerializer));
else if (value.hasConfirmation())
consumer.onSubscriptionConfirmation(value.getConfirmation().getSubscriptionId());
else if (value.hasCheckpoint()) {
diff --git a/db-client-java/src/main/java/io/kurrent/dbclient/ResolvedEvent.java b/db-client-java/src/main/java/io/kurrent/dbclient/ResolvedEvent.java
index 8f5a8e3d..652f404b 100644
--- a/db-client-java/src/main/java/io/kurrent/dbclient/ResolvedEvent.java
+++ b/db-client-java/src/main/java/io/kurrent/dbclient/ResolvedEvent.java
@@ -2,10 +2,10 @@
import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent;
import io.kurrent.dbclient.proto.streams.StreamsOuterClass;
+import io.kurrent.dbclient.serialization.MessageSerializer;
import java.util.Objects;
import java.util.Optional;
-import java.util.StringJoiner;
/**
* Represents an event with a potential link.
@@ -13,12 +13,17 @@
public class ResolvedEvent {
private final RecordedEvent event;
private final RecordedEvent link;
-
private final Position position;
+ private final Message message;
public ResolvedEvent(RecordedEvent event, RecordedEvent link, Position position) {
+ this(event, link, null, position);
+ }
+
+ public ResolvedEvent(RecordedEvent event, RecordedEvent link, Message message, Position position) {
this.event = event;
this.link = link;
+ this.message = message;
this.position = position;
}
@@ -44,8 +49,26 @@ public RecordedEvent getOriginalEvent() {
return this.link != null ? this.link : this.event;
}
+
+ /**
+ * Returns the deserialized message
+ * It will be provided or equal to null, depending on the automatic deserialization settings you choose.
+ * If it's null, you can use OriginalEvent to deserialize it manually.
+ */
+ public Optional getMessage() {
+ return Optional.ofNullable(message);
+ }
+
+ /**
+ * Returns the deserialized message data.
+ */
+ public Optional