diff --git a/build.gradle b/build.gradle index b06e3ba9..f7cab4d4 100644 --- a/build.gradle +++ b/build.gradle @@ -64,6 +64,7 @@ dependencies { implementation "org.slf4j:slf4j-api:2.0.17" implementation "org.bouncycastle:bcprov-jdk18on:1.80" implementation "org.bouncycastle:bcpkix-jdk18on:1.80" + implementation "org.apache.commons:commons-lang3:3.17.0" implementation platform("io.opentelemetry:opentelemetry-bom:${openTelemetryVersion}") implementation "io.opentelemetry:opentelemetry-api" diff --git a/src/main/java/io/kurrent/dbclient/v2/AppendStreamFailure.java b/src/main/java/io/kurrent/dbclient/v2/AppendStreamFailure.java new file mode 100644 index 00000000..e11333bc --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/AppendStreamFailure.java @@ -0,0 +1,63 @@ +package io.kurrent.dbclient.v2; + +import javax.validation.constraints.NotNull; +import java.util.Objects; + +/** + * Represents a failed append to a stream. + */ +public class AppendStreamFailure { + private final String stream; + private final Exception error; + + /** + * Initializes a new instance of the AppendStreamFailure class. + * + * @param stream The stream name. + * @param error The error that occurred. + */ + public AppendStreamFailure(@NotNull String stream, Exception error) { + this.stream = stream; + this.error = error; + } + + /** + * Gets the stream name. + * + * @return The stream name. + */ + public String getStream() { + return stream; + } + + /** + * Gets the error that occurred. + * + * @return The error that occurred. + */ + public Exception getError() { + return error; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AppendStreamFailure that = (AppendStreamFailure) o; + return Objects.equals(stream, that.stream) && + Objects.equals(error, that.error); + } + + @Override + public int hashCode() { + return Objects.hash(stream, error); + } + + @Override + public String toString() { + return "AppendStreamFailure{" + + "stream='" + stream + '\'' + + ", error=" + error + + '}'; + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/AppendStreamFailures.java b/src/main/java/io/kurrent/dbclient/v2/AppendStreamFailures.java new file mode 100644 index 00000000..0686958a --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/AppendStreamFailures.java @@ -0,0 +1,26 @@ +package io.kurrent.dbclient.v2; + +import java.util.ArrayList; +import java.util.Collection; + +/** + * Represents a collection of failed appends to streams. + */ +public class AppendStreamFailures extends ArrayList { + + /** + * Initializes a new instance of the AppendStreamFailures class. + */ + public AppendStreamFailures() { + super(); + } + + /** + * Initializes a new instance of the AppendStreamFailures class with the specified collection of AppendStreamFailure objects. + * + * @param input The collection of AppendStreamFailure objects. + */ + public AppendStreamFailures(Collection input) { + super(input); + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/AppendStreamRequest.java b/src/main/java/io/kurrent/dbclient/v2/AppendStreamRequest.java new file mode 100644 index 00000000..28690d71 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/AppendStreamRequest.java @@ -0,0 +1,80 @@ +package io.kurrent.dbclient.v2; + +import io.kurrent.dbclient.StreamState; + +import javax.validation.constraints.NotNull; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Represents a request to append messages to a stream. + */ +public class AppendStreamRequest { + private final String stream; + private final List messages; + private final StreamState expectedState; + /** + * Initializes a new instance of the AppendStreamRequest class. + * + * @param stream The stream name. + * @param messages The messages to append. + * @param expectedState The expected state of the stream. + */ + public AppendStreamRequest(@NotNull String stream, @NotNull List messages, @NotNull StreamState expectedState) { + this.stream = stream; + this.messages = messages; + this.expectedState = expectedState; + } + + /** + * Gets the stream name. + * + * @return The stream name. + */ + public String getStream() { + return stream; + } + + /** + * Gets the messages to append. + * + * @return The messages to append. + */ + public List getMessages() { + return messages; + } + + /** + * Gets the expected state of the stream. + * + * @return The expected state of the stream. + */ + public StreamState getExpectedState() { + return expectedState; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AppendStreamRequest that = (AppendStreamRequest) o; + return Objects.equals(stream, that.stream) && + Objects.equals(messages, that.messages) && + Objects.equals(expectedState, that.expectedState); + } + + @Override + public int hashCode() { + return Objects.hash(stream, messages, expectedState); + } + + @Override + public String toString() { + return "AppendStreamRequest{" + + "stream='" + stream + '\'' + + ", messages=" + messages + + ", expectedState=" + expectedState + + '}'; + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/AppendStreamResult.java b/src/main/java/io/kurrent/dbclient/v2/AppendStreamResult.java new file mode 100644 index 00000000..827d007f --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/AppendStreamResult.java @@ -0,0 +1,125 @@ +package io.kurrent.dbclient.v2; + +import java.util.Objects; +import java.util.function.Function; + +/** + * Represents the result of appending to a stream, which can be either a success or a failure. + */ +public class AppendStreamResult { + private final AppendStreamSuccess success; + private final AppendStreamFailure failure; + + private AppendStreamResult(AppendStreamSuccess success, AppendStreamFailure failure) { + this.success = success; + this.failure = failure; + } + + /** + * Creates a new AppendStreamResult from a success. + * + * @param success The success result. + * @return A new AppendStreamResult. + */ + public static AppendStreamResult fromSuccess(AppendStreamSuccess success) { + if (success == null) + throw new IllegalArgumentException("Success cannot be null"); + + return new AppendStreamResult(success, null); + } + + /** + * Creates a new AppendStreamResult from a failure. + * + * @param failure The failure result. + * @return A new AppendStreamResult. + */ + public static AppendStreamResult fromFailure(AppendStreamFailure failure) { + if (failure == null) + throw new IllegalArgumentException("Failure cannot be null"); + + return new AppendStreamResult(null, failure); + } + + /** + * Checks if this result is a success. + * + * @return true if this result is a success, false otherwise. + */ + public boolean isSuccess() { + return success != null; + } + + /** + * Checks if this result is a failure. + * + * @return true if this result is a failure, false otherwise. + */ + public boolean isFailure() { + return failure != null; + } + + /** + * Gets the success result. + * + * @return The success result. + * @throws IllegalStateException if this result is not a success. + */ + public AppendStreamSuccess getSuccess() { + if (!isSuccess()) + throw new IllegalStateException("Result is not a success"); + + return success; + } + + /** + * Gets the failure result. + * + * @return The failure result. + * @throws IllegalStateException if this result is not a failure. + */ + public AppendStreamFailure getFailure() { + if (!isFailure()) + throw new IllegalStateException("Result is not a failure"); + + return failure; + } + + /** + * Folds the result to the appropriate action. + * + * @param successAction The action to perform if this result is a success. + * @param failureAction The action to perform if this result is a failure. + * @param The type of the result. + * @return The result of the action. + */ + public T fold(Function successAction, + Function failureAction) { + if (isSuccess()) + return successAction.apply(success); + else + return failureAction.apply(failure); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AppendStreamResult that = (AppendStreamResult) o; + return Objects.equals(success, that.success) && + Objects.equals(failure, that.failure); + } + + @Override + public int hashCode() { + return Objects.hash(success, failure); + } + + @Override + public String toString() { + if (isSuccess()) + return "AppendStreamResult{success=" + success + '}'; + else + return "AppendStreamResult{failure=" + failure + '}'; + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/AppendStreamSuccess.java b/src/main/java/io/kurrent/dbclient/v2/AppendStreamSuccess.java new file mode 100644 index 00000000..f5a555ad --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/AppendStreamSuccess.java @@ -0,0 +1,76 @@ +package io.kurrent.dbclient.v2; + +import java.util.Objects; + +/** + * Represents a successful append to a stream. + */ +public class AppendStreamSuccess { + private final String stream; + private final long position; + private final long streamRevision; + + /** + * Initializes a new instance of the AppendStreamSuccess class. + * + * @param stream The stream name. + * @param position The position in the log. + * @param streamRevision The stream revision. + */ + public AppendStreamSuccess(String stream, long position, long streamRevision) { + this.stream = stream != null ? stream : ""; + this.position = position; + this.streamRevision = streamRevision; + } + + /** + * Gets the stream name. + * + * @return The stream name. + */ + public String getStream() { + return stream; + } + + /** + * Gets the position in the log. + * + * @return The position in the log. + */ + public long getPosition() { + return position; + } + + /** + * Gets the stream revision. + * + * @return The stream revision. + */ + public long getStreamRevision() { + return streamRevision; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AppendStreamSuccess that = (AppendStreamSuccess) o; + return position == that.position && + streamRevision == that.streamRevision && + Objects.equals(stream, that.stream); + } + + @Override + public int hashCode() { + return Objects.hash(stream, position, streamRevision); + } + + @Override + public String toString() { + return "AppendStreamSuccess{" + + "stream='" + stream + '\'' + + ", position=" + position + + ", streamRevision=" + streamRevision + + '}'; + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/AppendStreamSuccesses.java b/src/main/java/io/kurrent/dbclient/v2/AppendStreamSuccesses.java new file mode 100644 index 00000000..f0ba9ecb --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/AppendStreamSuccesses.java @@ -0,0 +1,27 @@ +package io.kurrent.dbclient.v2; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Represents a collection of successful appends to streams. + */ +public class AppendStreamSuccesses extends ArrayList { + + /** + * Initializes a new instance of the AppendStreamSuccesses class. + */ + public AppendStreamSuccesses() { + super(); + } + + /** + * Initializes a new instance of the AppendStreamSuccesses class with the specified collection of AppendStreamSuccess objects. + * + * @param input The collection of AppendStreamSuccess objects. + */ + public AppendStreamSuccesses(Collection input) { + super(input); + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/ConsumeFilter.java b/src/main/java/io/kurrent/dbclient/v2/ConsumeFilter.java new file mode 100644 index 00000000..63ca8bec --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/ConsumeFilter.java @@ -0,0 +1,319 @@ +package io.kurrent.dbclient.v2; + +import io.kurrent.dbclient.SubscriptionFilter; +import io.kurrent.dbclient.SubscriptionFilterBuilder; + +import javax.validation.constraints.NotNull; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.regex.Matcher; + +/** + * Represents a filter for consuming events. + */ +public class ConsumeFilter { + /** + * Represents an empty filter. + */ + public static final ConsumeFilter NONE = new ConsumeFilter(); + + private final ConsumeFilterScope scope; + private final ConsumeFilterType type; + private final String expression; + private final Pattern regex; + + /** + * Creates a new empty consume filter. + */ + public ConsumeFilter() { + this.scope = ConsumeFilterScope.UNSPECIFIED; + this.type = ConsumeFilterType.UNSPECIFIED; + this.expression = ""; + this.regex = Pattern.compile(""); + } + + /** + * Creates a new consume filter with the specified parameters. + * + * @param scope The scope of the filter. + * @param type The type of the filter. + * @param expression The filter expression. + * @param regex The compiled regex pattern. + */ + private ConsumeFilter(ConsumeFilterScope scope, ConsumeFilterType type, String expression, Pattern regex) { + this.scope = scope; + this.type = type; + this.expression = expression; + this.regex = regex; + } + + /** + * Gets the scope of the filter. + * + * @return The filter scope. + */ + public ConsumeFilterScope getScope() { + return scope; + } + + /** + * Gets the type of the filter. + * + * @return The filter type. + */ + public ConsumeFilterType getType() { + return type; + } + + /** + * Gets the filter expression. + * + * @return The filter expression. + */ + public String getExpression() { + return expression; + } + + /** + * Checks if this is a literal filter. + * + * @return True if this is a literal filter, false otherwise. + */ + public boolean isLiteralFilter() { + return type == ConsumeFilterType.LITERAL; + } + + /** + * Checks if this is a regex filter. + * + * @return True if this is a regex filter, false otherwise. + */ + public boolean isRegexFilter() { + return type == ConsumeFilterType.REGEX; + } + + /** + * Checks if this is a stream filter. + * + * @return True if this is a stream filter, false otherwise. + */ + public boolean isStreamFilter() { + return scope == ConsumeFilterScope.STREAM; + } + + /** + * Checks if this is a record filter. + * + * @return True if this is a record filter, false otherwise. + */ + public boolean isRecordFilter() { + return scope == ConsumeFilterScope.RECORD; + } + + /** + * Checks if this is a stream name filter. + * + * @return True if this is a stream name filter, false otherwise. + */ + public boolean isStreamNameFilter() { + return type == ConsumeFilterType.LITERAL && scope == ConsumeFilterScope.STREAM; + } + + /** + * Checks if this is an empty filter. + * + * @return True if this is an empty filter, false otherwise. + */ + public boolean isEmptyFilter() { + return type == ConsumeFilterType.UNSPECIFIED && scope == ConsumeFilterScope.UNSPECIFIED; + } + + /** + * Checks if the input matches this filter. + * + * @param input The input to check. + * @return True if the input matches this filter, false otherwise. + */ + public boolean isMatch(CharSequence input) { + return regex.matcher(input).matches(); + } + + /** + * Creates a stream filter from a stream name. + * + * @param stream The stream name. + * @return A new consume filter for the specified stream. + * @throws IllegalArgumentException If the stream name is invalid. + */ + public static ConsumeFilter fromStream(String stream) { + if (stream == null || stream.trim().isEmpty()) + throw new IllegalArgumentException("Stream name cannot be null or whitespace."); + + if (stream.startsWith("~")) + throw new IllegalArgumentException("Stream name cannot start with '~'."); + + if (stream.length() < 2) + throw new IllegalArgumentException("Stream name must be at least 2 characters long."); + + return new ConsumeFilter( + ConsumeFilterScope.STREAM, + ConsumeFilterType.LITERAL, + stream, + Pattern.compile(Pattern.quote(stream)) + ); + } + + /** + * Creates a filter from prefixes. + * + * @param scope The scope of the filter. + * @param prefixes The prefixes to filter by. + * @return A new consume filter for the specified prefixes. + * @throws IllegalArgumentException If the prefixes are invalid. + */ + public static ConsumeFilter fromPrefixes(ConsumeFilterScope scope, String... prefixes) { + if (prefixes == null || prefixes.length == 0) + throw new IllegalArgumentException("Prefixes cannot be empty."); + + StringBuilder patternBuilder = new StringBuilder("^("); + boolean first = true; + + for (String prefix : prefixes) { + if (prefix == null || prefix.trim().isEmpty()) + throw new IllegalArgumentException("Prefix cannot be empty."); + + if (!first) + patternBuilder.append("|"); + + patternBuilder.append(Pattern.quote(prefix)); + first = false; + } + patternBuilder.append(").*"); + + String pattern = patternBuilder.toString(); + return new ConsumeFilter( + scope, + ConsumeFilterType.REGEX, + pattern, + Pattern.compile(pattern) + ); + } + + /** + * Creates a filter from a comma-separated list of prefixes. + * + * @param scope The scope of the filter. + * @param expression The comma-separated list of prefixes. + * @return A new consume filter for the specified prefixes. + * @throws IllegalArgumentException If the expression is invalid. + */ + public static ConsumeFilter fromPrefixes(ConsumeFilterScope scope, String expression) { + if (expression == null || expression.trim().isEmpty()) + throw new IllegalArgumentException("Prefix expression cannot be empty."); + + return fromPrefixes(scope, expression.split(",")); + } + + /** + * Creates a filter from a regex pattern. + * + * @param scope The scope of the filter. + * @param pattern The regex pattern. + * @return A new consume filter for the specified regex pattern. + * @throws IllegalArgumentException If the pattern is invalid. + */ + public static ConsumeFilter fromRegex(ConsumeFilterScope scope, String pattern) { + String expression = pattern.startsWith("~") ? pattern.substring(1) : pattern; + + try { + return new ConsumeFilter( + scope, + ConsumeFilterType.REGEX, + expression, + Pattern.compile(expression) + ); + } catch (Exception ex) { + throw new IllegalArgumentException("Invalid regex pattern: " + pattern, ex); + } + } + + /** + * Creates a filter from an expression. + * + * @param scope The scope of the filter. + * @param expression The filter expression. + * @return A new consume filter for the specified expression. + * @throws IllegalArgumentException If the expression is invalid. + */ + public static ConsumeFilter create(ConsumeFilterScope scope, String expression) { + if (expression == null || expression.trim().isEmpty()) + throw new IllegalArgumentException("Expression cannot be null or empty."); + + if (expression.startsWith("~")) { + return fromRegex(scope, expression); + } else { + return new ConsumeFilter( + scope, + ConsumeFilterType.LITERAL, + expression, + Pattern.compile(Pattern.quote(expression)) + ); + } + } + + /** + * Converts a ConsumeFilter to a SubscriptionFilter. + * + * @param checkpointInterval The checkpoint interval to use. + * @return The converted subscription filter, or null if the filter is empty. + * @throws IllegalArgumentException If the filter is invalid. + */ + public SubscriptionFilter toSubscriptionFilter(int checkpointInterval) { + if (isEmptyFilter()) + return null; + + SubscriptionFilterBuilder builder = SubscriptionFilter.newBuilder(); + + // Set the checkpoint interval + builder.withMaxWindow(checkpointInterval); + + // Configure the filter based on its scope and type + if (isStreamFilter()) { + if (isRegexFilter()) { + builder.withStreamNameRegularExpression(getExpression()); + } else if (isLiteralFilter()) { + builder.addStreamNamePrefix(getExpression()); + } + } else if (isRecordFilter()) { + if (isRegexFilter()) { + builder.withEventTypeRegularExpression(getExpression()); + } else if (isLiteralFilter()) { + builder.addEventTypePrefix(getExpression()); + } + } else { + throw new IllegalArgumentException("Invalid consume filter."); + } + + return builder.build(); + } + + @Override + public String toString() { + return "[" + scope + "|" + type + "] " + expression; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConsumeFilter that = (ConsumeFilter) o; + return scope == that.scope && + type == that.type && + Objects.equals(expression, that.expression); + } + + @Override + public int hashCode() { + return Objects.hash(scope, type, expression); + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/ConsumeFilterScope.java b/src/main/java/io/kurrent/dbclient/v2/ConsumeFilterScope.java new file mode 100644 index 00000000..623d2e02 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/ConsumeFilterScope.java @@ -0,0 +1,35 @@ +package io.kurrent.dbclient.v2; + +/** + * Defines the scope of a consume filter. + */ +public enum ConsumeFilterScope { + /** + * Unspecified filter scope. + */ + UNSPECIFIED(0), + + /** + * Filter applies to stream names. + */ + STREAM(1), + + /** + * Filter applies to record types. + */ + RECORD(2); + + private final int value; + + ConsumeFilterScope(int value) { + this.value = value; + } + + /** + * Gets the integer value of the enum. + * @return The integer value. + */ + public int getValue() { + return value; + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/ConsumeFilterType.java b/src/main/java/io/kurrent/dbclient/v2/ConsumeFilterType.java new file mode 100644 index 00000000..88dbe086 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/ConsumeFilterType.java @@ -0,0 +1,35 @@ +package io.kurrent.dbclient.v2; + +/** + * Defines the type of a consume filter. + */ +public enum ConsumeFilterType { + /** + * Unspecified filter type. + */ + UNSPECIFIED(0), + + /** + * Literal string filter. + */ + LITERAL(1), + + /** + * Regular expression filter. + */ + REGEX(2); + + private final int value; + + ConsumeFilterType(int value) { + this.value = value; + } + + /** + * Gets the integer value of the enum. + * @return The integer value. + */ + public int getValue() { + return value; + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/DeserializeFunction.java b/src/main/java/io/kurrent/dbclient/v2/DeserializeFunction.java new file mode 100644 index 00000000..23ec333f --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/DeserializeFunction.java @@ -0,0 +1,19 @@ +package io.kurrent.dbclient.v2; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +/** + * Functional interface for deserializing bytes into an object. + */ +@FunctionalInterface +public interface DeserializeFunction { + /** + * Deserializes the given bytes into an object according to the provided metadata. + * + * @param data The bytes to be deserialized. + * @param metadata The metadata providing additional information for the deserialization process. + * @return A CompletableFuture that resolves to the deserialized object. + */ + CompletableFuture deserialize(ByteBuffer data, Metadata metadata); +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/IMetadataDecoder.java b/src/main/java/io/kurrent/dbclient/v2/IMetadataDecoder.java new file mode 100644 index 00000000..8cf666a9 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/IMetadataDecoder.java @@ -0,0 +1,16 @@ +package io.kurrent.dbclient.v2; + +import io.kurrent.dbclient.StreamMetadata; + +/** + * Interface for decoding metadata from a byte array. + */ +public interface IMetadataDecoder { + /** + * Decodes metadata from a byte array. + * + * @param bytes The byte array containing the encoded metadata. + * @return The decoded metadata. + */ + StreamMetadata decode(byte[] bytes); +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/Message.java b/src/main/java/io/kurrent/dbclient/v2/Message.java new file mode 100644 index 00000000..0003dda9 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/Message.java @@ -0,0 +1,158 @@ +package io.kurrent.dbclient.v2; + +import java.util.UUID; +import java.util.function.Function; + +/** + * Represents a message with payload, metadata, and schema information. + */ +public final class Message { + /** + * An empty message instance. + */ + public static final Message EMPTY = new Message(); + + private final Object value; + private final Metadata metadata; + private final UUID recordId; + private final SchemaDataFormat dataFormat; + + /** + * Initializes a new instance of the Message class with default values. + */ + public Message() { + this(null, new Metadata(), UUID.randomUUID(), SchemaDataFormat.JSON); + } + + /** + * Initializes a new instance of the Message class. + * + * @param value The message payload. + * @param metadata The message metadata. + * @param recordId The assigned record id. + * @param dataFormat The format of the schema associated with the message. + */ + public Message(Object value, Metadata metadata, UUID recordId, SchemaDataFormat dataFormat) { + this.value = value; + this.metadata = metadata != null ? metadata : new Metadata(); + this.recordId = recordId != null ? recordId : UUID.randomUUID(); + this.dataFormat = dataFormat != null ? dataFormat : SchemaDataFormat.JSON; + } + + /** + * Gets the message payload. + * + * @return The message payload. + */ + public Object getValue() { + return value; + } + + /** + * Gets the message metadata. + * + * @return The message metadata. + */ + public Metadata getMetadata() { + return metadata; + } + + /** + * Gets the assigned record id. + * + * @return The assigned record id. + */ + public UUID getRecordId() { + return recordId; + } + + /** + * Gets the format of the schema associated with the message. + * + * @return The schema data format. + */ + public SchemaDataFormat getDataFormat() { + return dataFormat; + } + + /** + * Creates a new Message builder. + * + * @return A new Message builder. + */ + public static Builder builder() { + return new Builder(null, new Metadata(), null, SchemaDataFormat.JSON); + } + + /** + * Builder for creating Message instances. + */ + public static class Builder { + private final Object value; + private final Metadata metadata; + private final UUID recordId; + private final SchemaDataFormat dataFormat; + + private Builder(Object value, Metadata metadata, UUID recordId, SchemaDataFormat dataFormat) { + this.value = value; + this.metadata = metadata; + this.recordId = recordId; + this.dataFormat = dataFormat; + } + + /** + * Sets the message payload. + * + * @param value The message payload. + * @return This builder instance. + */ + public Builder value(Object value) { + return new Builder(value, this.metadata, this.recordId, this.dataFormat); + } + + /** + * Sets the message metadata. + * + * @param metadata The message metadata. + * @return This builder instance. + */ + public Builder metadata(Metadata metadata) { + assert metadata != null : "Metadata cannot be null"; + return new Builder(this.value, metadata, this.recordId, this.dataFormat); + } + + /** + * Sets the assigned record id. + * + * @param recordId The assigned record id. + * @return This builder instance. + */ + public Builder recordId(UUID recordId) { + assert recordId != null : "Record ID cannot be null"; + return new Builder(this.value, this.metadata, recordId, this.dataFormat); + } + + /** + * Sets the format of the schema associated with the message. + * + * @param dataFormat The schema data format. + * @return This builder instance. + */ + public Builder dataFormat(SchemaDataFormat dataFormat) { + return new Builder(this.value, this.metadata, this.recordId, dataFormat); + } + + public Builder when(boolean condition, Function func) { + return condition ? func.apply(this) : this; + } + + /** + * Builds a new Message instance. + * + * @return A new Message instance. + */ + public Message build() { + return new Message(this.value, this.metadata, this.recordId != null ? this.recordId : UUID.randomUUID(), this.dataFormat); + } + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/Metadata.java b/src/main/java/io/kurrent/dbclient/v2/Metadata.java new file mode 100644 index 00000000..31ff1208 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/Metadata.java @@ -0,0 +1,221 @@ +package io.kurrent.dbclient.v2; + +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.math.NumberUtils; + +import java.nio.ByteBuffer; +import java.time.DateTimeException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.format.DateTimeParseException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +/** + * Represents a collection of metadata as key-value pairs with additional helper methods. + */ +public class Metadata extends HashMap { + /** + * Initializes a new, empty instance of the Metadata class. + */ + public Metadata() { + super(); + } + + /** + * Initializes a new instance of the Metadata class from an existing metadata instance. + * + * @param metadata The metadata to copy from. + */ + public Metadata(Metadata metadata) { + super(metadata); + } + + /** + * Initializes a new instance of the Metadata class from a dictionary. + * + * @param dictionary The dictionary to copy from. + */ + public Metadata(Map dictionary) { + super(dictionary); + } + + /** + * Adds or updates a key-value pair in the metadata and returns the metadata instance. + * + * @param key The key to add or update. + * @param value The value to set. + * @param The type of the value. + * @return The metadata instance (this) to enable method chaining. + */ + public Metadata set(String key, T value) { + this.put(key, value); + return this; + } + + /** + * Gets a typed value from the metadata. + * + * @param key The key to retrieve. + * @param type The class of the type T. + * @param The type to cast the value to. + * @return The value cast to type T, or null if the key is not found or the value can't be cast to type T. + */ + @SuppressWarnings("unchecked") + public T get(String key, Class type) { + Object value = get(key); + if (containsKey(key) && type.isInstance(value)) + return (T) value; + + return null; + } + + /** + * Gets a typed value from the metadata, with automatic type conversion where appropriate. + * + * @param metadata The metadata object. + * @param key The key to retrieve. + * @param defaultValue The default value to return if the key is not found or the value can't be cast to type T. + * @param type The class of the type T. + * @param The type to cast the value to. + * @return The value cast to type T, or the default value if the key is not found or the value can't be cast to type T. + */ + @SuppressWarnings("unchecked") + public T get(String key, T defaultValue, Class type) { + Object value = get(key); + if (containsKey(key) && type.isInstance(value)) { + return (T) value; + } + return defaultValue; + } + + /** + * Tries to get a typed value from the metadata, with automatic type conversion where appropriate. + * + * @param key The key to retrieve. + * @param type The class of the type T. + * @param The type to get or convert to. + */ + @SuppressWarnings("unchecked") + public Optional tryGet(String key, Class type) { + Object obj = get(key); + if (!containsKey(key)) { + return Optional.empty(); + } + + // Direct type match + if (type.isInstance(obj)) { + return Optional.of((T) obj); + } + + // Handle byte array conversions + if (type == byte[].class || type == ByteBuffer.class) { + if (obj instanceof byte[]) { + if (type == byte[].class) + return Optional.of((T) obj); + else + return Optional.of((T) ByteBuffer.wrap((byte[]) obj)); + } + + if (obj instanceof ByteBuffer) { + if (type == byte[].class) + return Optional.of((T) ((ByteBuffer) obj).array()); + else + return Optional.of((T) obj); + } + + return Optional.empty(); + } + + // Convert string representation for various types + String stringValue = obj.toString(); + if (stringValue == null) + return Optional.empty(); + + // Handle common value types with parsing + if (type == Boolean.class || type == boolean.class) { + Boolean value = BooleanUtils.toBooleanObject(stringValue); + + if (value == null) + return Optional.empty(); + + return Optional.of((T) value); + } + + if (type == Byte.class || type == byte.class) { + byte value = NumberUtils.toByte(stringValue); + if (value == 0 && !stringValue.equalsIgnoreCase("0")) + return Optional.empty(); + + return Optional.of((T) Byte.valueOf(value)); + } + + if (type == Short.class || type == short.class) { + short value = NumberUtils.toShort(stringValue); + if (value == 0 && !stringValue.equalsIgnoreCase("0")) + return Optional.empty(); + + return Optional.of((T) Short.valueOf(stringValue)); + } + + if (type == Integer.class || type == int.class) { + int value = NumberUtils.toInt(stringValue); + if (value == 0 && !stringValue.equalsIgnoreCase("0")) + return Optional.empty(); + + return Optional.of((T) Integer.valueOf(stringValue)); + } + + if (type == Long.class || type == long.class) { + long value = NumberUtils.toLong(stringValue); + if (value == 0 && !stringValue.equalsIgnoreCase("0")) + return Optional.empty(); + + return Optional.of((T) Long.valueOf(stringValue)); + } + + if (type == Float.class || type == float.class) { + float value = NumberUtils.toFloat(stringValue, Float.NaN); + if (Float.isNaN(value)) + return Optional.empty(); + + return Optional.of((T) Float.valueOf(value)); + } + + if (type == Double.class || type == double.class) { + double value = NumberUtils.toDouble(stringValue, Double.NaN); + if (Double.isNaN(value)) + return Optional.empty(); + + return Optional.of((T) Double.valueOf(value)); + + } + + if (type == Character.class || type == char.class) { + if (stringValue.length() == 1) + return Optional.of((T) Character.valueOf(stringValue.charAt(0))); + } + + try { + if (type == UUID.class) + return Optional.of((T) UUID.fromString(stringValue)); + + if (type == Instant.class) + return Optional.of((T) Instant.parse(stringValue)); + + if (type == LocalDate.class) + return Optional.of((T) LocalDate.parse(stringValue)); + + if (type == LocalTime.class) + return Optional.of((T) LocalTime.parse(stringValue)); + + } catch (DateTimeParseException | IllegalArgumentException e) { + return Optional.empty(); + } + + return Optional.empty(); + } +} diff --git a/src/main/java/io/kurrent/dbclient/v2/MultiStreamAppendResult.java b/src/main/java/io/kurrent/dbclient/v2/MultiStreamAppendResult.java new file mode 100644 index 00000000..ded75649 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/MultiStreamAppendResult.java @@ -0,0 +1,126 @@ +package io.kurrent.dbclient.v2; + +import java.util.Objects; +import java.util.function.Function; + +/** + * Represents the result of appending to multiple streams, which can be either a collection of successes or a collection of failures. + */ +public class MultiStreamAppendResult { + private final AppendStreamSuccesses successes; + private final AppendStreamFailures failures; + + private MultiStreamAppendResult(AppendStreamSuccesses successes, AppendStreamFailures failures) { + this.successes = successes; + this.failures = failures; + } + + /** + * Creates a new MultiStreamAppendResult from a collection of successes. + * + * @param successes The collection of success results. + * @return A new MultiStreamAppendResult. + */ + public static MultiStreamAppendResult fromSuccesses(AppendStreamSuccesses successes) { + if (successes == null) + throw new IllegalArgumentException("Successes cannot be null"); + + return new MultiStreamAppendResult(successes, null); + } + + /** + * Creates a new MultiStreamAppendResult from a collection of failures. + * + * @param failures The collection of failure results. + * @return A new MultiStreamAppendResult. + */ + public static MultiStreamAppendResult fromFailures(AppendStreamFailures failures) { + if (failures == null) + throw new IllegalArgumentException("Failures cannot be null"); + + return new MultiStreamAppendResult(null, failures); + } + + /** + * Checks if this result is a collection of successes. + * + * @return true if this result is a collection of successes, false otherwise. + */ + public boolean isSuccesses() { + return successes != null; + } + + /** + * Checks if this result is a collection of failures. + * + * @return true if this result is a collection of failures, false otherwise. + */ + public boolean isFailures() { + return failures != null; + } + + /** + * Gets the collection of success results. + * + * @return The collection of success results. + * @throws IllegalStateException if this result is not a collection of successes. + */ + public AppendStreamSuccesses getSuccesses() { + if (!isSuccesses()) + throw new IllegalStateException("Result is not a collection of successes"); + + return successes; + } + + /** + * Gets the collection of failure results. + * + * @return The collection of failure results. + * @throws IllegalStateException if this result is not a collection of failures. + */ + public AppendStreamFailures getFailures() { + if (!isFailures()) + throw new IllegalStateException("Result is not a collection of failures"); + + return failures; + } + + /** + * Folds the result to the appropriate action. + * + * @param successesAction The action to perform if this result is a collection of successes. + * @param failuresAction The action to perform if this result is a collection of failures. + * @param The type of the result. + * @return The result of the action. + */ + public T fold(Function successesAction, + Function failuresAction) { + if (isSuccesses()) + return successesAction.apply(successes); + else + return failuresAction.apply(failures); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MultiStreamAppendResult that = (MultiStreamAppendResult) o; + return Objects.equals(successes, that.successes) && + Objects.equals(failures, that.failures); + } + + @Override + public int hashCode() { + return Objects.hash(successes, failures); + } + + @Override + public String toString() { + if (isSuccesses()) { + return "MultiStreamAppendResult{successes=" + successes + '}'; + } else { + return "MultiStreamAppendResult{failures=" + failures + '}'; + } + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/RegisteredSchema.java b/src/main/java/io/kurrent/dbclient/v2/RegisteredSchema.java new file mode 100644 index 00000000..99b76550 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/RegisteredSchema.java @@ -0,0 +1,151 @@ +package io.kurrent.dbclient.v2; + +import java.time.OffsetDateTime; +import java.util.Objects; + +/** + * Represents a registered schema with its metadata. + */ +public class RegisteredSchema { + /** + * Represents an empty or unspecified registered schema. + */ + public static final RegisteredSchema NONE = new RegisteredSchema(); + + private final String schemaName; + private final SchemaDataFormat dataFormat; + private final String schemaVersionId; + private final String definition; + private final int versionNumber; + private final OffsetDateTime createdAt; + + /** + * Creates a default instance with null or default values. + */ + private RegisteredSchema() { + this.schemaName = null; + this.dataFormat = SchemaDataFormat.UNSPECIFIED; + this.schemaVersionId = null; + this.definition = null; + this.versionNumber = 0; + this.createdAt = OffsetDateTime.now(); + } + + /** + * Creates a new instance of RegisteredSchema with the specified values. + * + * @param schemaName The name of the schema. + * @param dataFormat The data format of the schema. + * @param schemaVersionId The version ID of the schema. + * @param definition The definition of the schema. + * @param versionNumber The version number of the schema. + * @param createdAt The creation timestamp of the schema. + */ + public RegisteredSchema( + String schemaName, + SchemaDataFormat dataFormat, + String schemaVersionId, + String definition, + int versionNumber, + OffsetDateTime createdAt) { + this.schemaName = schemaName; + this.dataFormat = dataFormat; + this.schemaVersionId = schemaVersionId; + this.definition = definition; + this.versionNumber = versionNumber; + this.createdAt = createdAt; + } + + /** + * Gets the schema name. + * + * @return The schema name. + */ + public String getSchemaName() { + return schemaName; + } + + /** + * Gets the data format. + * + * @return The data format. + */ + public SchemaDataFormat getDataFormat() { + return dataFormat; + } + + /** + * Gets the schema version ID. + * + * @return The schema version ID. + */ + public String getSchemaVersionId() { + return schemaVersionId; + } + + /** + * Gets the schema definition. + * + * @return The schema definition. + */ + public String getDefinition() { + return definition; + } + + /** + * Gets the version number. + * + * @return The version number. + */ + public int getVersionNumber() { + return versionNumber; + } + + /** + * Gets the creation timestamp. + * + * @return The creation timestamp. + */ + public OffsetDateTime getCreatedAt() { + return createdAt; + } + + /** + * Converts this registered schema to a SchemaInfo object. + * + * @return A new SchemaInfo instance. + */ + public SchemaInfo toSchemaInfo() { + return new SchemaInfo(schemaName, dataFormat); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RegisteredSchema that = (RegisteredSchema) o; + return versionNumber == that.versionNumber && + Objects.equals(schemaName, that.schemaName) && + dataFormat == that.dataFormat && + Objects.equals(schemaVersionId, that.schemaVersionId) && + Objects.equals(definition, that.definition) && + Objects.equals(createdAt, that.createdAt); + } + + @Override + public int hashCode() { + return Objects.hash(schemaName, dataFormat, schemaVersionId, definition, versionNumber, createdAt); + } + + @Override + public String toString() { + return "RegisteredSchema{" + + "schemaName='" + schemaName + '\'' + + ", dataFormat=" + dataFormat + + ", schemaVersionId='" + schemaVersionId + '\'' + + ", definition='" + definition + '\'' + + ", versionNumber=" + versionNumber + + ", createdAt=" + createdAt + + '}'; + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/SchemaDataFormat.java b/src/main/java/io/kurrent/dbclient/v2/SchemaDataFormat.java new file mode 100644 index 00000000..cb1b8dc6 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/SchemaDataFormat.java @@ -0,0 +1,61 @@ +package io.kurrent.dbclient.v2; + +/** + * Specifies the format of schema data. + */ +public enum SchemaDataFormat { + /** + * Unspecified format. + */ + UNSPECIFIED(0), + + /** + * JSON format. + */ + JSON(1), + + /** + * Protocol Buffers format. + */ + PROTOBUF(2), + + /** + * Apache Avro format. + */ + AVRO(3), + + /** + * Raw bytes format. + */ + BYTES(4); + + private final int value; + + SchemaDataFormat(int value) { + this.value = value; + } + + /** + * Gets the integer value of the enum. + * + * @return The integer value. + */ + public int getValue() { + return value; + } + + /** + * Gets a SchemaDataFormat from its integer value. + * + * @param value The integer value. + * @return The corresponding SchemaDataFormat, or UNSPECIFIED if not found. + */ + public static SchemaDataFormat fromValue(int value) { + for (SchemaDataFormat format : SchemaDataFormat.values()) { + if (format.value == value) { + return format; + } + } + return UNSPECIFIED; + } +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/SchemaInfo.java b/src/main/java/io/kurrent/dbclient/v2/SchemaInfo.java new file mode 100644 index 00000000..e9300c15 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/SchemaInfo.java @@ -0,0 +1,178 @@ +package io.kurrent.dbclient.v2; + +/** + * Represents schema information including schema name and data format. + */ +public class SchemaInfo { + // Content type headers + private static final String JSON_CONTENT_TYPE = "application/json"; + private static final String PROTOBUF_CONTENT_TYPE = "application/vnd.google.protobuf"; + private static final String AVRO_CONTENT_TYPE = "application/vnd.apache.avro+json"; + private static final String BYTES_CONTENT_TYPE = "application/octet-stream"; + + // Metadata keys + private static final String SCHEMA_NAME_KEY = "schema-name"; + private static final String SCHEMA_DATA_FORMAT_KEY = "schema-data-format"; + + // Static instance for no schema + public static final SchemaInfo NONE = new SchemaInfo("", SchemaDataFormat.UNSPECIFIED); + + private final String schemaName; + private final SchemaDataFormat dataFormat; + private final String contentTypeHeader; + + /** + * Creates a new instance of SchemaInfo. + * + * @param schemaName The name of the schema. + * @param dataFormat The data format of the schema. + */ + public SchemaInfo(String schemaName, SchemaDataFormat dataFormat) { + this.schemaName = schemaName; + this.dataFormat = dataFormat; + this.contentTypeHeader = determineContentTypeHeader(dataFormat); + } + + /** + * Gets the schema name. + * + * @return The schema name. + */ + public String getSchemaName() { + return schemaName; + } + + /** + * Gets the data format. + * + * @return The data format. + */ + public SchemaDataFormat getDataFormat() { + return dataFormat; + } + + /** + * Gets the content type header. + * + * @return The content type header. + */ + public String getContentTypeHeader() { + return contentTypeHeader; + } + + /** + * Gets the content type. + * + * @return The content type. + */ + public String getContentType() { + return contentTypeHeader; + } + + /** + * Checks if the schema name is missing. + * + * @return True if the schema name is null, empty, or whitespace; otherwise, false. + */ + public boolean isSchemaNameMissing() { + return schemaName == null || schemaName.trim().isEmpty(); + } + + /** + * Injects schema information into metadata. + * + * @param metadata The metadata to inject into. + */ + public void injectIntoMetadata(Metadata metadata) { + metadata.set(SCHEMA_NAME_KEY, schemaName); + metadata.set(SCHEMA_DATA_FORMAT_KEY, dataFormat.toString().toLowerCase()); + } + + /** + * Injects schema name into metadata. + * + * @param metadata The metadata to inject into. + */ + public void injectSchemaNameIntoMetadata(Metadata metadata) { + metadata.set(SCHEMA_NAME_KEY, schemaName); + } + + /** + * Creates a SchemaInfo instance from metadata. + * + * @param metadata The metadata to extract from. + * @return A new SchemaInfo instance. + */ + public static SchemaInfo fromMetadata(Metadata metadata) { + String schemaName = extractSchemaName(metadata); + SchemaDataFormat dataFormat = extractSchemaDataFormat(metadata); + return new SchemaInfo(schemaName, dataFormat); + } + + /** + * Creates a SchemaInfo instance from content type. + * + * @param schemaName The schema name. + * @param contentType The content type. + * @return A new SchemaInfo instance. + * @throws IllegalArgumentException If schemaName or contentType is null or empty. + */ + public static SchemaInfo fromContentType(String schemaName, String contentType) { + if (schemaName == null || schemaName.isEmpty()) + throw new IllegalArgumentException("schemaName cannot be null or empty"); + + if (contentType == null || contentType.isEmpty()) + throw new IllegalArgumentException("contentType cannot be null or empty"); + + SchemaDataFormat schemaDataFormat; + switch (contentType) { + case JSON_CONTENT_TYPE: + schemaDataFormat = SchemaDataFormat.JSON; + break; + case PROTOBUF_CONTENT_TYPE: + schemaDataFormat = SchemaDataFormat.PROTOBUF; + break; + case BYTES_CONTENT_TYPE: + schemaDataFormat = SchemaDataFormat.BYTES; + break; + default: + schemaDataFormat = SchemaDataFormat.UNSPECIFIED; + break; + } + + return new SchemaInfo(schemaName, schemaDataFormat); + } + + private static String extractSchemaName(Metadata metadata) { + String schemaName = metadata.get(SCHEMA_NAME_KEY, String.class); + return schemaName != null ? schemaName : ""; + } + + private static SchemaDataFormat extractSchemaDataFormat(Metadata metadata) { + String formatStr = metadata.get(SCHEMA_DATA_FORMAT_KEY, String.class); + if (formatStr == null) { + return SchemaDataFormat.UNSPECIFIED; + } + + try { + return SchemaDataFormat.valueOf(formatStr.toUpperCase()); + } catch (IllegalArgumentException e) { + return SchemaDataFormat.UNSPECIFIED; + } + } + + private String determineContentTypeHeader(SchemaDataFormat dataFormat) { + switch (dataFormat) { + case JSON: + return JSON_CONTENT_TYPE; + case PROTOBUF: + return PROTOBUF_CONTENT_TYPE; + case AVRO: + return AVRO_CONTENT_TYPE; + case BYTES: + case UNSPECIFIED: + default: + return BYTES_CONTENT_TYPE; + } + } +} diff --git a/src/main/java/io/kurrent/dbclient/v2/SchemaSerializer.java b/src/main/java/io/kurrent/dbclient/v2/SchemaSerializer.java new file mode 100644 index 00000000..aa02c129 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/SchemaSerializer.java @@ -0,0 +1,43 @@ +package io.kurrent.dbclient.v2; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CancellationException; + +/** + * Defines the interface for a schema serializer. + */ +public interface SchemaSerializer { + /** + * The type of the schema. + * + * @return The data format of the schema. + */ + SchemaDataFormat getDataFormat(); + + /** + * Serializes the given value into bytes according to the provided context. + * + * @param value The object to be serialized. + * @param context The context providing additional information for the serialization process. + * @return A CompletableFuture that resolves to a ByteBuffer containing the serialized data. + */ + CompletableFuture serialize(Object value, SerializationContext context); + + /** + * Deserializes the given bytes into an object according to the provided context. + * + * @param data The bytes to be deserialized. + * @param context The context providing additional information for the deserialization process. + * @return A CompletableFuture that resolves to the deserialized object. + */ + CompletableFuture deserialize(ByteBuffer data, SerializationContext context); + + /** + * Serializes the given message. + * + * @param message The message to be serialized. + * @return A CompletableFuture that resolves to a ByteBuffer containing the serialized data. + */ + CompletableFuture serialize(Message message); +} \ No newline at end of file diff --git a/src/main/java/io/kurrent/dbclient/v2/SerializationContext.java b/src/main/java/io/kurrent/dbclient/v2/SerializationContext.java new file mode 100644 index 00000000..b0c5789a --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/SerializationContext.java @@ -0,0 +1,50 @@ +package io.kurrent.dbclient.v2; + +/** + * Allows the serialization operations to be autonomous. + */ +public final class SerializationContext { + private final Metadata metadata; + private final String stream; + private final SchemaInfo schemaInfo; + + /** + * Initializes a new instance of the SerializationContext class with the specified metadata, stream, and cancellation flag. + * + * @param metadata The metadata providing additional information for the serialization process. + * @param stream The stream that the record belongs to. + */ + public SerializationContext(Metadata metadata, String stream) { + this.metadata = metadata; + this.stream = stream; + this.schemaInfo = SchemaInfo.fromMetadata(this.metadata); + } + + /** + * Gets the metadata providing additional information for the serialization process. + * + * @return The metadata. + */ + public Metadata getMetadata() { + return metadata; + } + + /** + * Gets the stream that the record belongs to. + * + * @return The stream. + */ + public String getStream() { + return stream; + } + + /** + * Gets the schema information extracted from the headers. + * If the headers do not contain schema information, it will return an undefined schema information. + * + * @return The schema information. + */ + public SchemaInfo getSchemaInfo() { + return schemaInfo; + } +} diff --git a/src/main/java/io/kurrent/dbclient/v2/SerializeFunction.java b/src/main/java/io/kurrent/dbclient/v2/SerializeFunction.java new file mode 100644 index 00000000..57de2a81 --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/v2/SerializeFunction.java @@ -0,0 +1,19 @@ +package io.kurrent.dbclient.v2; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +/** + * Functional interface for serializing an object into bytes. + */ +@FunctionalInterface +public interface SerializeFunction { + /** + * Serializes the given value into bytes according to the provided metadata. + * + * @param value The object to be serialized. + * @param metadata The metadata providing additional information for the serialization process. + * @return A CompletableFuture that resolves to a ByteBuffer containing the serialized data. + */ + CompletableFuture serialize(Object value, Metadata metadata); +} \ No newline at end of file diff --git a/src/main/proto/multi-append.proto b/src/main/proto/multi-append.proto new file mode 100644 index 00000000..0f1124fb --- /dev/null +++ b/src/main/proto/multi-append.proto @@ -0,0 +1,210 @@ +syntax = "proto3"; + +package kurrentdb.protocol.v2; + +option csharp_namespace = "KurrentDB.Protocol.Streams.V2"; +option java_package = "io.kurrentdb.streams.v2"; +option java_multiple_files = true; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/struct.proto"; + +service StreamsService { + // Executes an atomic operation to append records to multiple streams. + // This transactional method ensures that all appends either succeed + // completely, or are entirely rolled back, thereby maintaining strict data + // consistency across all involved streams. + rpc MultiStreamAppend(MultiStreamAppendRequest) returns (MultiStreamAppendResponse); + + // Streaming version of MultiStreamAppend that allows clients to send multiple + // append requests over a single connection. When the stream completes, all + // records are appended transactionally (all succeed or fail together). + // Provides improved efficiency for high-throughput scenarios while + // maintaining the same transactional guarantees. + rpc MultiStreamAppendSession(stream AppendStreamRequest) returns (MultiStreamAppendResponse); +} + +message ProtocolDataUnit { + string id = 1; + map properties = 2; + bytes data = 3; + google.protobuf.Timestamp timestamp = 4; +} + +// Record to be appended to a stream. +message AppendRecord { + // Universally Unique identifier for the record. + // If not provided, the server will generate a new one. + optional string record_id = 1; + // A collection of properties providing additional system information about the + // record. + map properties = 2; + // The actual data payload of the record, stored as bytes. + bytes data = 3; + // // Optional timestamp indicating when the record was created. + // // If not provided, the server will use the current time. + // optional google.protobuf.Timestamp timestamp = 4; +} + +// Constants that match the expected state of a stream during an +// append operation. It can be used to specify whether the stream should exist, +// not exist, or can be in any state. +enum ExpectedRevisionConstants { + // The stream should exist and the expected revision should match the current + EXPECTED_REVISION_CONSTANTS_SINGLE_EVENT = 0; + // It is not important whether the stream exists or not. + EXPECTED_REVISION_CONSTANTS_ANY = -2; + // The stream should not exist. If it does, the append will fail. + EXPECTED_REVISION_CONSTANTS_NO_STREAM = -1; + // The stream should exist + EXPECTED_REVISION_CONSTANTS_EXISTS = -4; +} + +// Represents the input for appending records to a specific stream. +message AppendStreamRequest { + // The name of the stream to append records to. + string stream = 1; + // The records to append to the stream. + repeated AppendRecord records = 2; + // The expected revision of the stream. If the stream's current revision does + // not match, the append will fail. + // The expected revision can also be one of the special values + // from ExpectedRevisionConstants. + // Missing value means no expectation, the same as EXPECTED_REVISION_CONSTANTS_ANY + optional sint64 expected_revision = 3; +} + +// Success represents the successful outcome of an append operation. +message AppendStreamSuccess { + // The name of the stream to which records were appended. + string stream = 1; + // The position of the last appended record in the stream. + int64 position = 2; + // The expected revision of the stream after the append operation. + int64 stream_revision = 3; +} + +// Failure represents the detailed error information when an append operation fails. +message AppendStreamFailure { + // The name of the stream to which records were appended. + string stream = 1; + + // The error details + oneof error { + // Failed because the actual stream revision didn't match the expected revision. + ErrorDetails.WrongExpectedRevision wrong_expected_revision = 2; + // Failed because the client lacks sufficient permissions. + ErrorDetails.AccessDenied access_denied = 3; + // Failed because the target stream has been deleted. + ErrorDetails.StreamDeleted stream_deleted = 4; + } +} + +// AppendStreamOutput represents the output of appending records to a specific +// stream. +message AppendStreamResponse { + // The result of the append operation. + oneof result { + // Success represents the successful outcome of an append operation. + AppendStreamSuccess success = 1; + // Failure represents the details of a failed append operation. + AppendStreamFailure failure = 2; + } +} + +// MultiStreamAppendRequest represents a request to append records to multiple streams. +message MultiStreamAppendRequest { + // A list of AppendStreamInput messages, each representing a stream to which records should be appended. + repeated AppendStreamRequest input = 1; +} + +// Response from the MultiStreamAppend operation. +message MultiStreamAppendResponse { + oneof result { + // Success represents the successful outcome of a multi-stream append operation. + Success success = 1; + // Failure represents the details of a failed multi-stream append operation. + Failure failure = 2; + } + + message Success { + repeated AppendStreamSuccess output = 1; + } + + message Failure { + repeated AppendStreamFailure output = 1; + } +} + +// ErrorDetails provides detailed information about specific error conditions. +message ErrorDetails { + // When the user does not have sufficient permissions to perform the operation. + message AccessDenied { + // The simplified reason for access denial. + string reason = 1; + } + + // When the stream has been deleted. + message StreamDeleted { + // The time when the stream was deleted. + google.protobuf.Timestamp deleted_at = 1; + + // If the stream was hard deleted, you cannot reuse the stream name, + // it will raise an exception if you try to append to it again. + bool tombstoned = 2; + } + + // When the expected revision of the stream does not match the actual revision. + message WrongExpectedRevision { + // The actual revision of the stream. + int64 stream_revision = 1; + } + + // When the transaction exceeds the maximum size allowed + // (its bigger than the configured chunk size). + message TransactionMaxSizeExceeded { + // The maximum allowed size of the transaction. + uint32 max_size = 1; + } +} + +//=================================================================== +// Shared +//=================================================================== + +// Represents a list of dynamically typed values. +message ListDynamicValue { + // Repeated property of dynamically typed values. + repeated DynamicValue values = 1; +} + +// Represents a dynamic value +message DynamicValue { + oneof kind { + // Represents a null value. + google.protobuf.NullValue null_value = 1; + // Represents a 32-bit signed integer value. + sint32 int32_value = 2; + // Represents a 64-bit signed integer value. + sint64 int64_value = 3; + // Represents a byte array value. + bytes bytes_value = 4; + // Represents a 64-bit double-precision floating-point value. + double double_value = 5; + // Represents a 32-bit single-precision floating-point value + float float_value = 6; + // Represents a string value. + string string_value = 7; + // Represents a boolean value. + bool boolean_value = 8; + // Represents a timestamp value. + google.protobuf.Timestamp timestamp_value = 9; + // Represents a duration value. + google.protobuf.Duration duration_value = 10; + // // Represents a list of dynamic values. + // ListDynamicValue list_value = 11; + // // Represents a json struct + // google.protobuf.Struct struct_value = 12; + } +} \ No newline at end of file diff --git a/src/test/java/io/kurrent/dbclient/v2/ConsumeFilterTests.java b/src/test/java/io/kurrent/dbclient/v2/ConsumeFilterTests.java new file mode 100644 index 00000000..724e2371 --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/v2/ConsumeFilterTests.java @@ -0,0 +1,223 @@ +package io.kurrent.dbclient.v2; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.kurrent.dbclient.SubscriptionFilter; + +public class ConsumeFilterTests { + + @Test + public void testNoneConsumeFilter() { + ConsumeFilter none = ConsumeFilter.NONE; + + Assertions.assertEquals(ConsumeFilterScope.UNSPECIFIED, none.getScope()); + Assertions.assertEquals(ConsumeFilterType.UNSPECIFIED, none.getType()); + Assertions.assertEquals("", none.getExpression()); + Assertions.assertTrue(none.isEmptyFilter()); + Assertions.assertFalse(none.isLiteralFilter()); + Assertions.assertFalse(none.isRegexFilter()); + Assertions.assertFalse(none.isStreamFilter()); + Assertions.assertFalse(none.isRecordFilter()); + Assertions.assertFalse(none.isStreamNameFilter()); + } + + @Test + public void testFromStream() { + String streamName = "test-stream"; + ConsumeFilter filter = ConsumeFilter.fromStream(streamName); + + Assertions.assertEquals(ConsumeFilterScope.STREAM, filter.getScope()); + Assertions.assertEquals(ConsumeFilterType.LITERAL, filter.getType()); + Assertions.assertEquals(streamName, filter.getExpression()); + Assertions.assertFalse(filter.isEmptyFilter()); + Assertions.assertTrue(filter.isLiteralFilter()); + Assertions.assertFalse(filter.isRegexFilter()); + Assertions.assertTrue(filter.isStreamFilter()); + Assertions.assertFalse(filter.isRecordFilter()); + Assertions.assertTrue(filter.isStreamNameFilter()); + Assertions.assertTrue(filter.isMatch(streamName)); + Assertions.assertFalse(filter.isMatch("other-stream")); + } + + @Test + public void testFromStreamValidation() { + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromStream(null)); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromStream("")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromStream(" ")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromStream("~stream")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromStream("a")); + } + + @Test + public void testFromPrefixes() { + ConsumeFilter filter = ConsumeFilter.fromPrefixes(ConsumeFilterScope.RECORD, "test-", "other-"); + + Assertions.assertEquals(ConsumeFilterScope.RECORD, filter.getScope()); + Assertions.assertEquals(ConsumeFilterType.REGEX, filter.getType()); + Assertions.assertTrue(filter.getExpression().contains("test-")); + Assertions.assertTrue(filter.getExpression().contains("other-")); + Assertions.assertFalse(filter.isEmptyFilter()); + Assertions.assertFalse(filter.isLiteralFilter()); + Assertions.assertTrue(filter.isRegexFilter()); + Assertions.assertFalse(filter.isStreamFilter()); + Assertions.assertTrue(filter.isRecordFilter()); + Assertions.assertFalse(filter.isStreamNameFilter()); + Assertions.assertTrue(filter.isMatch("test-event")); + Assertions.assertTrue(filter.isMatch("other-event")); + Assertions.assertFalse(filter.isMatch("some-event")); + } + + @Test + public void testFromPrefixesValidation() { + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromPrefixes(ConsumeFilterScope.RECORD)); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromPrefixes(ConsumeFilterScope.RECORD, (String[])null)); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromPrefixes(ConsumeFilterScope.RECORD, "")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromPrefixes(ConsumeFilterScope.RECORD, " ")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromPrefixes(ConsumeFilterScope.RECORD, "test-", null)); + } + + @Test + public void testFromPrefixesString() { + ConsumeFilter filter = ConsumeFilter.fromPrefixes(ConsumeFilterScope.STREAM, "test-,other-"); + + Assertions.assertEquals(ConsumeFilterScope.STREAM, filter.getScope()); + Assertions.assertEquals(ConsumeFilterType.REGEX, filter.getType()); + Assertions.assertTrue(filter.getExpression().contains("test-")); + Assertions.assertTrue(filter.getExpression().contains("other-")); + Assertions.assertFalse(filter.isEmptyFilter()); + Assertions.assertFalse(filter.isLiteralFilter()); + Assertions.assertTrue(filter.isRegexFilter()); + Assertions.assertTrue(filter.isStreamFilter()); + Assertions.assertFalse(filter.isRecordFilter()); + Assertions.assertFalse(filter.isStreamNameFilter()); + Assertions.assertTrue(filter.isMatch("test-stream")); + Assertions.assertTrue(filter.isMatch("other-stream")); + Assertions.assertFalse(filter.isMatch("some-stream")); + } + + @Test + public void testFromPrefixesStringValidation() { + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromPrefixes(ConsumeFilterScope.RECORD, (String)null)); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromPrefixes(ConsumeFilterScope.RECORD, "")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromPrefixes(ConsumeFilterScope.RECORD, " ")); + } + + @Test + public void testFromRegex() { + ConsumeFilter filter = ConsumeFilter.fromRegex(ConsumeFilterScope.RECORD, "test-.*"); + + Assertions.assertEquals(ConsumeFilterScope.RECORD, filter.getScope()); + Assertions.assertEquals(ConsumeFilterType.REGEX, filter.getType()); + Assertions.assertEquals("test-.*", filter.getExpression()); + Assertions.assertFalse(filter.isEmptyFilter()); + Assertions.assertFalse(filter.isLiteralFilter()); + Assertions.assertTrue(filter.isRegexFilter()); + Assertions.assertFalse(filter.isStreamFilter()); + Assertions.assertTrue(filter.isRecordFilter()); + Assertions.assertFalse(filter.isStreamNameFilter()); + Assertions.assertTrue(filter.isMatch("test-event")); + Assertions.assertFalse(filter.isMatch("other-event")); + } + + @Test + public void testFromRegexWithTilde() { + ConsumeFilter filter = ConsumeFilter.fromRegex(ConsumeFilterScope.STREAM, "~test-.*"); + + Assertions.assertEquals(ConsumeFilterScope.STREAM, filter.getScope()); + Assertions.assertEquals(ConsumeFilterType.REGEX, filter.getType()); + Assertions.assertEquals("test-.*", filter.getExpression()); + Assertions.assertFalse(filter.isEmptyFilter()); + Assertions.assertFalse(filter.isLiteralFilter()); + Assertions.assertTrue(filter.isRegexFilter()); + Assertions.assertTrue(filter.isStreamFilter()); + Assertions.assertFalse(filter.isRecordFilter()); + Assertions.assertFalse(filter.isStreamNameFilter()); + Assertions.assertTrue(filter.isMatch("test-stream")); + Assertions.assertFalse(filter.isMatch("other-stream")); + } + + @Test + public void testFromRegexValidation() { + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.fromRegex(ConsumeFilterScope.RECORD, "[")); + } + + @Test + public void testCreate() { + ConsumeFilter filter = ConsumeFilter.create(ConsumeFilterScope.RECORD, "test-event"); + + Assertions.assertEquals(ConsumeFilterScope.RECORD, filter.getScope()); + Assertions.assertEquals(ConsumeFilterType.LITERAL, filter.getType()); + Assertions.assertEquals("test-event", filter.getExpression()); + Assertions.assertFalse(filter.isEmptyFilter()); + Assertions.assertTrue(filter.isLiteralFilter()); + Assertions.assertFalse(filter.isRegexFilter()); + Assertions.assertFalse(filter.isStreamFilter()); + Assertions.assertTrue(filter.isRecordFilter()); + Assertions.assertFalse(filter.isStreamNameFilter()); + Assertions.assertTrue(filter.isMatch("test-event")); + Assertions.assertFalse(filter.isMatch("other-event")); + } + + @Test + public void testCreateWithRegex() { + ConsumeFilter filter = ConsumeFilter.create(ConsumeFilterScope.STREAM, "~test-.*"); + + Assertions.assertEquals(ConsumeFilterScope.STREAM, filter.getScope()); + Assertions.assertEquals(ConsumeFilterType.REGEX, filter.getType()); + Assertions.assertEquals("test-.*", filter.getExpression()); + Assertions.assertFalse(filter.isEmptyFilter()); + Assertions.assertFalse(filter.isLiteralFilter()); + Assertions.assertTrue(filter.isRegexFilter()); + Assertions.assertTrue(filter.isStreamFilter()); + Assertions.assertFalse(filter.isRecordFilter()); + Assertions.assertFalse(filter.isStreamNameFilter()); + Assertions.assertTrue(filter.isMatch("test-stream")); + Assertions.assertFalse(filter.isMatch("other-stream")); + } + + @Test + public void testCreateValidation() { + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.create(ConsumeFilterScope.RECORD, null)); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.create(ConsumeFilterScope.RECORD, "")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumeFilter.create(ConsumeFilterScope.RECORD, " ")); + } + + @Test + public void testToFilterOptions() { + ConsumeFilter streamFilter = ConsumeFilter.fromStream("test-stream"); + SubscriptionFilter subscriptionFilter = streamFilter.toSubscriptionFilter(1000); + + Assertions.assertNotNull(subscriptionFilter); + + ConsumeFilter recordFilter = ConsumeFilter.create(ConsumeFilterScope.RECORD, "test-event"); + subscriptionFilter = recordFilter.toSubscriptionFilter(1000); + + Assertions.assertNotNull(subscriptionFilter); + + ConsumeFilter emptyFilter = ConsumeFilter.NONE; + subscriptionFilter = emptyFilter.toSubscriptionFilter(1000); + + Assertions.assertNull(subscriptionFilter); + } + + @Test + public void testEqualsAndHashCode() { + ConsumeFilter filter1 = ConsumeFilter.fromStream("test-stream"); + ConsumeFilter filter2 = ConsumeFilter.fromStream("test-stream"); + ConsumeFilter filter3 = ConsumeFilter.fromStream("other-stream"); + + Assertions.assertEquals(filter1, filter2); + Assertions.assertEquals(filter1.hashCode(), filter2.hashCode()); + + Assertions.assertNotEquals(filter1, filter3); + Assertions.assertNotEquals(filter1.hashCode(), filter3.hashCode()); + } + + @Test + public void testToString() { + ConsumeFilter filter = ConsumeFilter.fromStream("test-stream"); + + Assertions.assertEquals("[STREAM|LITERAL] test-stream", filter.toString()); + } +} \ No newline at end of file diff --git a/src/test/java/io/kurrent/dbclient/v2/MessageTests.java b/src/test/java/io/kurrent/dbclient/v2/MessageTests.java new file mode 100644 index 00000000..83894fe6 --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/v2/MessageTests.java @@ -0,0 +1,85 @@ +package io.kurrent.dbclient.v2; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +public class MessageTests { + + @Test + public void testEmptyMessage() { + Message empty = Message.EMPTY; + + Assertions.assertNull(empty.getValue()); + Assertions.assertNotNull(empty.getMetadata()); + Assertions.assertEquals(0, empty.getMetadata().size()); + Assertions.assertNotNull(empty.getRecordId()); + Assertions.assertEquals(SchemaDataFormat.JSON, empty.getDataFormat()); + } + + @Test + public void testMessageConstructor() { + Metadata metadata = new Metadata(); + metadata.put("key1", "value1"); + + UUID recordId = UUID.randomUUID(); + + Message message = new Message("test payload", metadata, recordId, SchemaDataFormat.PROTOBUF); + + Assertions.assertEquals("test payload", message.getValue()); + Assertions.assertSame(metadata, message.getMetadata()); + Assertions.assertEquals(1, message.getMetadata().size()); + Assertions.assertEquals("value1", message.getMetadata().get("key1")); + Assertions.assertEquals(recordId, message.getRecordId()); + Assertions.assertEquals(SchemaDataFormat.PROTOBUF, message.getDataFormat()); + } + + @Test + public void testMessageBuilder() { + Metadata metadata = new Metadata(); + metadata.put("key1", "value1"); + + UUID recordId = UUID.randomUUID(); + + Message message = Message.builder() + .value("test payload") + .metadata(metadata) + .recordId(recordId) + .dataFormat(SchemaDataFormat.AVRO) + .build(); + + Assertions.assertEquals("test payload", message.getValue()); + Assertions.assertSame(metadata, message.getMetadata()); + Assertions.assertEquals(1, message.getMetadata().size()); + Assertions.assertEquals("value1", message.getMetadata().get("key1")); + Assertions.assertEquals(recordId, message.getRecordId()); + Assertions.assertEquals(SchemaDataFormat.AVRO, message.getDataFormat()); + } + + @Test + public void testMessageBuilderWithDefaults() { + Message message = Message.builder() + .value("test payload") + .build(); + + Assertions.assertEquals("test payload", message.getValue()); + Assertions.assertNotNull(message.getMetadata()); + Assertions.assertEquals(0, message.getMetadata().size()); + Assertions.assertNotNull(message.getRecordId()); + Assertions.assertEquals(SchemaDataFormat.JSON, message.getDataFormat()); + } + + @Test + public void testMessageBuilderWithNullValues() { + Message message = Message.builder() + .value(null) + .build(); + + Assertions.assertNull(message.getValue()); + Assertions.assertNotNull(message.getMetadata()); + Assertions.assertEquals(0, message.getMetadata().size()); + Assertions.assertNotNull(message.getRecordId()); + Assertions.assertEquals(SchemaDataFormat.JSON, message.getDataFormat()); + } +} \ No newline at end of file diff --git a/src/test/java/io/kurrent/dbclient/v2/MetadataTests.java b/src/test/java/io/kurrent/dbclient/v2/MetadataTests.java new file mode 100644 index 00000000..8c6e588a --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/v2/MetadataTests.java @@ -0,0 +1,105 @@ +package io.kurrent.dbclient.v2; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +public class MetadataTests { + @Test + public void testMetadataConstructors() { + // Test default constructor + Metadata metadata1 = new Metadata(); + Assertions.assertEquals(0, metadata1.size()); + + // Test constructor with existing metadata + metadata1.put("key1", "value1"); + Metadata metadata2 = new Metadata(metadata1); + Assertions.assertEquals(1, metadata2.size()); + Assertions.assertEquals("value1", metadata2.get("key1")); + + // Test constructor with dictionary + Map dictionary = new HashMap<>(); + dictionary.put("key2", "value2"); + Metadata metadata3 = new Metadata(dictionary); + Assertions.assertEquals(1, metadata3.size()); + Assertions.assertEquals("value2", metadata3.get("key2")); + } + + @Test + public void testMetadataSetMethod() { + Metadata metadata = new Metadata(); + + // Test set method with chaining + metadata.set("key1", "value1").set("key2", 123); + + Assertions.assertEquals(2, metadata.size()); + Assertions.assertEquals("value1", metadata.get("key1")); + Assertions.assertEquals(123, metadata.get("key2")); + } + + @Test + public void testMetadataGetMethod() { + Metadata metadata = new Metadata(); + metadata.put("stringKey", "stringValue"); + metadata.put("intKey", 123); + metadata.put("boolKey", true); + + // Test get method with type parameter + String stringValue = metadata.get("stringKey", String.class); + Integer intValue = metadata.get("intKey", Integer.class); + Boolean boolValue = metadata.get("boolKey", Boolean.class); + + Assertions.assertEquals("stringValue", stringValue); + Assertions.assertEquals(123, intValue); + Assertions.assertEquals(true, boolValue); + + // Test get method with a default value + String nonExistentValue = metadata.get("nonExistentKey", "defaultValue", String.class); + Assertions.assertEquals("defaultValue", nonExistentValue); + } + + @Test + public void testMetadataTryGetMethod() { + Metadata metadata = new Metadata(); + metadata.put("stringKey", "stringValue"); + metadata.put("intKey", 123); + metadata.put("boolKey", true); + metadata.put("uuidKey", UUID.fromString("00000000-0000-0000-0000-000000000001")); + + Instant instant = Instant.parse("2023-01-01T00:00:00Z"); + metadata.put("instantKey", instant); + + // Test tryGet method with direct type match + Optional stringResult = metadata.tryGet("stringKey", String.class); + Assertions.assertTrue(stringResult.isPresent()); + Assertions.assertEquals("stringValue", stringResult.get()); + + // Test tryGet method with type conversion + Optional intResult = metadata.tryGet("intKey", Integer.class); + Assertions.assertTrue(intResult.isPresent()); + Assertions.assertEquals(123, intResult.get()); + + // Test tryGet method with string conversion + metadata.put("stringIntKey", "456"); + Optional stringIntResult = metadata.tryGet("stringIntKey", Integer.class); + Assertions.assertTrue(stringIntResult.isPresent()); + Assertions.assertEquals(456, stringIntResult.get()); + + Optional instantResult = metadata.tryGet("instantKey", Instant.class); + Assertions.assertTrue(instantResult.isPresent()); + Assertions.assertEquals(instant, instantResult.get()); + + // Test tryGet method with a non-existent key + Optional nonExistentResult = metadata.tryGet("nonExistentKey", String.class); + Assertions.assertFalse(nonExistentResult.isPresent()); + + // Test tryGet method with incompatible type + Optional incompatibleResult = metadata.tryGet("stringKey", UUID.class); + Assertions.assertFalse(incompatibleResult.isPresent()); + } +} \ No newline at end of file diff --git a/src/test/java/io/kurrent/dbclient/v2/RegisteredSchemaTests.java b/src/test/java/io/kurrent/dbclient/v2/RegisteredSchemaTests.java new file mode 100644 index 00000000..e65ef299 --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/v2/RegisteredSchemaTests.java @@ -0,0 +1,105 @@ +package io.kurrent.dbclient.v2; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.OffsetDateTime; + +public class RegisteredSchemaTests { + + @Test + public void testNoneRegisteredSchema() { + RegisteredSchema none = RegisteredSchema.NONE; + + Assertions.assertNull(none.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.UNSPECIFIED, none.getDataFormat()); + Assertions.assertNull(none.getSchemaVersionId()); + Assertions.assertNull(none.getDefinition()); + Assertions.assertEquals(0, none.getVersionNumber()); + Assertions.assertNotNull(none.getCreatedAt()); + } + + @Test + public void testRegisteredSchemaConstructor() { + String schemaName = "test-schema"; + SchemaDataFormat dataFormat = SchemaDataFormat.JSON; + String schemaVersionId = "schema-version-123"; + String definition = "{\"type\":\"object\",\"properties\":{}}"; + int versionNumber = 1; + OffsetDateTime createdAt = OffsetDateTime.now(); + + RegisteredSchema schema = new RegisteredSchema( + schemaName, + dataFormat, + schemaVersionId, + definition, + versionNumber, + createdAt + ); + + Assertions.assertEquals(schemaName, schema.getSchemaName()); + Assertions.assertEquals(dataFormat, schema.getDataFormat()); + Assertions.assertEquals(schemaVersionId, schema.getSchemaVersionId()); + Assertions.assertEquals(definition, schema.getDefinition()); + Assertions.assertEquals(versionNumber, schema.getVersionNumber()); + Assertions.assertEquals(createdAt, schema.getCreatedAt()); + } + + @Test + public void testToSchemaInfo() { + String schemaName = "test-schema"; + SchemaDataFormat dataFormat = SchemaDataFormat.JSON; + + RegisteredSchema schema = new RegisteredSchema( + schemaName, + dataFormat, + "schema-version-123", + "{\"type\":\"object\",\"properties\":{}}", + 1, + OffsetDateTime.now() + ); + + SchemaInfo schemaInfo = schema.toSchemaInfo(); + + Assertions.assertEquals(schemaName, schemaInfo.getSchemaName()); + Assertions.assertEquals(dataFormat, schemaInfo.getDataFormat()); + } + + @Test + public void testEqualsAndHashCode() { + OffsetDateTime now = OffsetDateTime.now(); + + RegisteredSchema schema1 = new RegisteredSchema( + "test-schema", + SchemaDataFormat.JSON, + "schema-version-123", + "{\"type\":\"object\",\"properties\":{}}", + 1, + now + ); + + RegisteredSchema schema2 = new RegisteredSchema( + "test-schema", + SchemaDataFormat.JSON, + "schema-version-123", + "{\"type\":\"object\",\"properties\":{}}", + 1, + now + ); + + RegisteredSchema schema3 = new RegisteredSchema( + "different-schema", + SchemaDataFormat.PROTOBUF, + "schema-version-456", + "message Test {}", + 2, + now + ); + + Assertions.assertEquals(schema1, schema2); + Assertions.assertEquals(schema1.hashCode(), schema2.hashCode()); + + Assertions.assertNotEquals(schema1, schema3); + Assertions.assertNotEquals(schema1.hashCode(), schema3.hashCode()); + } +} \ No newline at end of file diff --git a/src/test/java/io/kurrent/dbclient/v2/SchemaDataFormatTests.java b/src/test/java/io/kurrent/dbclient/v2/SchemaDataFormatTests.java new file mode 100644 index 00000000..a07987f3 --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/v2/SchemaDataFormatTests.java @@ -0,0 +1,30 @@ +package io.kurrent.dbclient.v2; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SchemaDataFormatTests { + + @Test + public void testEnumValues() { + // Verify enum values + Assertions.assertEquals(0, SchemaDataFormat.UNSPECIFIED.getValue()); + Assertions.assertEquals(1, SchemaDataFormat.JSON.getValue()); + Assertions.assertEquals(2, SchemaDataFormat.PROTOBUF.getValue()); + Assertions.assertEquals(3, SchemaDataFormat.AVRO.getValue()); + Assertions.assertEquals(4, SchemaDataFormat.BYTES.getValue()); + } + + @Test + public void testFromValue() { + // Verify fromValue method + Assertions.assertEquals(SchemaDataFormat.UNSPECIFIED, SchemaDataFormat.fromValue(0)); + Assertions.assertEquals(SchemaDataFormat.JSON, SchemaDataFormat.fromValue(1)); + Assertions.assertEquals(SchemaDataFormat.PROTOBUF, SchemaDataFormat.fromValue(2)); + Assertions.assertEquals(SchemaDataFormat.AVRO, SchemaDataFormat.fromValue(3)); + Assertions.assertEquals(SchemaDataFormat.BYTES, SchemaDataFormat.fromValue(4)); + + // Test invalid value + Assertions.assertEquals(SchemaDataFormat.UNSPECIFIED, SchemaDataFormat.fromValue(99)); + } +} \ No newline at end of file diff --git a/src/test/java/io/kurrent/dbclient/v2/SchemaInfoTests.java b/src/test/java/io/kurrent/dbclient/v2/SchemaInfoTests.java new file mode 100644 index 00000000..61331b0c --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/v2/SchemaInfoTests.java @@ -0,0 +1,167 @@ +package io.kurrent.dbclient.v2; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SchemaInfoTests { + + @Test + public void testNoneSchemaInfo() { + SchemaInfo none = SchemaInfo.NONE; + + Assertions.assertEquals("", none.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.UNSPECIFIED, none.getDataFormat()); + Assertions.assertEquals("application/octet-stream", none.getContentType()); + Assertions.assertTrue(none.isSchemaNameMissing()); + } + + @Test + public void testSchemaInfoConstructor() { + SchemaInfo jsonSchema = new SchemaInfo("test-schema", SchemaDataFormat.JSON); + + Assertions.assertEquals("test-schema", jsonSchema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.JSON, jsonSchema.getDataFormat()); + Assertions.assertEquals("application/json", jsonSchema.getContentType()); + Assertions.assertFalse(jsonSchema.isSchemaNameMissing()); + + SchemaInfo protobufSchema = new SchemaInfo("proto-schema", SchemaDataFormat.PROTOBUF); + + Assertions.assertEquals("proto-schema", protobufSchema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.PROTOBUF, protobufSchema.getDataFormat()); + Assertions.assertEquals("application/vnd.google.protobuf", protobufSchema.getContentType()); + Assertions.assertFalse(protobufSchema.isSchemaNameMissing()); + + SchemaInfo avroSchema = new SchemaInfo("avro-schema", SchemaDataFormat.AVRO); + + Assertions.assertEquals("avro-schema", avroSchema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.AVRO, avroSchema.getDataFormat()); + Assertions.assertEquals("application/vnd.apache.avro+json", avroSchema.getContentType()); + Assertions.assertFalse(avroSchema.isSchemaNameMissing()); + + SchemaInfo bytesSchema = new SchemaInfo("bytes-schema", SchemaDataFormat.BYTES); + + Assertions.assertEquals("bytes-schema", bytesSchema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.BYTES, bytesSchema.getDataFormat()); + Assertions.assertEquals("application/octet-stream", bytesSchema.getContentType()); + Assertions.assertFalse(bytesSchema.isSchemaNameMissing()); + } + + @Test + public void testSchemaNameMissing() { + SchemaInfo emptyName = new SchemaInfo("", SchemaDataFormat.JSON); + Assertions.assertTrue(emptyName.isSchemaNameMissing()); + + SchemaInfo whitespace = new SchemaInfo(" ", SchemaDataFormat.JSON); + Assertions.assertTrue(whitespace.isSchemaNameMissing()); + + SchemaInfo nullName = new SchemaInfo(null, SchemaDataFormat.JSON); + Assertions.assertTrue(nullName.isSchemaNameMissing()); + + SchemaInfo validName = new SchemaInfo("valid-name", SchemaDataFormat.JSON); + Assertions.assertFalse(validName.isSchemaNameMissing()); + } + + @Test + public void testInjectIntoMetadata() { + SchemaInfo schema = new SchemaInfo("test-schema", SchemaDataFormat.JSON); + Metadata metadata = new Metadata(); + + schema.injectIntoMetadata(metadata); + + Assertions.assertEquals(2, metadata.size()); + Assertions.assertEquals("test-schema", metadata.get("schema-name", String.class)); + Assertions.assertEquals("json", metadata.get("schema-data-format", String.class)); + } + + @Test + public void testInjectSchemaNameIntoMetadata() { + SchemaInfo schema = new SchemaInfo("test-schema", SchemaDataFormat.JSON); + Metadata metadata = new Metadata(); + + schema.injectSchemaNameIntoMetadata(metadata); + + Assertions.assertEquals(1, metadata.size()); + Assertions.assertEquals("test-schema", metadata.get("schema-name", String.class)); + Assertions.assertNull(metadata.get("schema-data-format", String.class)); + } + + @Test + public void testFromMetadata() { + Metadata metadata = new Metadata(); + metadata.set("schema-name", "test-schema"); + metadata.set("schema-data-format", "json"); + + SchemaInfo schema = SchemaInfo.fromMetadata(metadata); + + Assertions.assertEquals("test-schema", schema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.JSON, schema.getDataFormat()); + Assertions.assertEquals("application/json", schema.getContentType()); + } + + @Test + public void testFromMetadataWithMissingValues() { + Metadata emptyMetadata = new Metadata(); + SchemaInfo schema = SchemaInfo.fromMetadata(emptyMetadata); + + Assertions.assertEquals("", schema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.UNSPECIFIED, schema.getDataFormat()); + + Metadata nameOnlyMetadata = new Metadata(); + nameOnlyMetadata.set("schema-name", "test-schema"); + + SchemaInfo nameOnlySchema = SchemaInfo.fromMetadata(nameOnlyMetadata); + + Assertions.assertEquals("test-schema", nameOnlySchema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.UNSPECIFIED, nameOnlySchema.getDataFormat()); + + Metadata formatOnlyMetadata = new Metadata(); + formatOnlyMetadata.set("schema-data-format", "protobuf"); + + SchemaInfo formatOnlySchema = SchemaInfo.fromMetadata(formatOnlyMetadata); + + Assertions.assertEquals("", formatOnlySchema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.PROTOBUF, formatOnlySchema.getDataFormat()); + } + + @Test + public void testFromContentType() { + SchemaInfo jsonSchema = SchemaInfo.fromContentType("test-schema", "application/json"); + + Assertions.assertEquals("test-schema", jsonSchema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.JSON, jsonSchema.getDataFormat()); + + SchemaInfo protobufSchema = SchemaInfo.fromContentType("proto-schema", "application/vnd.google.protobuf"); + + Assertions.assertEquals("proto-schema", protobufSchema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.PROTOBUF, protobufSchema.getDataFormat()); + + SchemaInfo bytesSchema = SchemaInfo.fromContentType("bytes-schema", "application/octet-stream"); + + Assertions.assertEquals("bytes-schema", bytesSchema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.BYTES, bytesSchema.getDataFormat()); + + SchemaInfo unknownSchema = SchemaInfo.fromContentType("unknown-schema", "unknown/content-type"); + + Assertions.assertEquals("unknown-schema", unknownSchema.getSchemaName()); + Assertions.assertEquals(SchemaDataFormat.UNSPECIFIED, unknownSchema.getDataFormat()); + } + + @Test + public void testFromContentTypeWithInvalidArguments() { + Assertions.assertThrows(IllegalArgumentException.class, () -> { + SchemaInfo.fromContentType(null, "application/json"); + }); + + Assertions.assertThrows(IllegalArgumentException.class, () -> { + SchemaInfo.fromContentType("", "application/json"); + }); + + Assertions.assertThrows(IllegalArgumentException.class, () -> { + SchemaInfo.fromContentType("test-schema", null); + }); + + Assertions.assertThrows(IllegalArgumentException.class, () -> { + SchemaInfo.fromContentType("test-schema", ""); + }); + } +} \ No newline at end of file