Skip to content

Commit 8a80003

Browse files
wip
1 parent 7bb1f7e commit 8a80003

17 files changed

+751
-323
lines changed

protocol/delta-sharing-protocol-api.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ components:
787787

788788
# This is not used for the spec but comes handy for autogeneration
789789
TableQueryResponseObject:
790-
anyOf:
790+
oneOf:
791791
- $ref: '#/components/schemas/ParquetTableQueryResponseObject'
792792
- $ref: '#/components/schemas/DeltaTableQueryResponseObject'
793793
ParquetTableQueryResponseObject:

server/app/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ val serverGeneratorProperties = mapOf(
4949
"dateLibrary" to "java8",
5050
"disallowAdditionalPropertiesIfNotPresent" to "false",
5151
"generateBuilders" to "false",
52+
"legacyDiscriminatorBehavior" to "false",
5253
"generatePom" to "false",
5354
"interfaceOnly" to "true",
5455
"library" to "quarkus",

server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java

Lines changed: 39 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.whitefox.core.*;
66
import io.whitefox.core.Schema;
77
import io.whitefox.core.Share;
8+
import io.whitefox.core.services.DeltaSharingCapabilities;
89
import java.util.*;
910
import java.util.stream.Collectors;
1011

@@ -60,44 +61,44 @@ public static TableQueryResponseObject readTableResult2api(ReadTableResult readT
6061

6162
private static ParquetMetadataObject metadata2Api(Metadata metadata) {
6263
return new ParquetMetadataObject()
63-
.metaData(new ParquetMetadataObjectMetaData()
64-
.numFiles(metadata.numFiles().orElse(null))
65-
.version(metadata.version().orElse(null))
66-
.size(metadata.size().orElse(null))
67-
.id(metadata.id())
68-
.name(metadata.name().orElse(null))
69-
.description(metadata.description().orElse(null))
70-
.format(new ParquetFormatObject().provider(metadata.format().provider()))
71-
.schemaString(metadata.tableSchema().structType().toJson())
72-
.partitionColumns(metadata.partitionColumns())
73-
._configuration(metadata.configuration()));
64+
.metaData(new ParquetMetadataObjectMetaData()
65+
.numFiles(metadata.numFiles().orElse(null))
66+
.version(metadata.version())
67+
.size(metadata.size().orElse(null))
68+
.id(metadata.id())
69+
.name(metadata.name().orElse(null))
70+
.description(metadata.description().orElse(null))
71+
.format(new ParquetFormatObject().provider(metadata.format().provider()))
72+
.schemaString(metadata.tableSchema().structType().toJson())
73+
.partitionColumns(metadata.partitionColumns())
74+
._configuration(metadata.configuration()));
7475
}
7576

7677
private static DeltaProtocolObject protocol2Api(Protocol protocol) {
7778
return new DeltaProtocolObject()
78-
.protocol(new DeltaProtocolObjectProtocol()
79-
.deltaProtocol(new DeltaProtocolObjectProtocolDeltaProtocol()
80-
.minReaderVersion(protocol.minReaderVersion().orElse(1))
81-
.minWriterVersion(protocol.minWriterVersion().orElse(1))));
79+
.protocol(new DeltaProtocolObjectProtocol()
80+
.deltaProtocol(new DeltaProtocolObjectProtocolDeltaProtocol()
81+
.minReaderVersion(protocol.minReaderVersion().orElse(1))
82+
.minWriterVersion(protocol.minWriterVersion().orElse(1))));
8283
}
8384

8485
private static DeltaFileObject file2Api(TableFile f) {
8586
return new DeltaFileObject()
86-
.id(f.id())
87-
.version(f.version().orElse(null))
88-
.deletionVectorFileId(null) // TODO
89-
.timestamp(f.timestamp().orElse(null))
90-
.expirationTimestamp(f.expirationTimestamp())
91-
.deltaSingleAction(new DeltaSingleAction()
92-
._file(new DeltaAddFileAction()
93-
.id(f.id())
94-
.url(f.url())
95-
.partitionValues(f.partitionValues())
96-
.size(f.size())
97-
.stats(f.stats().orElse(null))
98-
.version(f.version().orElse(null))
99-
.timestamp(f.timestamp().orElse(null))
100-
.expirationTimestamp(f.expirationTimestamp())));
87+
.id(f.id())
88+
.version(f.version().orElse(null))
89+
.deletionVectorFileId(null) // TODO
90+
.timestamp(f.timestamp().orElse(null))
91+
.expirationTimestamp(f.expirationTimestamp())
92+
.deltaSingleAction(new DeltaSingleAction()
93+
._file(new DeltaAddFileAction()
94+
.id(f.id())
95+
.url(f.url())
96+
.partitionValues(f.partitionValues())
97+
.size(f.size())
98+
.stats(f.stats().orElse(null))
99+
.version(f.version().orElse(null))
100+
.timestamp(f.timestamp().orElse(null))
101+
.expirationTimestamp(f.expirationTimestamp())));
101102
}
102103

103104
public static TableReferenceAndReadRequest api2TableReferenceAndReadRequest(
@@ -113,28 +114,16 @@ public static io.whitefox.api.deltasharing.model.v1.generated.Table table2api(
113114
.schema(sharedTable.schema());
114115
}
115116

116-
/**
117-
* NOTE: this is an undocumented feature of the reference impl of delta-sharing, it's not part of the
118-
* protocol
119-
* ----
120-
* Return the [[io.whitefox.api.server.DeltaHeaders.DELTA_SHARE_CAPABILITIES_HEADER]] header
121-
* that will be set in the response w/r/t the one received in the request.
122-
* If the request did not contain any, we will return an empty one.
123-
*/
124-
public static Map<String, String> toHeaderCapabilitiesMap(String headerCapabilities) {
125-
if (headerCapabilities == null) {
126-
return Map.of();
127-
}
128-
return Arrays.stream(headerCapabilities.toLowerCase().split(";"))
129-
.map(h -> h.split("="))
130-
.filter(h -> h.length == 2)
131-
.map(splits -> Map.entry(splits[0], splits[1]))
132-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
133-
}
134-
135117
public static TableMetadataResponseObject toTableResponseMetadata(Metadata m) {
136118
return new TableMetadataResponseObject()
137-
.protocol(new ParquetProtocolObject().protocol(new ParquetProtocolObjectProtocol().minReaderVersion(1)))
119+
.protocol(new ParquetProtocolObject()
120+
.protocol(new ParquetProtocolObjectProtocol().minReaderVersion(1)))
138121
.metadata(metadata2Api(m));
139122
}
123+
124+
public static String toCapabilitiesHeader(DeltaSharingCapabilities deltaSharingCapabilities) {
125+
return deltaSharingCapabilities.values().entrySet().stream()
126+
.map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
127+
.collect(Collectors.joining(";"));
128+
}
140129
}

server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.whitefox.api.server.ApiUtils;
1515
import io.whitefox.core.services.ContentAndToken;
1616
import io.whitefox.core.services.DeltaSharesService;
17+
import io.whitefox.core.services.DeltaSharingCapabilities;
1718
import io.whitefox.core.services.ShareService;
1819
import jakarta.inject.Inject;
1920
import jakarta.ws.rs.core.MediaType;
@@ -60,8 +61,9 @@ public Response getTableChanges(
6061
Integer startingVersion,
6162
Integer endingVersion,
6263
String endingTimestamp,
63-
Boolean includeHistoricalMetadata) {
64-
return Response.ok().build();
64+
Boolean includeHistoricalMetadata,
65+
String deltaSharingCapabilities) {
66+
return Response.status(Response.Status.NOT_IMPLEMENTED).build();
6567
}
6668

6769
@Override
@@ -72,20 +74,22 @@ public Response getTableMetadata(
7274
String startingTimestamp,
7375
String deltaSharingCapabilities) {
7476
return wrapExceptions(
75-
() -> optionalToNotFound(
76-
deltaSharesService.getTableMetadata(share, schema, table, startingTimestamp),
77-
m -> optionalToNotFound(
78-
deltaSharesService.getTableVersion(share, schema, table, startingTimestamp),
79-
v -> Response.ok(
80-
tableResponseSerializer.serialize(DeltaMappers.toTableResponseMetadata(m)),
81-
ndjsonMediaType)
82-
.status(Response.Status.OK.getStatusCode())
83-
.header(DELTA_TABLE_VERSION_HEADER, String.valueOf(v))
84-
.header(
85-
DELTA_SHARE_CAPABILITIES_HEADER,
86-
getResponseFormatHeader(
87-
DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities)))
88-
.build())),
77+
() -> {
78+
DeltaSharingCapabilities requestCapabilities =
79+
new DeltaSharingCapabilities(deltaSharingCapabilities);
80+
return optionalToNotFound(
81+
deltaSharesService.getTableMetadata(
82+
share, schema, table, startingTimestamp, requestCapabilities),
83+
m -> Response.ok(
84+
tableResponseSerializer.serialize(DeltaMappers.toTableResponseMetadata(m)),
85+
ndjsonMediaType)
86+
.status(Response.Status.OK.getStatusCode())
87+
.header(DELTA_TABLE_VERSION_HEADER, String.valueOf(m.version()))
88+
.header(
89+
DeltaSharingCapabilities.DELTA_SHARE_CAPABILITIES_HEADER,
90+
DeltaMappers.toCapabilitiesHeader(m.tableCapabilities()))
91+
.build());
92+
},
8993
exceptionToResponse);
9094
}
9195

@@ -193,20 +197,25 @@ public Response queryTable(
193197
return wrapExceptions(
194198
() -> optionalToNotFound(
195199
deltaSharesService.getTableVersion(share, schema, table, startingTimestamp),
196-
version -> Response.ok(
197-
tableQueryResponseSerializer.serialize(
198-
DeltaMappers.readTableResult2api(deltaSharesService.queryTable(
199-
share,
200-
schema,
201-
table,
202-
DeltaMappers.api2ReadTableRequest(queryRequest)))),
203-
ndjsonMediaType)
204-
.header(DELTA_TABLE_VERSION_HEADER, version)
205-
.header(
206-
DELTA_SHARE_CAPABILITIES_HEADER,
207-
getResponseFormatHeader(
208-
DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities)))
209-
.build()),
200+
version -> {
201+
var capabilities = new DeltaSharingCapabilities(deltaSharingCapabilities);
202+
var readTableResult = deltaSharesService.queryTable(
203+
share,
204+
schema,
205+
table,
206+
DeltaMappers.api2ReadTableRequest(queryRequest),
207+
capabilities);
208+
return Response.ok(
209+
tableQueryResponseSerializer.serialize(
210+
DeltaMappers.readTableResult2api(readTableResult)),
211+
ndjsonMediaType)
212+
.header(DELTA_TABLE_VERSION_HEADER, version)
213+
.header(
214+
DeltaSharingCapabilities.DELTA_SHARE_CAPABILITIES_HEADER,
215+
DeltaMappers.toCapabilitiesHeader(
216+
readTableResult.metadata().tableCapabilities()))
217+
.build();
218+
}),
210219
exceptionToResponse);
211220
}
212221

server/app/src/main/java/io/whitefox/api/server/ApiUtils.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,6 @@ default <T> Response optionalToNotFound(Optional<T> opt, Function<T, Response> f
6060
return opt.map(fn).orElse(notFoundResponse());
6161
}
6262

63-
default String getResponseFormatHeader(Map<String, String> deltaSharingCapabilities) {
64-
return String.format(
65-
"%s=%s",
66-
DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT, getResponseFormat(deltaSharingCapabilities));
67-
}
68-
6963
default String getResponseFormat(Map<String, String> deltaSharingCapabilities) {
7064
return deltaSharingCapabilities.getOrDefault(
7165
DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT,
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package io.whitefox.api.server;
22

3+
import io.whitefox.core.services.DeltaSharingCapabilities;
4+
35
public interface DeltaHeaders {
4-
String DELTA_SHARING_RESPONSE_FORMAT = "responseformat";
6+
String DELTA_SHARING_RESPONSE_FORMAT = DeltaSharingCapabilities.DELTA_SHARING_RESPONSE_FORMAT;
57
String DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version";
6-
String DELTA_SHARE_CAPABILITIES_HEADER = "delta-sharing-capabilities";
78
}

server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ public static MetastoreType api2MetastoreType(
173173
}
174174
}
175175

176+
// TODO we need to resolve the json schema anyOf/oneOf problem
176177
private static MetadataObject metadata2Api(Metadata metadata) {
177178
return new MetadataObject()
178179
.metaData(new MetadataObjectMetaData()

0 commit comments

Comments
 (0)