diff --git a/server/app/build.gradle.kts b/server/app/build.gradle.kts index d7462a46c..d9b41cab9 100644 --- a/server/app/build.gradle.kts +++ b/server/app/build.gradle.kts @@ -129,7 +129,7 @@ tasks.jacocoTestCoverageVerification { violationRules { rule { limit { - minimum = BigDecimal.valueOf(0.75) + minimum = BigDecimal.valueOf(0.77) } } } diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/ClientCapabilitiesMapper.java b/server/app/src/main/java/io/whitefox/api/deltasharing/ClientCapabilitiesMapper.java new file mode 100644 index 000000000..6e246954a --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/ClientCapabilitiesMapper.java @@ -0,0 +1,85 @@ +package io.whitefox.api.deltasharing; + +import io.micrometer.common.util.StringUtils; +import io.whitefox.api.deltasharing.errors.InvalidDeltaSharingCapabilities; +import io.whitefox.api.server.DeltaHeaders; +import io.whitefox.core.services.capabilities.ClientCapabilities; +import io.whitefox.core.services.capabilities.ReaderFeatures; +import io.whitefox.core.services.capabilities.ResponseFormat; +import io.whitefox.core.services.exceptions.UnknownResponseFormat; +import jakarta.enterprise.context.ApplicationScoped; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@ApplicationScoped +public class ClientCapabilitiesMapper implements DeltaHeaders { + + /** + * @param header the string representation of the capabilities of the delta-sharing client + * @return the clean and business oriented version of it + */ + public ClientCapabilities parseDeltaSharingCapabilities(String header) { + + if (header == null) { + return ClientCapabilities.parquet(); + } else { + Map> rawValues = Arrays.stream(header.split(";", -1)) + .flatMap(entry -> { + if (StringUtils.isBlank(entry)) { + return Stream.empty(); + } + var keyAndValues = entry.split("=", -1); + if (keyAndValues.length != 2) { + throw new InvalidDeltaSharingCapabilities(String.format( + "Each %s must be in the format key=value", DELTA_SHARE_CAPABILITIES_HEADER)); + } + var key = keyAndValues[0]; + var values = Arrays.stream(keyAndValues[1].split(",", -1)) + .collect(Collectors.toUnmodifiableSet()); + return Stream.of(Map.entry(key, values)); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Set responseFormats = rawValues.get(DELTA_SHARING_RESPONSE_FORMAT); + String theResponseFormat = pickResponseFormat(responseFormats); + if (ResponseFormat.parquet.stringRepresentation().equalsIgnoreCase(theResponseFormat)) { + return ClientCapabilities.parquet(); + } else if (ResponseFormat.delta.stringRepresentation().equalsIgnoreCase(theResponseFormat)) { + var unparsed = + Optional.ofNullable(rawValues.get(DELTA_SHARING_READER_FEATURES)).orElse(Set.of()); + return ClientCapabilities.delta(unparsed.stream() + .map(ReaderFeatures::fromString) + .flatMap(Optional::stream) + .collect(Collectors.toSet())); + } else { + throw new UnknownResponseFormat( + String.format("Unknown response format %s", theResponseFormat)); + } + } + } + + private String pickResponseFormat(Set responseFormats) { + if (responseFormats == null || responseFormats.isEmpty()) { + return ResponseFormat.parquet.stringRepresentation(); + } else { + // Quoting the protocol: + // > If there's a list of responseFormat specified, such as responseFormat=delta,parquet. + // > The server may choose to respond in parquet format if the table does not have any + // advanced features. + // > The server must respond in delta format if the table has advanced features which are not + // compatible + // > with the parquet format. + // so here we choose to return delta (if present) so that the service can choose to downgrade + // it + // for compatibility reasons + if (responseFormats.contains(ResponseFormat.delta.stringRepresentation())) { + return ResponseFormat.delta.stringRepresentation(); + } else { + return responseFormats.iterator().next(); + } + } + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java b/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java index 406c65ea4..37808f742 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java @@ -1,5 +1,6 @@ package io.whitefox.api.deltasharing; +import io.whitefox.api.deltasharing.model.v1.Format; import io.whitefox.api.deltasharing.model.v1.TableMetadataResponse; import io.whitefox.api.deltasharing.model.v1.TableQueryResponse; import io.whitefox.api.deltasharing.model.v1.generated.*; @@ -7,10 +8,10 @@ import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata; import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol; import io.whitefox.api.server.CommonMappers; -import io.whitefox.api.server.WhitefoxMappers; import io.whitefox.core.*; import io.whitefox.core.Schema; import io.whitefox.core.Share; +import io.whitefox.core.services.capabilities.ResponseFormat; import java.util.*; import java.util.stream.Collectors; @@ -67,19 +68,27 @@ public static TableQueryResponse readTableResult2api(ReadTableResult readTableRe } private static ParquetMetadata metadata2Api(Metadata metadata) { - return ParquetMetadata.builder() - .metadata(ParquetMetadata.Metadata.builder() - .id(metadata.id()) - .name(metadata.name()) - .description(metadata.description()) - .format(WhitefoxMappers.format2api(metadata.format())) - .schemaString(metadata.tableSchema().structType().toJson()) - .partitionColumns(metadata.partitionColumns()) - .configuration(Optional.of(metadata.configuration())) - .version(metadata.version()) - .numFiles(metadata.numFiles()) - .build()) - .build(); + switch (metadata.format()) { + case parquet: + return ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() + .id(metadata.id()) + .name(metadata.name()) + .description(metadata.description()) + .format(new Format()) + .schemaString(metadata.tableSchema().structType().toJson()) + .partitionColumns(metadata.partitionColumns()) + .configuration(Optional.ofNullable(metadata.configuration())) + .version(metadata.version()) + .numFiles(metadata.numFiles()) + .build()) + .build(); + case delta: + throw new IllegalArgumentException("Delta response format is not supported"); + default: + throw new IllegalArgumentException( + String.format("%s response format is not supported", metadata.format())); + } } private static ParquetProtocol protocol2Api(Protocol protocol) { @@ -115,22 +124,10 @@ public static io.whitefox.api.deltasharing.model.v1.generated.Table table2api( } /** - * NOTE: this is an undocumented feature of the reference impl of delta-sharing, it's not part of the - * protocol - * ---- - * Return the [[io.whitefox.api.server.DeltaHeaders.DELTA_SHARE_CAPABILITIES_HEADER]] header - * that will be set in the response w/r/t the one received in the request. - * If the request did not contain any, we will return an empty one. + * Serializes the response format in its text-based representation */ - public static Map toHeaderCapabilitiesMap(String headerCapabilities) { - if (headerCapabilities == null) { - return Map.of(); - } - return Arrays.stream(headerCapabilities.toLowerCase().split(";")) - .map(h -> h.split("=")) - .filter(h -> h.length == 2) - .map(splits -> Map.entry(splits[0], splits[1])) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + public static String toResponseFormatHeader(ResponseFormat responseFormat) { + return responseFormat.stringRepresentation(); } public static TableMetadataResponse toTableResponseMetadata(Metadata m) { diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/errors/InvalidDeltaSharingCapabilities.java b/server/app/src/main/java/io/whitefox/api/deltasharing/errors/InvalidDeltaSharingCapabilities.java new file mode 100644 index 000000000..4a3a955ae --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/errors/InvalidDeltaSharingCapabilities.java @@ -0,0 +1,7 @@ +package io.whitefox.api.deltasharing.errors; + +public class InvalidDeltaSharingCapabilities extends IllegalArgumentException { + public InvalidDeltaSharingCapabilities(String message) { + super(message); + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java index e037bab94..eceda2e08 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java @@ -2,6 +2,7 @@ import static io.whitefox.api.server.CommonMappers.mapList; +import io.whitefox.api.deltasharing.ClientCapabilitiesMapper; import io.whitefox.api.deltasharing.DeltaMappers; import io.whitefox.api.deltasharing.encoders.DeltaPageTokenEncoder; import io.whitefox.api.deltasharing.model.v1.generated.ListSchemasResponse; @@ -29,18 +30,22 @@ public class DeltaSharesApiImpl implements DeltaApiApi, ApiUtils { private final TableMetadataSerializer tableResponseSerializer; private final TableQueryResponseSerializer tableQueryResponseSerializer; + private final ClientCapabilitiesMapper clientCapabilitiesMapper; + @Inject public DeltaSharesApiImpl( DeltaSharesService deltaSharesService, ShareService shareService, DeltaPageTokenEncoder encoder, TableMetadataSerializer tableResponseSerializer, - TableQueryResponseSerializer tableQueryResponseSerializer) { + TableQueryResponseSerializer tableQueryResponseSerializer, + ClientCapabilitiesMapper clientCapabilitiesMapper) { this.deltaSharesService = deltaSharesService; this.tokenEncoder = encoder; this.tableResponseSerializer = tableResponseSerializer; this.tableQueryResponseSerializer = tableQueryResponseSerializer; this.shareService = shareService; + this.clientCapabilitiesMapper = clientCapabilitiesMapper; } @Override @@ -62,9 +67,10 @@ public Response getTableChanges( String endingTimestamp, Boolean includeHistoricalMetadata, String deltaSharingCapabilities) { - return Response.ok().build(); + return Response.status(501).build(); } + // TODO handle capabilities @Override public Response getTableMetadata( String share, @@ -75,20 +81,21 @@ public Response getTableMetadata( return wrapExceptions( () -> { var startingTimestamp = parseTimestamp(startingTimestampStr); + var clientCapabilities = + clientCapabilitiesMapper.parseDeltaSharingCapabilities(deltaSharingCapabilities); return optionalToNotFound( - deltaSharesService.getTableMetadata(share, schema, table, startingTimestamp), + deltaSharesService.getTableMetadata( + share, schema, table, startingTimestamp, clientCapabilities), m -> optionalToNotFound( deltaSharesService.getTableVersion(share, schema, table, startingTimestamp), v -> Response.ok( tableResponseSerializer.serialize( DeltaMappers.toTableResponseMetadata(m)), ndjsonMediaType) - .status(Response.Status.OK.getStatusCode()) .header(DELTA_TABLE_VERSION_HEADER, String.valueOf(v)) .header( DELTA_SHARE_CAPABILITIES_HEADER, - getResponseFormatHeader( - DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities))) + DeltaMappers.toResponseFormatHeader(m.format())) .build())); }, exceptionToResponse); @@ -190,15 +197,18 @@ public Response queryTable( return wrapExceptions( () -> { var readResult = deltaSharesService.queryTable( - share, schema, table, DeltaMappers.api2ReadTableRequest(queryRequest)); + share, + schema, + table, + DeltaMappers.api2ReadTableRequest(queryRequest), + clientCapabilitiesMapper.parseDeltaSharingCapabilities(deltaSharingCapabilities)); var serializedReadResult = tableQueryResponseSerializer.serialize(DeltaMappers.readTableResult2api(readResult)); return Response.ok(serializedReadResult, ndjsonMediaType) .header(DELTA_TABLE_VERSION_HEADER, readResult.version()) .header( DELTA_SHARE_CAPABILITIES_HEADER, - getResponseFormatHeader( - DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities))) + DeltaMappers.toResponseFormatHeader(readResult.responseFormat())) .build(); }, exceptionToResponse); diff --git a/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java b/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java index 965dcf28b..a1ed72872 100644 --- a/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java +++ b/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java @@ -3,7 +3,6 @@ import io.quarkus.runtime.util.ExceptionUtil; import io.whitefox.api.deltasharing.model.v1.generated.CommonErrorResponse; import io.whitefox.core.Principal; -import io.whitefox.core.services.DeltaSharedTable; import io.whitefox.core.services.exceptions.AlreadyExists; import io.whitefox.core.services.exceptions.NotFound; import jakarta.ws.rs.core.Response; @@ -11,7 +10,6 @@ import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; -import java.util.Map; import java.util.Optional; import java.util.function.Function; import java.util.function.Supplier; @@ -70,18 +68,6 @@ default Response optionalToNotFound(Optional opt, Function f return opt.map(fn).orElse(notFoundResponse()); } - default String getResponseFormatHeader(Map deltaSharingCapabilities) { - return String.format( - "%s=%s", - DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT, getResponseFormat(deltaSharingCapabilities)); - } - - default String getResponseFormat(Map deltaSharingCapabilities) { - return deltaSharingCapabilities.getOrDefault( - DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT, - DeltaSharedTable.DeltaShareTableFormat.RESPONSE_FORMAT_PARQUET); - } - default Principal getRequestPrincipal() { return new Principal("Mr. Fox"); } diff --git a/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java b/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java index 6c261d1a0..e22a309eb 100644 --- a/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java +++ b/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java @@ -2,6 +2,7 @@ public interface DeltaHeaders { String DELTA_SHARING_RESPONSE_FORMAT = "responseformat"; + String DELTA_SHARING_READER_FEATURES = "readerfeatures"; String DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version"; String DELTA_SHARE_CAPABILITIES_HEADER = "delta-sharing-capabilities"; } diff --git a/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java b/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java index 08a656cb3..74a3e9dbd 100644 --- a/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java +++ b/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java @@ -1,7 +1,5 @@ package io.whitefox.api.server; -import io.whitefox.api.deltasharing.model.v1.Format; -import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata; import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol; import io.whitefox.api.model.v1.generated.*; import io.whitefox.core.*; @@ -172,31 +170,6 @@ public static MetastoreType api2MetastoreType( } } - private static ParquetMetadata metadata2Api(Metadata metadata) { - return ParquetMetadata.builder() - .metadata(ParquetMetadata.Metadata.builder() - .id(metadata.id()) - .name(metadata.name()) - .description(metadata.description()) - .format(format2api(metadata.format())) - .schemaString(metadata.tableSchema().structType().toJson()) - .partitionColumns(metadata.partitionColumns()) - .configuration(Optional.ofNullable(metadata.configuration())) - .version(metadata.version()) - .numFiles(metadata.numFiles()) - .build()) - .build(); - } - - public static Format format2api(Metadata.Format format) { - switch (format) { - case PARQUET: - return new Format(); - } - // never gonna happen, java is dumb - return null; - } - private static ParquetProtocol protocol2Api(Protocol protocol) { return ParquetProtocol.ofMinReaderVersion(protocol.minReaderVersion().orElse(1)); } diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/ClientCapabilitiesMapperTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/ClientCapabilitiesMapperTest.java new file mode 100644 index 000000000..80a9bf0e1 --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/ClientCapabilitiesMapperTest.java @@ -0,0 +1,111 @@ +package io.whitefox.api.deltasharing; + +import io.whitefox.api.server.DeltaHeaders; +import io.whitefox.core.services.capabilities.CapabilitiesConstants; +import io.whitefox.core.services.capabilities.ReaderFeatures; +import io.whitefox.core.services.capabilities.ResponseFormat; +import io.whitefox.core.services.exceptions.UnknownResponseFormat; +import java.util.Set; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ClientCapabilitiesMapperTest implements DeltaHeaders { + + ClientCapabilitiesMapper mapper = new ClientCapabilitiesMapper(); + String responseFormatDelta = "responseformat=delta"; + + @Test + void parseSimpleResponseFormatDelta() { + Assertions.assertEquals( + ResponseFormat.delta, + mapper.parseDeltaSharingCapabilities(responseFormatDelta).responseFormat()); + } + + @Test + void parseSimpleResponseFormatParquet() { + Assertions.assertEquals( + ResponseFormat.parquet, + mapper.parseDeltaSharingCapabilities("responseformat=PaRquEt").responseFormat()); + } + + @Test + void failToParseUnknownResponseFormatAndFail() { + Assertions.assertThrows( + UnknownResponseFormat.class, + () -> mapper.parseDeltaSharingCapabilities("responseformat=iceberg").responseFormat()); + } + + @Test + void failToParseUnknownResponseFormatAndReturnOthers() { + Assertions.assertEquals( + ResponseFormat.delta, + mapper + .parseDeltaSharingCapabilities("responseformat=iceberg,parquet,delta") + .responseFormat()); + } + + @Test + void noCapabilitiesEqualsDefault() { + Assertions.assertEquals( + ResponseFormat.parquet, + mapper.parseDeltaSharingCapabilities((String) null).responseFormat()); + Assertions.assertEquals( + Set.of(), mapper.parseDeltaSharingCapabilities((String) null).readerFeatures()); + Assertions.assertEquals( + ResponseFormat.parquet, mapper.parseDeltaSharingCapabilities("").responseFormat()); + Assertions.assertEquals(Set.of(), mapper.parseDeltaSharingCapabilities("").readerFeatures()); + } + + @Test + void parseSimpleReaderFeature() { + Assertions.assertEquals( + Set.of(ReaderFeatures.DELETION_VECTORS), + mapper + .parseDeltaSharingCapabilities(String.format( + responseFormatDelta + ";%s=%s", + DELTA_SHARING_READER_FEATURES, + CapabilitiesConstants.DELTA_SHARING_READER_FEATURE_DELETION_VECTOR)) + .readerFeatures()); + } + + @Test + void failToParseUnknownReaderFeatureAndReturnNothing() { + Assertions.assertEquals( + Set.of(), + mapper + .parseDeltaSharingCapabilities( + String.format("%s=%s", DELTA_SHARING_READER_FEATURES, "unknown")) + .readerFeatures()); + } + + @Test + void failToParseUnknownReaderFeatureAndReturnOthers() { + Assertions.assertEquals( + Set.of(ReaderFeatures.COLUMN_MAPPING, ReaderFeatures.DOMAIN_METADATA), + mapper + .parseDeltaSharingCapabilities(String.format( + responseFormatDelta + ";%s=%s,%s,%s", + DELTA_SHARING_READER_FEATURES, + "unknown", + CapabilitiesConstants.DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING, + CapabilitiesConstants.DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA)) + .readerFeatures()); + } + + @Test + void kitchenSink() { + var readerFeatures = String.format( + "%s=%s,%s,%s", + DELTA_SHARING_READER_FEATURES, + "unknown", + CapabilitiesConstants.DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING, + CapabilitiesConstants.DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA); + var responseFormat = "responseformat=iceberg,parquet,delta"; + var capabilities = mapper.parseDeltaSharingCapabilities( + String.format("%s;%s", readerFeatures, responseFormat)); + Assertions.assertEquals(ResponseFormat.delta, capabilities.responseFormat()); + Assertions.assertEquals( + Set.of(ReaderFeatures.COLUMN_MAPPING, ReaderFeatures.DOMAIN_METADATA), + capabilities.readerFeatures()); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/Metadata.java b/server/core/src/main/java/io/whitefox/core/Metadata.java index 79c6eaaf7..53f074adf 100644 --- a/server/core/src/main/java/io/whitefox/core/Metadata.java +++ b/server/core/src/main/java/io/whitefox/core/Metadata.java @@ -1,147 +1,21 @@ package io.whitefox.core; -import io.whitefox.annotations.SkipCoverageGenerated; +import io.whitefox.core.services.capabilities.ResponseFormat; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; +import lombok.Value; +@Value public class Metadata { - private final String id; - private final Optional name; - private final Optional description; - private final Format format; - private final TableSchema tableSchema; - private final List partitionColumns; - private final Map configuration; - private final Optional version; - private final Optional size; - private final Optional numFiles; - - public enum Format { - PARQUET("parquet"); - - private final String provider; - - private Format(final String provider) { - this.provider = provider; - } - - public String provider() { - return this.provider; - } - } - - public Metadata( - String id, - Optional name, - Optional description, - Format format, - TableSchema tableSchema, - List partitionColumns, - Map configuration, - Optional version, - Optional size, - Optional numFiles) { - this.id = id; - this.name = name; - this.description = description; - this.format = format; - this.tableSchema = tableSchema; - this.partitionColumns = partitionColumns; - this.configuration = configuration; - this.version = version; - this.size = size; - this.numFiles = numFiles; - } - - public String id() { - return id; - } - - public Optional name() { - return name; - } - - public Optional description() { - return description; - } - - public Format format() { - return format; - } - - public TableSchema tableSchema() { - return tableSchema; - } - - public List partitionColumns() { - return partitionColumns; - } - - public Map configuration() { - return configuration; - } - - public Optional version() { - return version; - } - - public Optional size() { - return size; - } - - public Optional numFiles() { - return numFiles; - } - - @Override - @SkipCoverageGenerated - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Metadata metadata = (Metadata) o; - return Objects.equals(id, metadata.id) - && Objects.equals(name, metadata.name) - && Objects.equals(description, metadata.description) - && format == metadata.format - && Objects.equals(tableSchema, metadata.tableSchema) - && Objects.equals(partitionColumns, metadata.partitionColumns) - && Objects.equals(configuration, metadata.configuration) - && Objects.equals(version, metadata.version) - && Objects.equals(size, metadata.size) - && Objects.equals(numFiles, metadata.numFiles); - } - - @Override - @SkipCoverageGenerated - public int hashCode() { - return Objects.hash( - id, - name, - description, - format, - tableSchema, - partitionColumns, - configuration, - version, - size, - numFiles); - } - - @Override - @SkipCoverageGenerated - public String toString() { - return "Metadata{" + "id='" - + id + '\'' + ", name=" - + name + ", description=" - + description + ", format=" - + format + ", tableSchema=" - + tableSchema + ", partitionColumns=" - + partitionColumns + ", configuration=" - + configuration + ", version=" - + version + ", size=" - + size + ", numFiles=" - + numFiles + '}'; - } + String id; + Optional name; + Optional description; + ResponseFormat format; + TableSchema tableSchema; + List partitionColumns; + Map configuration; + Optional version; + Optional size; + Optional numFiles; } diff --git a/server/core/src/main/java/io/whitefox/core/ReadTableResult.java b/server/core/src/main/java/io/whitefox/core/ReadTableResult.java index 252811242..2a7ef8cef 100644 --- a/server/core/src/main/java/io/whitefox/core/ReadTableResult.java +++ b/server/core/src/main/java/io/whitefox/core/ReadTableResult.java @@ -1,5 +1,6 @@ package io.whitefox.core; +import io.whitefox.core.services.capabilities.ResponseFormat; import java.util.List; import lombok.Value; @@ -9,4 +10,5 @@ public class ReadTableResult { Metadata metadata; List files; long version; + ResponseFormat responseFormat; } diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java index cbeb66819..abdc92656 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java @@ -9,6 +9,7 @@ import io.whitefox.core.*; import io.whitefox.core.Metadata; import io.whitefox.core.TableSchema; +import io.whitefox.core.services.capabilities.ResponseFormat; import io.whitefox.core.types.predicates.PredicateException; import java.sql.Timestamp; import java.util.List; @@ -72,7 +73,7 @@ private Metadata metadataFromSnapshot(Snapshot snapshot) { snapshot.getMetadata().getId(), Optional.of(tableDetails.name()), Optional.ofNullable(snapshot.getMetadata().getDescription()), - Metadata.Format.PARQUET, + ResponseFormat.parquet, new TableSchema(tableSchemaConverter.convertDeltaSchemaToWhitefox( snapshot.getMetadata().getSchema())), snapshot.getMetadata().getPartitionColumns(), @@ -185,9 +186,4 @@ private String location() { // remove all "/" at the end of the path return location.replaceAll("/+$", ""); } - - public static class DeltaShareTableFormat { - public static final String RESPONSE_FORMAT_PARQUET = "parquet"; - public static final String RESPONSE_FORMAT_DELTA = "delta"; - } } diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java index 5ce6afeee..3d6bcc297 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java @@ -5,6 +5,7 @@ import io.whitefox.core.Schema; import io.whitefox.core.Share; import io.whitefox.core.SharedTable; +import io.whitefox.core.services.capabilities.ClientCapabilities; import java.sql.Timestamp; import java.util.List; import java.util.Optional; @@ -18,7 +19,11 @@ ContentAndToken> listShares( Optional nextPageToken, Optional maxResults); Optional getTableMetadata( - String share, String schema, String table, Optional startingTimestamp); + String share, + String schema, + String table, + Optional startingTimestamp, + ClientCapabilities clientCapabilities); Optional>> listSchemas( String share, Optional nextPageToken, Optional maxResults); @@ -33,5 +38,9 @@ Optional>> listTablesOfShare( String share, Optional token, Optional maxResults); ReadTableResult queryTable( - String share, String schema, String table, ReadTableRequest queryRequest); + String share, + String schema, + String table, + ReadTableRequest queryRequest, + ClientCapabilities clientCapabilities); } diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java index 1c8c713aa..41e1355f0 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java @@ -1,6 +1,9 @@ package io.whitefox.core.services; import io.whitefox.core.*; +import io.whitefox.core.services.capabilities.ClientCapabilities; +import io.whitefox.core.services.capabilities.ResponseFormat; +import io.whitefox.core.services.exceptions.IncompatibleTableWithClient; import io.whitefox.core.services.exceptions.TableNotFound; import io.whitefox.persistence.StorageManager; import jakarta.enterprise.context.ApplicationScoped; @@ -8,6 +11,7 @@ import java.sql.Timestamp; import java.util.List; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -61,11 +65,17 @@ public ContentAndToken> listShares( @Override public Optional getTableMetadata( - String share, String schema, String table, Optional startingTimestamp) { - return storageManager.getSharedTable(share, schema, table).flatMap(t -> tableLoaderFactory - .newTableLoader(t.internalTable()) - .loadTable(t) - .getMetadata(startingTimestamp)); + String share, + String schema, + String tableName, + Optional startingTimestamp, + ClientCapabilities clientCapabilities) { + var table = storageManager + .getSharedTable(share, schema, tableName) + .map(t -> tableLoaderFactory.newTableLoader(t.internalTable()).loadTable(t)); + return table + .flatMap(t -> t.getMetadata(startingTimestamp)) + .map(m -> checkResponseFormat(clientCapabilities, Metadata::format, m, tableName)); } @Override @@ -124,7 +134,11 @@ public Optional>> listTablesOfShare( @Override public ReadTableResult queryTable( - String share, String schema, String tableName, ReadTableRequest queryRequest) { + String share, + String schema, + String tableName, + ReadTableRequest queryRequest, + ClientCapabilities clientCapabilities) { SharedTable sharedTable = storageManager .getSharedTable(share, schema, tableName) .orElseThrow(() -> new TableNotFound(String.format( @@ -136,12 +150,32 @@ public ReadTableResult queryTable( .newTableLoader(sharedTable.internalTable()) .loadTable(sharedTable) .queryTable(queryRequest); - return new ReadTableResult( - readTableResultToBeSigned.protocol(), - readTableResultToBeSigned.metadata(), - readTableResultToBeSigned.other().stream() - .map(fileSigner::sign) - .collect(Collectors.toList()), - readTableResultToBeSigned.version()); + return checkResponseFormat( + clientCapabilities, + ReadTableResult::responseFormat, + new ReadTableResult( + readTableResultToBeSigned.protocol(), + readTableResultToBeSigned.metadata(), + readTableResultToBeSigned.other().stream() + .map(fileSigner::sign) + .collect(Collectors.toList()), + readTableResultToBeSigned.version(), + ResponseFormat.parquet), + tableName); + } + + private A checkResponseFormat( + ClientCapabilities clientCapabilities, + Function formatExtractor, + A formatContainer, + String tableName) { + if (!clientCapabilities + .responseFormat() + .isCompatibleWith(formatExtractor.apply(formatContainer))) { + throw new IncompatibleTableWithClient( + "Table " + tableName + " is not compatible with client " + clientCapabilities); + } else { + return formatContainer; + } } } diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java index 395809b31..fb160b527 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java @@ -4,6 +4,7 @@ import io.whitefox.core.ReadTableRequest; import io.whitefox.core.ReadTableResultToBeSigned; import io.whitefox.core.TableSchema; +import io.whitefox.core.services.capabilities.ResponseFormat; import java.sql.Timestamp; import java.util.Optional; import java.util.stream.Collectors; @@ -41,7 +42,7 @@ private Metadata getMetadataFromSnapshot(Snapshot snapshot) { String.valueOf(snapshot.snapshotId()), Optional.of(icebergTable.name()), Optional.empty(), - Metadata.Format.PARQUET, + ResponseFormat.parquet, new TableSchema(tableSchemaConverter.convertIcebergSchemaToWhitefox( icebergTable.schema().asStruct())), icebergTable.spec().fields().stream() diff --git a/server/core/src/main/java/io/whitefox/core/services/capabilities/CapabilitiesConstants.java b/server/core/src/main/java/io/whitefox/core/services/capabilities/CapabilitiesConstants.java new file mode 100644 index 000000000..3cd4f982c --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/capabilities/CapabilitiesConstants.java @@ -0,0 +1,17 @@ +package io.whitefox.core.services.capabilities; + +// here because otherwise I could not use it in TableReaderFeatures +public class CapabilitiesConstants { + public static final String DELTA_SHARING_READER_FEATURE_DELETION_VECTOR = "deletionvectors"; + public static final String DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING = "columnmapping"; + public static final String DELTA_SHARING_READER_FEATURE_TIMESTAMP_NTZ = "timestampntz"; + public static final String DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA = "domainmetadata"; + public static final String DELTA_SHARING_READER_FEATURE_V2CHECKPOINT = "v2checkpoint"; + public static final String DELTA_SHARING_READER_FEATURE_CHECK_CONSTRAINTS = "checkconstraints"; + public static final String DELTA_SHARING_READER_FEATURE_GENERATED_COLUMNS = "generatedcolumns"; + public static final String DELTA_SHARING_READER_FEATURE_ALLOW_COLUMN_DEFAULTS = + "allowcolumndefaults"; + public static final String DELTA_SHARING_READER_FEATURE_IDENTITY_COLUMNS = "identitycolumns"; + public static final String ICEBERG_V1 = "icebergv1"; + public static final String ICEBERG_V2 = "icebergv2"; +} diff --git a/server/core/src/main/java/io/whitefox/core/services/capabilities/ClientCapabilities.java b/server/core/src/main/java/io/whitefox/core/services/capabilities/ClientCapabilities.java new file mode 100644 index 000000000..c4f3b2a8d --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/capabilities/ClientCapabilities.java @@ -0,0 +1,53 @@ +package io.whitefox.core.services.capabilities; + +import java.util.Collections; +import java.util.Set; + +public interface ClientCapabilities { + Set readerFeatures(); + + ResponseFormat responseFormat(); + + ParquetClientCapabilities PARQUET_INSTANCE = new ParquetClientCapabilities(); + + static DeltaClientCapabilities delta(Set readerFeatures) { + return new DeltaClientCapabilities(readerFeatures); + } + + static ParquetClientCapabilities parquet() { + return PARQUET_INSTANCE; + } + + class DeltaClientCapabilities implements ClientCapabilities { + private final Set readerFeatures; + + @Override + public Set readerFeatures() { + return readerFeatures; + } + + @Override + public ResponseFormat responseFormat() { + return ResponseFormat.delta; + } + + private DeltaClientCapabilities(Set readerFeatures) { + this.readerFeatures = Collections.unmodifiableSet(readerFeatures); + } + } + + class ParquetClientCapabilities implements ClientCapabilities { + + private ParquetClientCapabilities() {} + + @Override + public Set readerFeatures() { + return Set.of(); + } + + @Override + public ResponseFormat responseFormat() { + return ResponseFormat.parquet; + } + } +} diff --git a/server/core/src/main/java/io/whitefox/core/services/capabilities/ReaderFeatures.java b/server/core/src/main/java/io/whitefox/core/services/capabilities/ReaderFeatures.java new file mode 100644 index 000000000..deefe2466 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/capabilities/ReaderFeatures.java @@ -0,0 +1,60 @@ +package io.whitefox.core.services.capabilities; + +import static io.whitefox.core.services.capabilities.CapabilitiesConstants.*; + +import io.whitefox.annotations.SkipCoverageGenerated; +import java.util.Optional; + +@SkipCoverageGenerated +public enum ReaderFeatures { + DELETION_VECTORS(DELTA_SHARING_READER_FEATURE_DELETION_VECTOR), + COLUMN_MAPPING(DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING), + TIMESTAMP_NTZ(DELTA_SHARING_READER_FEATURE_TIMESTAMP_NTZ), + DOMAIN_METADATA(DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA), + V2CHECKPOINT(DELTA_SHARING_READER_FEATURE_V2CHECKPOINT), + CHECK_CONSTRAINTS(DELTA_SHARING_READER_FEATURE_CHECK_CONSTRAINTS), + GENERATED_COLUMNS(DELTA_SHARING_READER_FEATURE_GENERATED_COLUMNS), + ALLOW_COLUMN_DEFAULTS(DELTA_SHARING_READER_FEATURE_ALLOW_COLUMN_DEFAULTS), + IDENTITY_COLUMNS(DELTA_SHARING_READER_FEATURE_IDENTITY_COLUMNS), + ICEBERG_V1(CapabilitiesConstants.ICEBERG_V1), + ICEBERG_V2(CapabilitiesConstants.ICEBERG_V2); + + ReaderFeatures(String stringRepresentation) { + this.stringRepresentation = stringRepresentation; + } + + private final String stringRepresentation; + + public String stringRepresentation() { + return stringRepresentation; + } + + public static Optional fromString(String s) { + switch (s.toLowerCase()) { + case DELTA_SHARING_READER_FEATURE_DELETION_VECTOR: + return Optional.of(DELETION_VECTORS); + case DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING: + return Optional.of(COLUMN_MAPPING); + case DELTA_SHARING_READER_FEATURE_TIMESTAMP_NTZ: + return Optional.of(TIMESTAMP_NTZ); + case DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA: + return Optional.of(DOMAIN_METADATA); + case DELTA_SHARING_READER_FEATURE_V2CHECKPOINT: + return Optional.of(V2CHECKPOINT); + case DELTA_SHARING_READER_FEATURE_CHECK_CONSTRAINTS: + return Optional.of(CHECK_CONSTRAINTS); + case DELTA_SHARING_READER_FEATURE_GENERATED_COLUMNS: + return Optional.of(GENERATED_COLUMNS); + case DELTA_SHARING_READER_FEATURE_ALLOW_COLUMN_DEFAULTS: + return Optional.of(ALLOW_COLUMN_DEFAULTS); + case DELTA_SHARING_READER_FEATURE_IDENTITY_COLUMNS: + return Optional.of(IDENTITY_COLUMNS); + case CapabilitiesConstants.ICEBERG_V1: + return Optional.of(ICEBERG_V1); + case CapabilitiesConstants.ICEBERG_V2: + return Optional.of(ICEBERG_V2); + default: + return Optional.empty(); + } + } +} diff --git a/server/core/src/main/java/io/whitefox/core/services/capabilities/ResponseFormat.java b/server/core/src/main/java/io/whitefox/core/services/capabilities/ResponseFormat.java new file mode 100644 index 000000000..bbd9b9f88 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/capabilities/ResponseFormat.java @@ -0,0 +1,40 @@ +package io.whitefox.core.services.capabilities; + +import io.whitefox.core.services.exceptions.UnknownResponseFormat; + +public enum ResponseFormat { + parquet("parquet"), + delta("delta"); + + private final String stringRepresentation; + + public String stringRepresentation() { + return stringRepresentation; + } + + ResponseFormat(String str) { + stringRepresentation = str; + } + + /** + * This is seen from the client perspective, i.e. a parquet client is not compatible with a delta response + * while the other way around is compatible + */ + public boolean isCompatibleWith(ResponseFormat other) { + switch (this) { + case parquet: + switch (other) { + case parquet: + return true; + case delta: + return false; + default: + throw new UnknownResponseFormat("Unknown response format: " + other); + } + case delta: + return true; + default: + throw new UnknownResponseFormat("Unknown response format: " + this); + } + } +} diff --git a/server/core/src/main/java/io/whitefox/core/services/exceptions/IncompatibleTableWithClient.java b/server/core/src/main/java/io/whitefox/core/services/exceptions/IncompatibleTableWithClient.java new file mode 100644 index 000000000..9a9396e50 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/exceptions/IncompatibleTableWithClient.java @@ -0,0 +1,7 @@ +package io.whitefox.core.services.exceptions; + +public class IncompatibleTableWithClient extends RuntimeException { + public IncompatibleTableWithClient(String message) { + super(message); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/services/exceptions/UnknownResponseFormat.java b/server/core/src/main/java/io/whitefox/core/services/exceptions/UnknownResponseFormat.java new file mode 100644 index 000000000..1076ee18b --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/exceptions/UnknownResponseFormat.java @@ -0,0 +1,7 @@ +package io.whitefox.core.services.exceptions; + +public class UnknownResponseFormat extends IllegalArgumentException { + public UnknownResponseFormat(String message) { + super(message); + } +} diff --git a/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java b/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java index 90e2f8f17..e20697748 100644 --- a/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java +++ b/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java @@ -6,6 +6,7 @@ import io.whitefox.core.Schema; import io.whitefox.core.Share; import io.whitefox.core.SharedTable; +import io.whitefox.core.services.capabilities.ClientCapabilities; import io.whitefox.core.services.exceptions.TableNotFound; import io.whitefox.persistence.StorageManager; import io.whitefox.persistence.memory.InMemoryStorageManager; @@ -204,8 +205,8 @@ public void getDeltaTableMetadata() { StorageManager storageManager = new InMemoryStorageManager(shares); DeltaSharesService deltaSharesService = new DeltaSharesServiceImpl(storageManager, 100, tableLoaderFactory, fileSignerFactory); - var tableMetadata = - deltaSharesService.getTableMetadata("name", "default", "table1", Optional.empty()); + var tableMetadata = deltaSharesService.getTableMetadata( + "name", "default", "table1", Optional.empty(), ClientCapabilities.parquet()); Assertions.assertTrue(tableMetadata.isPresent()); Assertions.assertEquals( "56d48189-cdbc-44f2-9b0e-2bded4c79ed7", tableMetadata.get().id()); @@ -226,8 +227,8 @@ public void tableMetadataNotFound() { StorageManager storageManager = new InMemoryStorageManager(shares); DeltaSharesService deltaSharesService = new DeltaSharesServiceImpl(storageManager, 100, tableLoaderFactory, fileSignerFactory); - var resultTable = - deltaSharesService.getTableMetadata("name", "default", "tableNotFound", Optional.empty()); + var resultTable = deltaSharesService.getTableMetadata( + "name", "default", "tableNotFound", Optional.empty(), ClientCapabilities.parquet()); Assertions.assertTrue(resultTable.isEmpty()); } @@ -255,7 +256,8 @@ public void queryExistingTable() { "default", "partitioned-delta-table", new ReadTableRequest.ReadTableCurrentVersion( - Optional.empty(), Optional.empty(), Optional.empty())); + Optional.empty(), Optional.empty(), Optional.empty()), + ClientCapabilities.parquet()); Assertions.assertEquals(9, resultTable.files().size()); } @@ -276,6 +278,7 @@ public void queryNonExistingTable() { new DeltaSharesServiceImpl(storageManager, 100, tableLoaderFactory, fileSignerFactory); Assertions.assertThrows( TableNotFound.class, - () -> deltaSharesService.queryTable("name", "default", "tableNotFound", null)); + () -> deltaSharesService.queryTable( + "name", "default", "tableNotFound", null, ClientCapabilities.parquet())); } } diff --git a/server/core/src/test/java/io/whitefox/core/types/predicates/PredicateParsingTest.java b/server/core/src/test/java/io/whitefox/core/types/predicates/PredicateParsingTest.java index f70ea8730..fa6cb3d56 100644 --- a/server/core/src/test/java/io/whitefox/core/types/predicates/PredicateParsingTest.java +++ b/server/core/src/test/java/io/whitefox/core/types/predicates/PredicateParsingTest.java @@ -5,6 +5,7 @@ import io.whitefox.core.Metadata; import io.whitefox.core.PredicateUtils; import io.whitefox.core.TableSchema; +import io.whitefox.core.services.capabilities.ResponseFormat; import io.whitefox.core.types.DateType; import io.whitefox.core.types.StructField; import io.whitefox.core.types.StructType; @@ -36,7 +37,7 @@ void testParsingOfInvalidSql() { "id", Optional.empty(), Optional.empty(), - Metadata.Format.PARQUET, + ResponseFormat.parquet, new TableSchema( new StructType(List.of(new StructField("date", DateType.DATE, true, Map.of())))), List.of("date", "age"), @@ -60,7 +61,7 @@ void testParsingOfSqlEqual() throws PredicateException { "id", Optional.empty(), Optional.empty(), - Metadata.Format.PARQUET, + ResponseFormat.parquet, new TableSchema( new StructType(List.of(new StructField("date", DateType.DATE, true, Map.of())))), List.of("date", "age"),