Skip to content

support delta-sharing-capabilities header #201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ tasks.jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = BigDecimal.valueOf(0.75)
minimum = BigDecimal.valueOf(0.77)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.whitefox.api.deltasharing;

import io.micrometer.common.util.StringUtils;
import io.whitefox.api.deltasharing.errors.InvalidDeltaSharingCapabilities;
import io.whitefox.api.server.DeltaHeaders;
import io.whitefox.core.services.capabilities.ClientCapabilities;
import io.whitefox.core.services.capabilities.ReaderFeatures;
import io.whitefox.core.services.capabilities.ResponseFormat;
import io.whitefox.core.services.exceptions.UnknownResponseFormat;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Translates the raw value of the delta-sharing capabilities request header into a
 * {@link ClientCapabilities} instance.
 *
 * <p>The header is a {@code ;}-separated list of {@code key=value1,value2} entries, e.g.
 * {@code responseformat=delta;readerfeatures=deletionvectors}.
 */
@ApplicationScoped
public class ClientCapabilitiesMapper implements DeltaHeaders {

  /**
   * Parses the capabilities advertised by a delta-sharing client.
   *
   * <p>Keys are matched case-insensitively, whitespace around keys and values is ignored, blank
   * entries are skipped, and the values of a key that appears more than once are merged.
   *
   * @param header the string representation of the capabilities of the delta-sharing client; may
   *     be {@code null} when the client did not send the header
   * @return the clean and business oriented version of it; parquet capabilities when the header
   *     is {@code null} or does not declare any response format
   * @throws InvalidDeltaSharingCapabilities if a non-blank entry is not in the {@code key=value}
   *     format
   * @throws UnknownResponseFormat if the selected response format is neither parquet nor delta
   */
  public ClientCapabilities parseDeltaSharingCapabilities(String header) {
    if (header == null) {
      return ClientCapabilities.parquet();
    }
    Map<String, Set<String>> rawValues = parseRawValues(header);
    String theResponseFormat = pickResponseFormat(rawValues.get(DELTA_SHARING_RESPONSE_FORMAT));
    if (ResponseFormat.parquet.stringRepresentation().equalsIgnoreCase(theResponseFormat)) {
      return ClientCapabilities.parquet();
    } else if (ResponseFormat.delta.stringRepresentation().equalsIgnoreCase(theResponseFormat)) {
      Set<String> unparsed = rawValues.getOrDefault(DELTA_SHARING_READER_FEATURES, Set.of());
      // reader features that fromString does not recognise come back empty and are dropped
      return ClientCapabilities.delta(unparsed.stream()
          .map(ReaderFeatures::fromString)
          .flatMap(Optional::stream)
          .collect(Collectors.toSet()));
    } else {
      throw new UnknownResponseFormat(
          String.format("Unknown response format %s", theResponseFormat));
    }
  }

  /** Splits the header into a map from lower-cased key to the set of its trimmed values. */
  private Map<String, Set<String>> parseRawValues(String header) {
    return Arrays.stream(header.split(";", -1))
        .filter(entry -> !StringUtils.isBlank(entry))
        .map(this::parseEntry)
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            Map.Entry::getValue,
            // without a merge function a repeated key would surface as a bare
            // IllegalStateException; merging the values keeps the request usable
            (first, second) -> Stream.concat(first.stream(), second.stream())
                .collect(Collectors.toUnmodifiableSet())));
  }

  /** Parses a single {@code key=value1,value2} entry of the header. */
  private Map.Entry<String, Set<String>> parseEntry(String entry) {
    String[] keyAndValues = entry.split("=", -1);
    if (keyAndValues.length != 2) {
      throw new InvalidDeltaSharingCapabilities(String.format(
          "Each %s must be in the format key=value", DELTA_SHARE_CAPABILITIES_HEADER));
    }
    // the key constants in DeltaHeaders are lower case, so normalise before the lookup
    String key = keyAndValues[0].trim().toLowerCase(Locale.ROOT);
    Set<String> values = Arrays.stream(keyAndValues[1].split(",", -1))
        .map(String::trim)
        .collect(Collectors.toUnmodifiableSet());
    return Map.entry(key, values);
  }

  /**
   * Chooses the single response format to work with among the ones requested by the client.
   *
   * @param responseFormats the requested formats, {@code null} or empty when none was requested
   * @return delta if requested, otherwise parquet if requested or nothing was requested,
   *     otherwise one of the unrecognised values so that the caller can report it
   */
  private String pickResponseFormat(Set<String> responseFormats) {
    String parquet = ResponseFormat.parquet.stringRepresentation();
    if (responseFormats == null || responseFormats.isEmpty()) {
      return parquet;
    }
    // Quoting the protocol:
    // > If there's a list of responseFormat specified, such as responseFormat=delta,parquet.
    // > The server may choose to respond in parquet format if the table does not have any
    // > advanced features.
    // > The server must respond in delta format if the table has advanced features which are
    // > not compatible with the parquet format.
    // So here we choose to return delta (if present) so that the service can choose to
    // downgrade it for compatibility reasons.
    String delta = ResponseFormat.delta.stringRepresentation();
    if (responseFormats.stream().anyMatch(delta::equalsIgnoreCase)) {
      return delta;
    }
    if (responseFormats.stream().anyMatch(parquet::equalsIgnoreCase)) {
      return parquet;
    }
    // only unknown formats were requested: pick one deterministically (set iteration order is
    // unspecified) so that the resulting error message is stable
    return responseFormats.stream().sorted().findFirst().orElseThrow();
  }
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package io.whitefox.api.deltasharing;

import io.whitefox.api.deltasharing.model.v1.Format;
import io.whitefox.api.deltasharing.model.v1.TableMetadataResponse;
import io.whitefox.api.deltasharing.model.v1.TableQueryResponse;
import io.whitefox.api.deltasharing.model.v1.generated.*;
import io.whitefox.api.deltasharing.model.v1.parquet.ParquetFile;
import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata;
import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol;
import io.whitefox.api.server.CommonMappers;
import io.whitefox.api.server.WhitefoxMappers;
import io.whitefox.core.*;
import io.whitefox.core.Schema;
import io.whitefox.core.Share;
import io.whitefox.core.services.capabilities.ResponseFormat;
import java.util.*;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -67,19 +68,27 @@ public static TableQueryResponse readTableResult2api(ReadTableResult readTableRe
}

private static ParquetMetadata metadata2Api(Metadata metadata) {
return ParquetMetadata.builder()
.metadata(ParquetMetadata.Metadata.builder()
.id(metadata.id())
.name(metadata.name())
.description(metadata.description())
.format(WhitefoxMappers.format2api(metadata.format()))
.schemaString(metadata.tableSchema().structType().toJson())
.partitionColumns(metadata.partitionColumns())
.configuration(Optional.of(metadata.configuration()))
.version(metadata.version())
.numFiles(metadata.numFiles())
.build())
.build();
switch (metadata.format()) {
case parquet:
return ParquetMetadata.builder()
.metadata(ParquetMetadata.Metadata.builder()
.id(metadata.id())
.name(metadata.name())
.description(metadata.description())
.format(new Format())
.schemaString(metadata.tableSchema().structType().toJson())
.partitionColumns(metadata.partitionColumns())
.configuration(Optional.ofNullable(metadata.configuration()))
.version(metadata.version())
.numFiles(metadata.numFiles())
.build())
.build();
case delta:
throw new IllegalArgumentException("Delta response format is not supported");
default:
throw new IllegalArgumentException(
String.format("%s response format is not supported", metadata.format()));
}
}

private static ParquetProtocol protocol2Api(Protocol protocol) {
Expand Down Expand Up @@ -115,22 +124,10 @@ public static io.whitefox.api.deltasharing.model.v1.generated.Table table2api(
}

/**
* NOTE: this is an undocumented feature of the reference impl of delta-sharing, it's not part of the
* protocol
* ----
* Return the [[io.whitefox.api.server.DeltaHeaders.DELTA_SHARE_CAPABILITIES_HEADER]] header
* that will be set in the response w/r/t the one received in the request.
* If the request did not contain any, we will return an empty one.
* Serializes the response format in its text-based representation
*/
public static Map<String, String> toHeaderCapabilitiesMap(String headerCapabilities) {
if (headerCapabilities == null) {
return Map.of();
}
return Arrays.stream(headerCapabilities.toLowerCase().split(";"))
.map(h -> h.split("="))
.filter(h -> h.length == 2)
.map(splits -> Map.entry(splits[0], splits[1]))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
public static String toResponseFormatHeader(ResponseFormat responseFormat) {
return responseFormat.stringRepresentation();
}

public static TableMetadataResponse toTableResponseMetadata(Metadata m) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.whitefox.api.deltasharing.errors;

/**
 * Signals that the value of the delta-sharing capabilities header could not be parsed, for
 * example because one of its entries is not in the {@code key=value} format.
 *
 * <p>It is an {@link IllegalArgumentException}, so code that already handles that type also
 * handles this one.
 */
public class InvalidDeltaSharingCapabilities extends IllegalArgumentException {
/**
 * @param message human readable description of what is wrong with the header value
 */
public InvalidDeltaSharingCapabilities(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static io.whitefox.api.server.CommonMappers.mapList;

import io.whitefox.api.deltasharing.ClientCapabilitiesMapper;
import io.whitefox.api.deltasharing.DeltaMappers;
import io.whitefox.api.deltasharing.encoders.DeltaPageTokenEncoder;
import io.whitefox.api.deltasharing.model.v1.generated.ListSchemasResponse;
Expand Down Expand Up @@ -29,18 +30,22 @@ public class DeltaSharesApiImpl implements DeltaApiApi, ApiUtils {
private final TableMetadataSerializer tableResponseSerializer;
private final TableQueryResponseSerializer tableQueryResponseSerializer;

private final ClientCapabilitiesMapper clientCapabilitiesMapper;

@Inject
public DeltaSharesApiImpl(
DeltaSharesService deltaSharesService,
ShareService shareService,
DeltaPageTokenEncoder encoder,
TableMetadataSerializer tableResponseSerializer,
TableQueryResponseSerializer tableQueryResponseSerializer) {
TableQueryResponseSerializer tableQueryResponseSerializer,
ClientCapabilitiesMapper clientCapabilitiesMapper) {
this.deltaSharesService = deltaSharesService;
this.tokenEncoder = encoder;
this.tableResponseSerializer = tableResponseSerializer;
this.tableQueryResponseSerializer = tableQueryResponseSerializer;
this.shareService = shareService;
this.clientCapabilitiesMapper = clientCapabilitiesMapper;
}

@Override
Expand All @@ -62,9 +67,10 @@ public Response getTableChanges(
String endingTimestamp,
Boolean includeHistoricalMetadata,
String deltaSharingCapabilities) {
return Response.ok().build();
return Response.status(501).build();
}

// TODO handle capabilities
@Override
public Response getTableMetadata(
String share,
Expand All @@ -75,20 +81,21 @@ public Response getTableMetadata(
return wrapExceptions(
() -> {
var startingTimestamp = parseTimestamp(startingTimestampStr);
var clientCapabilities =
clientCapabilitiesMapper.parseDeltaSharingCapabilities(deltaSharingCapabilities);
return optionalToNotFound(
deltaSharesService.getTableMetadata(share, schema, table, startingTimestamp),
deltaSharesService.getTableMetadata(
share, schema, table, startingTimestamp, clientCapabilities),
m -> optionalToNotFound(
deltaSharesService.getTableVersion(share, schema, table, startingTimestamp),
v -> Response.ok(
tableResponseSerializer.serialize(
DeltaMappers.toTableResponseMetadata(m)),
ndjsonMediaType)
.status(Response.Status.OK.getStatusCode())
.header(DELTA_TABLE_VERSION_HEADER, String.valueOf(v))
.header(
DELTA_SHARE_CAPABILITIES_HEADER,
getResponseFormatHeader(
DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities)))
DeltaMappers.toResponseFormatHeader(m.format()))
.build()));
},
exceptionToResponse);
Expand Down Expand Up @@ -190,15 +197,18 @@ public Response queryTable(
return wrapExceptions(
() -> {
var readResult = deltaSharesService.queryTable(
share, schema, table, DeltaMappers.api2ReadTableRequest(queryRequest));
share,
schema,
table,
DeltaMappers.api2ReadTableRequest(queryRequest),
clientCapabilitiesMapper.parseDeltaSharingCapabilities(deltaSharingCapabilities));
var serializedReadResult =
tableQueryResponseSerializer.serialize(DeltaMappers.readTableResult2api(readResult));
return Response.ok(serializedReadResult, ndjsonMediaType)
.header(DELTA_TABLE_VERSION_HEADER, readResult.version())
.header(
DELTA_SHARE_CAPABILITIES_HEADER,
getResponseFormatHeader(
DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities)))
DeltaMappers.toResponseFormatHeader(readResult.responseFormat()))
.build();
},
exceptionToResponse);
Expand Down
14 changes: 0 additions & 14 deletions server/app/src/main/java/io/whitefox/api/server/ApiUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
import io.quarkus.runtime.util.ExceptionUtil;
import io.whitefox.api.deltasharing.model.v1.generated.CommonErrorResponse;
import io.whitefox.core.Principal;
import io.whitefox.core.services.DeltaSharedTable;
import io.whitefox.core.services.exceptions.AlreadyExists;
import io.whitefox.core.services.exceptions.NotFound;
import jakarta.ws.rs.core.Response;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -70,18 +68,6 @@ default <T> Response optionalToNotFound(Optional<T> opt, Function<T, Response> f
return opt.map(fn).orElse(notFoundResponse());
}

default String getResponseFormatHeader(Map<String, String> deltaSharingCapabilities) {
return String.format(
"%s=%s",
DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT, getResponseFormat(deltaSharingCapabilities));
}

default String getResponseFormat(Map<String, String> deltaSharingCapabilities) {
return deltaSharingCapabilities.getOrDefault(
DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT,
DeltaSharedTable.DeltaShareTableFormat.RESPONSE_FORMAT_PARQUET);
}

default Principal getRequestPrincipal() {
return new Principal("Mr. Fox");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

/**
 * Names of the HTTP headers, and of the keys found inside the capabilities header, used by the
 * delta-sharing API.
 */
public interface DeltaHeaders {
// key, inside the capabilities header value, that lists the response formats
String DELTA_SHARING_RESPONSE_FORMAT = "responseformat";
// key, inside the capabilities header value, that lists the reader features
String DELTA_SHARING_READER_FEATURES = "readerfeatures";
// header carrying the version of the table the response refers to
String DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version";
// header carrying the capabilities; also set on responses to report the response format
String DELTA_SHARE_CAPABILITIES_HEADER = "delta-sharing-capabilities";
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.whitefox.api.server;

import io.whitefox.api.deltasharing.model.v1.Format;
import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata;
import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol;
import io.whitefox.api.model.v1.generated.*;
import io.whitefox.core.*;
Expand Down Expand Up @@ -172,31 +170,6 @@ public static MetastoreType api2MetastoreType(
}
}

private static ParquetMetadata metadata2Api(Metadata metadata) {
return ParquetMetadata.builder()
.metadata(ParquetMetadata.Metadata.builder()
.id(metadata.id())
.name(metadata.name())
.description(metadata.description())
.format(format2api(metadata.format()))
.schemaString(metadata.tableSchema().structType().toJson())
.partitionColumns(metadata.partitionColumns())
.configuration(Optional.ofNullable(metadata.configuration()))
.version(metadata.version())
.numFiles(metadata.numFiles())
.build())
.build();
}

public static Format format2api(Metadata.Format format) {
switch (format) {
case PARQUET:
return new Format();
}
// never gonna happen, java is dumb
return null;
}

private static ParquetProtocol protocol2Api(Protocol protocol) {
return ParquetProtocol.ofMinReaderVersion(protocol.minReaderVersion().orElse(1));
}
Expand Down
Loading