Skip to content

WIP: [DEVEX-250] Add built-in auto-serialization #308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
4 changes: 2 additions & 2 deletions db-client-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ dependencies {
implementation "io.grpc:grpc-stub:${grpcVersion}"
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation "org.slf4j:slf4j-api:2.0.17"
implementation "org.bouncycastle:bcprov-jdk18on:1.80"
implementation "org.bouncycastle:bcpkix-jdk18on:1.80"

implementation platform("io.opentelemetry:opentelemetry-bom:${openTelemetryVersion}")
implementation "io.opentelemetry:opentelemetry-api"
implementation "io.opentelemetry.semconv:opentelemetry-semconv:${openTelemetrySemConvVersion}"
Expand All @@ -64,7 +65,6 @@ dependencies {
testImplementation "org.reactivestreams:reactive-streams-tck:${reactiveStreamsApiVersion}"
testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}"
testImplementation platform("com.fasterxml.jackson:jackson-bom:${jacksonVersion}")
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation "com.github.javafaker:javafaker:1.0.2"
testImplementation 'org.slf4j:slf4j-simple:2.0.17'
testImplementation "io.opentelemetry:opentelemetry-sdk"
Expand Down
18 changes: 11 additions & 7 deletions db-client-java/src/main/java/io/kurrent/dbclient/AbstractRead.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ abstract class AbstractRead implements Publisher<ReadMessage> {
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions;

private final GrpcClient client;
private final OptionsWithBackPressure<?> options;
private final OptionsWithBackPressureAndSerialization<?> options;

protected AbstractRead(GrpcClient client, OptionsWithBackPressure<?> options) {
protected AbstractRead(GrpcClient client, OptionsWithBackPressureAndSerialization<?> options) {
this.client = client;
this.options = options;
}
Expand All @@ -27,13 +27,17 @@ protected AbstractRead(GrpcClient client, OptionsWithBackPressure<?> options) {

@Override
public void subscribe(Subscriber<? super ReadMessage> subscriber) {
ReadResponseObserver observer = new ReadResponseObserver(options, new ReadStreamConsumer(subscriber));
ReadResponseObserver observer = new ReadResponseObserver(
options,
new ReadStreamConsumer(subscriber),
this.client.getSerializer(options.serializationSettings().orElse(null))
);

this.client.getWorkItemArgs().whenComplete((args, error) -> {
if (error != null) {
observer.onError(error);
return;
}
if (error != null) {
observer.onError(error);
return;
}

StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder()
.setOptions(createOptions())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ abstract class AbstractRegularSubscription {
protected SubscriptionListener listener;
protected Checkpointer checkpointer = null;
private final GrpcClient client;
private final OptionsWithBackPressure<?> options;
private final OptionsWithBackPressureAndSerialization<?> options;

protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure<?> options) {
protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressureAndSerialization<?> options) {
this.client = client;
this.options = options;
}
Expand Down Expand Up @@ -72,6 +72,10 @@ private ReadResponseObserver createObserver(WorkItemArgs args, CompletableFuture
event);
});

return new ReadResponseObserver(this.options, consumer);
return new ReadResponseObserver(
this.options,
consumer,
this.client.getSerializer(options.serializationSettings().orElse(null))
);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package io.kurrent.dbclient;

import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent;
import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc;
import io.kurrent.dbclient.proto.shared.Shared;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent;
import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc;
import io.kurrent.dbclient.proto.shared.Shared;
import io.kurrent.dbclient.serialization.MessageSerializer;

import java.util.concurrent.CompletableFuture;

Expand All @@ -18,20 +19,26 @@ abstract class AbstractSubscribePersistentSubscription {
private final String group;
private final PersistentSubscriptionListener listener;
private final SubscribePersistentSubscriptionOptions options;
private final MessageSerializer messageSerializer;

static {
defaultReadOptions = Persistent.ReadReq.Options.newBuilder()
.setUuidOption(Persistent.ReadReq.Options.UUIDOption.newBuilder()
.setStructured(Shared.Empty.getDefaultInstance()));
}

public AbstractSubscribePersistentSubscription(GrpcClient client, String group,
SubscribePersistentSubscriptionOptions options,
PersistentSubscriptionListener listener) {
public AbstractSubscribePersistentSubscription(
GrpcClient client,
String group,
SubscribePersistentSubscriptionOptions options,
PersistentSubscriptionListener listener,
MessageSerializer messageSerializer
) {
this.client = client;
this.group = group;
this.options = options;
this.listener = listener;
this.messageSerializer = messageSerializer;
}

protected abstract Persistent.ReadReq.Options.Builder createOptions();
Expand Down Expand Up @@ -91,7 +98,7 @@ public void onNext(Persistent.ReadResp readResp) {
int retryCount = readResp.getEvent().hasNoRetryCount() ? 0 : readResp.getEvent().getRetryCount();

try {
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent());
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent(), messageSerializer);
ClientTelemetry.traceSubscribe(
() -> listener.onEvent(this._subscription, retryCount, resolvedEvent),
_subscription.getSubscriptionId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@
class AppendToStream {
private final GrpcClient client;
private final String streamName;
private final List<EventData> events;
private final StreamState streamState;
private final List<MessageData> events;
private final AppendToStreamOptions options;

public AppendToStream(GrpcClient client, String streamName, Iterator<EventData> events, AppendToStreamOptions options) {
public AppendToStream(
GrpcClient client,
String streamName,
StreamState streamState,
Iterator<MessageData> events,
AppendToStreamOptions options
) {
this.client = client;
this.streamName = streamName;
this.streamState = streamState;
this.events = new ArrayList<>();
while (events.hasNext()) {
this.events.add(events.next());
Expand All @@ -40,9 +48,9 @@ public CompletableFuture<WriteResult> execute() {
this.options.getCredentials()));
}

private CompletableFuture<WriteResult> append(ManagedChannel channel, List<EventData> events) {
private CompletableFuture<WriteResult> append(ManagedChannel channel, List<MessageData> events) {
CompletableFuture<WriteResult> result = new CompletableFuture<>();
StreamsOuterClass.AppendReq.Options.Builder options = this.options.getStreamState().applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
StreamsOuterClass.AppendReq.Options.Builder options = this.streamState.applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
.setStreamIdentifier(Shared.StreamIdentifier.newBuilder()
.setStreamName(ByteString.copyFromUtf8(streamName))
.build()));
Expand Down Expand Up @@ -93,18 +101,18 @@ private CompletableFuture<WriteResult> append(ManagedChannel channel, List<Event
try {
requestStream.onNext(StreamsOuterClass.AppendReq.newBuilder().setOptions(options).build());

for (EventData e : events) {
for (MessageData e : events) {
StreamsOuterClass.AppendReq.ProposedMessage.Builder msgBuilder = StreamsOuterClass.AppendReq.ProposedMessage.newBuilder()
.setId(Shared.UUID.newBuilder()
.setStructured(Shared.UUID.Structured.newBuilder()
.setMostSignificantBits(e.getEventId().getMostSignificantBits())
.setLeastSignificantBits(e.getEventId().getLeastSignificantBits())))
.setData(ByteString.copyFrom(e.getEventData()))
.setMostSignificantBits(e.getMessageId().getMostSignificantBits())
.setLeastSignificantBits(e.getMessageId().getLeastSignificantBits())))
.setData(ByteString.copyFrom(e.getMessageData()))
.putMetadata(SystemMetadataKeys.CONTENT_TYPE, e.getContentType())
.putMetadata(SystemMetadataKeys.TYPE, e.getEventType());
.putMetadata(SystemMetadataKeys.TYPE, e.getMessageType());

if (e.getUserMetadata() != null) {
msgBuilder.setCustomMetadata(ByteString.copyFrom(e.getUserMetadata()));
if (e.getMessageMetadata() != null) {
msgBuilder.setCustomMetadata(ByteString.copyFrom(e.getMessageMetadata()));
}

requestStream.onNext(StreamsOuterClass.AppendReq.newBuilder()
Expand All @@ -117,7 +125,7 @@ private CompletableFuture<WriteResult> append(ManagedChannel channel, List<Event
String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));

if (leaderHost != null && leaderPort != null) {
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.parseInt(leaderPort));
result.completeExceptionally(reason);
} else {
result.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,36 @@
package io.kurrent.dbclient;

import io.kurrent.dbclient.serialization.OperationSerializationSettings;

import java.util.Optional;

/**
* Options of the append stream request.
*/
public class AppendToStreamOptions extends OptionsWithStreamStateBase<AppendToStreamOptions> {
private OperationSerializationSettings serializationSettings = null;

private AppendToStreamOptions() {
}

/**
* Returns optional serialization settings
*/
public Optional<OperationSerializationSettings> serializationSettings() {
return Optional.ofNullable(this.serializationSettings);
}

/**
* Allows to customize or disable the automatic deserialization
*
* @param serializationSettings - expected revision.
* @return updated options.
*/
public AppendToStreamOptions serializationSettings(OperationSerializationSettings serializationSettings) {
this.serializationSettings = serializationSettings;
return this;
}

/**
* Returns options with default values.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ private static Tracer getTracer() {
ClientTelemetry.class.getPackage().getImplementationVersion());
}

private static List<EventData> tryInjectTracingContext(Span span, List<EventData> events) {
List<EventData> injectedEvents = new ArrayList<>();
for (EventData event : events) {
private static List<MessageData> tryInjectTracingContext(Span span, List<MessageData> events) {
List<MessageData> injectedEvents = new ArrayList<>();
for (MessageData event : events) {
boolean isJsonEvent = Objects.equals(event.getContentType(), ContentType.JSON);

injectedEvents.add(EventDataBuilder
.binary(event.getEventId(), event.getEventType(), event.getEventData(), isJsonEvent)
.metadataAsBytes(tryInjectTracingContext(span, event.getUserMetadata()))
.build());
injectedEvents.add(
MessageDataBuilder
.with(event.getMessageType(), event.getMessageData(), tryInjectTracingContext(span, event.getMessageMetadata()), event.getMessageId(), isJsonEvent)
.build());
}
return injectedEvents;
}
Expand Down Expand Up @@ -85,9 +85,9 @@ private static SpanContext tryExtractTracingContext(byte[] userMetadataBytes) {
}

static CompletableFuture<WriteResult> traceAppend(
BiFunction<ManagedChannel, List<EventData>, CompletableFuture<WriteResult>> appendOperation,
BiFunction<ManagedChannel, List<MessageData>, CompletableFuture<WriteResult>> appendOperation,
ManagedChannel channel,
List<EventData> events, String streamId, KurrentDBClientSettings settings,
List<MessageData> events, String streamId, KurrentDBClientSettings settings,
UserCredentials optionalCallCredentials) {
Span span = createSpan(
ClientTelemetryConstants.Operations.APPEND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import io.grpc.ClientInterceptor;
import io.kurrent.dbclient.serialization.KurrentDBClientSerializationSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -36,6 +37,7 @@ public class ConnectionSettingsBuilder {
private List<ClientInterceptor> _interceptors = new ArrayList<>();
private String _tlsCaFile = null;
private Set<String> _features = new HashSet<>();
private KurrentDBClientSerializationSettings _serializationSettings;

ConnectionSettingsBuilder() {}

Expand All @@ -60,7 +62,9 @@ public KurrentDBClientSettings buildConnectionSettings() {
_defaultDeadline,
_interceptors,
_tlsCaFile,
_features);
_features,
_serializationSettings
);
}

/**
Expand Down Expand Up @@ -241,6 +245,15 @@ public ConnectionSettingsBuilder feature(String feature) {
return this;
}

/**
* Provides configuration options for messages serialization and deserialization in the KurrentDB client.
* If null, default settings are used.
*/
public ConnectionSettingsBuilder serialization(KurrentDBClientSerializationSettings serializationSettings) {
this._serializationSettings = serializationSettings;
return this;
}

void parseGossipSeed(String host) {
String[] hostParts = host.split(":");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public static <A> EventDataBuilder builderAsJson(String eventType, A eventData)
* @return an event data builder.
* @param <A> a type that can be serialized in JSON.
*/
@Deprecated
public static <A> EventDataBuilder builderAsJson(UUID eventId, String eventType, A eventData) {
return EventDataBuilder.json(eventId, eventType, eventData);
}
Expand Down Expand Up @@ -120,5 +121,9 @@ public static EventDataBuilder builderAsBinary(String eventType, byte[] eventDat
public static EventDataBuilder builderAsBinary(UUID eventId, String eventType, byte[] eventData) {
return EventDataBuilder.binary(eventId, eventType, eventData);
}

public MessageData toMessageData() {
return new MessageData(eventType, eventData, userMetadata, eventId, contentType);
}
}

12 changes: 11 additions & 1 deletion db-client-java/src/main/java/io/kurrent/dbclient/GrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.kurrent.dbclient.serialization.MessageSerializer;
import io.kurrent.dbclient.serialization.MessageSerializerBuilder;
import io.kurrent.dbclient.serialization.OperationSerializationSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -19,11 +22,14 @@ class GrpcClient {
private final AtomicBoolean closed;
private final LinkedBlockingQueue<Msg> queue;
private final KurrentDBClientSettings settings;
private final MessageSerializer serializer;

GrpcClient(KurrentDBClientSettings settings, AtomicBoolean closed, LinkedBlockingQueue<Msg> queue) {
this.settings = settings;
this.closed = closed;
this.queue = queue;

this.serializer = MessageSerializerBuilder.get(settings.getSerializationSettings());
}

public boolean isShutdown() {
Expand Down Expand Up @@ -101,7 +107,7 @@ public <A> CompletableFuture<A> runWithArgs(Function<WorkItemArgs, CompletableFu
logger.debug("RunWorkItem[{}] completed exceptionally: {}", args.getId(), e.toString());

if (e instanceof RuntimeException)
throw (RuntimeException)e;
throw (RuntimeException) e;
else
throw new RuntimeException(e);
}
Expand All @@ -120,4 +126,8 @@ public CompletableFuture<Void> shutdown() {
public KurrentDBClientSettings getSettings() {
return this.settings;
}

public MessageSerializer getSerializer(OperationSerializationSettings serializationSettings) {
return this.serializer.with(serializationSettings);
}
}
Loading