From a4eaa9ac506f9ff94b536c2f6c67ebaf83914b25 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Mon, 29 Jan 2024 17:40:44 +0100 Subject: [PATCH 1/5] delta-sharing protocols --- .../docs/protocols/delta-sharing-protocol.md | 278 ++++++++++++++++- protocol/delta-sharing-protocol-api.yml | 279 ++++++++++++++++-- 2 files changed, 525 insertions(+), 32 deletions(-) diff --git a/docsite/docs/protocols/delta-sharing-protocol.md b/docsite/docs/protocols/delta-sharing-protocol.md index 35a6fb5d4..e73ec48f1 100644 --- a/docsite/docs/protocols/delta-sharing-protocol.md +++ b/docsite/docs/protocols/delta-sharing-protocol.md @@ -1376,12 +1376,39 @@ delta-table-version: 123 This is the API for clients to query the table schema and other metadata. -HTTP Request | Value --|- -Method | `GET` -Header | `Authorization: Bearer [token]` -URL | `{prefix}/shares/{share}/schemas/{schema}/tables/{table}/metadata` -URL Parameters | **\{share\}**: The share name to query. It's case-insensitive.
**\{schema\}**: The schema name to query. It's case-insensitive.
**\{table\}**: The table name to query. It's case-insensitive. + + + + + + + + + + + + + + + + + + + + + +
HTTP RequestValue
Method`GET`
Headers +`Authorization: Bearer {token}` + +Optional: `delta-sharing-capabilities: responseformat=delta;readerfeatures=deletionvectors`, see +[Delta Sharing Capabilities Header](#delta-sharing-capabilities-header) for details. +
URL +`{prefix}/shares/{share}/schemas/{schema}/tables/{table}/metadata` +
URL Parameters +**\{share\}**: The share name to query. It's case-insensitive.
+**\{schema\}**: The schema name to query. It's case-insensitive.
+**\{table\}**: The table name to query. It's case-insensitive. +
200: The table metadata was successfully returned. @@ -1409,10 +1436,17 @@ URL Parameters | **\{share\}**: The share name to query. It's case-insensitive.< A sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format](#api-response-format). +When `responseformat=parquet`, each line is a JSON object defined in [API Response Format in Parquet](#api-response-format-in-parquet). + The response contains two lines: - The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line) containing the table [Protocol](#protocol) object. - The second line is [a JSON wrapper object](#json-wrapper-object-in-each-line) containing the table [Metadata](#metadata) object. +When `responseformat=delta`, each line is a Json object defined in [API Response Format in Delta](#api-response-format-in-delta). +The response contains two lines: +- The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Protocol](#protocol-in-delta-format) object. +- The second line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Metadata](#metadata-in-delta-format) object. + @@ -1572,7 +1606,7 @@ The response contains two lines:
-Example (See [API Response Format](#api-response-format) for more details about the format): +Example (See [API Response Format in Parquet](#api-response-format-in-parquet) for more details about the format): `GET {prefix}/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/metadata` @@ -1627,6 +1661,10 @@ This is the API for clients to read data from a table. Optional: `Content-Type: application/json; charset=utf-8` +Optional: `delta-sharing-capabilities: responseformat=delta;readerfeatures=deletionvectors`, see +[Delta Sharing Capabilities Header](#delta-sharing-capabilities-header) for details. + + @@ -1697,7 +1735,8 @@ returned in the response. Body -A sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format](#api-response-format). +When `responseformat=parquet`, a sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format in Parquet](#api-response-format-in-parquet). + The response contains multiple lines: - The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line) containing the table [Protocol](#protocol) object. @@ -1707,6 +1746,18 @@ The response contains multiple lines: - The lines are [files](#file) in the table (otherwise). - The ordering of the lines doesn't matter. +When `responseformat=delta`, a sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format in Delta](#api-response-format-in-delta). + +The response contains multiple lines: +- The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Protocol](#protocol-in-delta-format) object. +- The second line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Metadata](#metadata-in-delta-format) object. +- The rest of the lines are [JSON wrapper objects](#json-wrapper-object-in-each-line-in-delta) for [Metadata](#metadata-in-delta-format), or [files](#file-in-delta-format). + - The lines are [files](#file-in-delta-format) which wraps the delta single action in the table (otherwise), with possible historical [Metadata](#metadata-in-delta-format) (when startingVersion is set). + - The ordering of the lines doesn't matter. + +The delta actions are wrapped because they will be used to construct a local delta log on the recipient +side and then leverage the delta library to read data. + @@ -1899,7 +1950,7 @@ The request body should be a JSON string containing the following optional field When `predicateHints` and `limitHint` are both present, the server should apply `predicateHints` first then `limitHint`. As these two parameters are hints rather than enforcement, the client must always apply `predicateHints` and `limitHint` on the response returned by the server if it wishes to filter and limit the returned data. An empty JSON object (`{}`) should be provided when these two parameters are missing. -Example (See [API Response Format](#api-response-format) for more details about the format): +Example (See [API Response Format in Parquet](#api-response-format-in-parquet) for more details about the format): `POST {prefix}/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/query` @@ -1993,6 +2044,8 @@ The change data feed represents row-level changes between versions of a Delta ta `Authorization: Bearer [token]` +Optional: `delta-sharing-capabilities: responseformat=delta;readerfeatures=deletionvectors`, see [Delta Sharing Capabilities Header](#delta-sharing-capabilities-header) for details. + @@ -2048,7 +2101,7 @@ The change data feed represents row-level changes between versions of a Delta ta Body -A sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format](#api-response-format). +When `responseformat=parquet`, a sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format in Parquet](#api-response-format-in-parquet). The response contains multiple lines: - The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line) containing the table [Protocol](#protocol) object. @@ -2057,6 +2110,13 @@ The response contains multiple lines: - Historical [Metadata](#metadata) will be returned if includeHistoricalMetadata is set to true. - The ordering of the lines doesn't matter. +When `responseformat=delta`, a sequence of JSON strings delimited by newline. Each line is a JSON object defined in [API Response Format in Parquet](#api-response-format-in-delta). +- The first line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Protocol](#protocol-in-delta-format) object. +- The second line is [a JSON wrapper object](#json-wrapper-object-in-each-line-in-delta) containing the delta [Metadata](#metadata-in-delta-format) object. +- The rest of the lines are [JSON wrapper objects](#json-wrapper-object-in-each-line) for [Files](#file-in-delta-format) of the change data feed. + - Historical [Metadata](#metadata) will be returned if includeHistoricalMetadata is set to true. + - The ordering of the lines doesn't matter. + @@ -2216,7 +2276,7 @@ The response contains multiple lines: -Example (See [API Response Format](#api-response-format) for more details about the format): +Example (See [API Response Format in Parquet](#api-response-format-in-parquet) for more details about the format): `GET {prefix}/shares/vaccine_share/schemas/acme_vaccine_data/tables/vaccine_patients/changes?startingVersion=0&endingVersion=2` @@ -2289,7 +2349,61 @@ content-type: application/x-ndjson; charset=utf-8 ### Timestamp Format Accepted timestamp format by a delta sharing server: in the ISO8601 format, in the UTC timezone, such as `2022-01-01T00:00:00Z`. -## API Response Format +## Delta Sharing Capabilities Header + +This section explains the details of delta sharing capabilities header, which was introduced to help +delta sharing catch up with features in [delta protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md). + +The key of the header is **delta-sharing-capabilities**, the value is semicolon separated capabilities. +Each capability is in the format of "key=value1,value2", values are separated by commas. +Example: "responseformat=delta;readerfeatures=deletionvectors,columnmapping". All keys and values should +be case-insensitive when processed by the server. + +This header can be used in the request for [Query Table Metadata](#query-table-metadata), +[Query Table](#read-data-from-a-table), and [Query Table Changes](#read-change-data-feed-from-a-table). + +**Compatibility** + + + + + + + + + + + + + + + + + +
Client/ServerServer that doesn't recognize the headerServer that recognizes the header
Client that doesn't specify the headerResponse is in parquet formatResponse must be in parquet format.
Client that specifies the headerThe header is ignored at the server, and the format of the response must always be parquet.The header is processed properly by the server.
If there's only one responseFormat specified, the server must respect and return in the requested format.
If there's a list of responseFormat specified, such as `responseFormat=delta,parquet`. The server may choose to respond in parquet format if the table does not have any advanced features. The server must respond in delta format if the table has advanced features which are not compatible with the parquet format.
+ +- If the client requests `delta` format and the response is in `parquet` format, the delta sharing +client will NOT throw an error. Ideally, the caller of the client's method should handle such +responses to be compatible with legacy servers. +- If the client doesn't specify any header, or requests `parquet` format and the response is in +`delta` format, the delta sharing client must throw an error. + +### responseFormat +Indicates the format to expect in the [API Response Format in Parquet](#api-response-format-in-parquet), two values are supported. + +- parquet: Represents the format of the delta sharing protocol that has been used in `delta-sharing-spark` 1.0 +and less, also the default format if `responseFormat` is missing from the header. All the existing delta +sharing connectors are able to process data in this format. +- **delta**: format can be used to read a shared delta table with minReaderVersion > 1, which contains +readerFeatures such as Deletion Vector or Column Mapping. `delta-sharing-spark` libraries +that are able to process `responseformat=delta` will be released soon. + +### readerFeatures +readerfeatures is only useful when `responseformat=delta`, it includes values from [delta reader +features](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#table-features). It's set by the +caller of `DeltaSharingClient` to indicate its ability to process delta readerFeatures. + +## API Response Format in Parquet This section discusses the API Response Format returned by the server. @@ -2299,7 +2413,7 @@ The JSON object in each line is a wrapper object that may contain the following Field Name | Data Type | Description | Optional/Required -|-|-|- -protocol | The [Protocol](#protocol) JSON object. | Defines the versioning information about the API Response Format. | Optional +protocol | The [Protocol](#protocol) JSON object. | Defines the versioning information about the API Response Format in Parquet. | Optional metaData | The [Metadata](#metadata) JSON object. | The table metadata including schema, partitionColumns, etc. | Optional file | The [File](#file) JSON object. | An individual data file in the table. | Optional @@ -2335,7 +2449,7 @@ description | String | User-provided description for this table | Optional format | [Format](#format) Object | Specification of the encoding for the files stored in the table. | Required schemaString | String | Schema of the table. This is a serialized JSON string which can be deserialized to a [Schema](#schema-object) Object. | Required partitionColumns | Array[String] | An array containing the names of columns by which the data should be partitioned. When a table doesn’t have partition columns, this will be an **empty** array. | Required -configuration | Map[String, String] | A map containing configuration options for the table +configuration | Map[String, String] | A map containing configuration options for the table | Optional version | Long | The table version the metadata corresponds to, returned when querying table data with a version or timestamp parameter, or cdf query with includeHistoricalMetadata set to true. | Optional size | Long | The size of the table in bytes, will be returned if available in the delta log. | Optional numFiles | Long | The number of files in the table, will be returned if available in the delta log. | Optional @@ -2762,6 +2876,119 @@ nullCount | The number of `null` values for this column minValues | A value smaller than all values present in the file for this column maxValues | A value larger than all values present in the file for this column +## API Response Format in Delta + +This section discusses the API Response Format in Delta returned by the server. When a table is shared +as delta format, the actions in the response could be put in a delta log in the local storage on the +recipient side for the delta library to read data out of it directly. This way of sharing makes the +delta sharing protocol more transparent and robust in supporting advanced delta feature, and minimizes code duplication. + +### JSON Wrapper Object In Each Line in Delta + +The JSON object in each line is a wrapper object that may contain the following fields. For each +field, it is a wrapper of a [delta action](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#actions)(which keeps the action in its delta format and with original +values), and with some additional fields for delta sharing functionalities. + +Field Name | Data Type | Description | Optional/Required +-|-|-|- +protocol | The [Protocol in Delta Format](#protocol-in-delta-format) JSON object. | A wrapper of delta protocol. | Optional +metaData | The [Metadata in Delta Format](#metadata-in-delta-format) JSON object. | A wrapper of delta metadata, including some delta sharing specific fields. | Optional +file | The [File in Delta Format](#file-in-delta-format) JSON object. | A wrapper of a delta single action in the table. | Optional + +It must contain only **ONE** of the above fields. + +### Protocol in Delta Format + +A wrapper of a [delta protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#protocol-evolution). + +Field Name | Data Type | Description | Optional/Required +-|-|-|- +deltaProtocol | Delta Protocol | Need to be parsed by a delta library as a delta protocol. | Required + +Example (for illustration purposes; each JSON object must be a single line in the response): + +```json +{ + "protocol": { + "deltaProtocol": { + "minReaderVersion": 3, + "minWriterVersion": 7 + } + } +} +``` + +### Metadata in Delta Format + +A wrapper of a [delta Metadata](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata). + +Field Name | Data Type | Description | Optional/Required +-|-|-|- +deltaMetadata | Delta Metadata | Need to be parsed by a delta library as delta metadata | Required +version | Long | The table version the metadata corresponds to, returned when querying table data with a version or timestamp parameter, or cdf query with includeHistoricalMetadata set to true. | Optional +size | Long | The size of the table in bytes, will be returned if available in the delta log. | Optional +numFiles | Long | The number of files in the table, will be returned if available in the delta log. | Optional + +Example (for illustration purposes; each JSON object must be a single line in the response): + +```json +{ + "metaData": { + "version": 20, + "size": 123456, + "numFiles": 5, + "deltaMetadata": { + "partitionColumns": [ + "date" + ], + "format": { + "provider": "parquet" + }, + "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"eventTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}", + "id": "f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2", + "configuration": { + "enableChangeDataFeed": "true" + } + } + } +} +``` + +### File in Delta Format + +A wrapper of a delta file action, which can be [Add File and Remove File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file), +or [Add CDC File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file) + +Field Name | Data Type | Description | Optional/Required +-|-|-|- +id | String | A unique string for the file in a table. The same file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. | Required +deletionVectorFileId | String | A unique string for the deletion vector file in a table. The same deletion vector file is guaranteed to have the same id across multiple requests. A client may cache the file content and use this id as a key to decide whether to use the cached file content. | Optional +version | Long | The table version of the file, returned when querying a table data with a version or timestamp parameter. | Optional +timestamp | Long | The unix timestamp corresponding to the table version of the file, in milliseconds, returned when querying a table data with a version or timestamp parameter. | Optional +expirationTimestamp | Long | The unix timestamp corresponding to the expiration of the url, in milliseconds, returned when the server supports the feature. | Optional +deltaSingleAction | Delta SingleAction | Need to be parsed by a delta library as a delta single action, the path field is replaced by pr-signed url. | Required + +Example (for illustration purposes; each JSON object must be a single line in the response): + +```json +{ + "file": { + "id": "591723a8-6a27-4240-a90e-57426f4736d2", + "size": 573, + "expirationTimestamp": 1652140800000, + "deltaSingleAction": { + "add": { + "path": "https://.s3.us-west-2.amazonaws.com/delta-exchange-test/table2/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?...", + "partitionValues": { + "date": "2021-04-28" + }, + "stats": "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}" + } + } + } +} +``` + ## SQL Expressions for Filtering The client may send a sequence of predicates to the server as a hint to request fewer files when it only wishes to query a subset of the data (e.g., data where the `country` field is `US`). The server may try its best to filter files based on the predicates. This is **BEST EFFORT**, so the server may return files that don’t satisfy the predicates. For example, if the server fails to parse a SQL expression, the server can skip it. Hence, the client should always apply predicates to filter the data returned by the server. @@ -2824,7 +3051,9 @@ ValueType | Description "long" | Represents a Long type. "string" | Represents a String type. "date" | Represents a Date type in "yyyy-mm-dd" format. - +"float" | Represents a Float type. +"double" | Represents a Double type. +"timestamp" | Represents a timestamp in [Timestamp Format](#timestamp-format). Examples @@ -2880,6 +3109,25 @@ Examples } ``` +## Delta Sharing Streaming Specs +Delta Sharing Streaming is supported starting from delta-sharing-spark 0.6.0. As it's implemented +based on spark structured streaming, it leverages a pull model to consume the new data of the shared +table from the delta sharing server. In addition to most options supported in delta streaming, +there are two options/spark configs for delta sharing streaming. + +- spark config **spark.delta.sharing.streaming.queryTableVersionIntervalSeconds**: DeltaSharingSource +leverages [getTableVersion](#query-table-version) rpc to check whether there is new data available +to consume. In order to reduce the traffic burden to the delta sharing server, there's a minimum 30 +seconds interval between two getTableVersion rpcs to the delta sharing server. Though, if you are ok +with less freshness on the data and want to reduce the traffic to the server, you can set this +config to a larger number, for example: 60s or 120s. An error will be thrown if it's set less than 30 seconds. +- option **maxVersionsPerRpc**: DeltaSharingSource leverages [QueryTable](#read-data-from-a-table) +rpc to continuously read new data from the delta sharing server. There might be too much +new data to be returned from the server if the streaming has paused for a while on the recipient +side. Its default value is 100, a smaller number is recommended such as `.option("maxVersionsPerRpc", 10)` +to reduce the traffic load for each rpc. This shouldn't affect the freshness of the data significantly +assuming the process time of the delta sharing server grows linearly with `maxVersionsPerRpc`. + # Profile File Format diff --git a/protocol/delta-sharing-protocol-api.yml b/protocol/delta-sharing-protocol-api.yml index 8cd1e94e8..3e6c55954 100644 --- a/protocol/delta-sharing-protocol-api.yml +++ b/protocol/delta-sharing-protocol-api.yml @@ -521,6 +521,12 @@ paths: description: 'If set to true, return the historical metadata if seen in the delta log. This is for the streaming client to check if the table schema is still read compatible.' schema: type: boolean + - in: header + name: delta-sharing-capabilities + required: false + description: 'Delta Sharing Capabilities' + schema: + type: string responses: '400': $ref: "#/components/responses/400" @@ -575,11 +581,14 @@ components: not Unary Represents a logical not check. This op should have once child. The supported value types: ValueType Description - "bool" Represents an Boolean type. - "int" Represents an Integer type. - "long" Represents a Long type. - "string" Represents a String type. - "date" Represents a Date type in "yyyy-mm-dd" format. + "bool" Represents an Boolean type. + "int" Represents an Integer type. + "long" Represents a Long type. + "string" Represents a String type. + "date" Represents a Date type in "yyyy-mm-dd" format. + "float" Represents a Float type. + "double" Represents a Double type. + "timestamp" Represents a timestamp in ISO8601 format, in the UTC timezone. ListShareResponse: type: object @@ -765,27 +774,31 @@ components: properties: protocol: # it refers to ./delta-sharing-protocol.md#protocol - $ref: '#/components/schemas/ProtocolObject' + $ref: '#/components/schemas/ParquetProtocolObject' metadata: # it refers to ./delta-sharing-protocol.md#metadata - $ref: '#/components/schemas/MetadataObject' + $ref: '#/components/schemas/ParquetMetadataObject' # This is not used for the spec but comes handy for autogeneration TableQueryResponseObject: + oneOf: + - $ref: '#/components/schemas/ParquetTableQueryResponseObject' + - $ref: '#/components/schemas/DeltaTableQueryResponseObject' + ParquetTableQueryResponseObject: type: object properties: protocol: # it refers to ./delta-sharing-protocol.md#protocol - $ref: '#/components/schemas/ProtocolObject' + $ref: '#/components/schemas/ParquetProtocolObject' metadata: # it refers to ./delta-sharing-protocol.md#metadata - $ref: '#/components/schemas/MetadataObject' + $ref: '#/components/schemas/ParquetMetadataObject' files: type: array items: # it refers to ./delta-sharing-protocol.md#file - $ref: '#/components/schemas/FileObject' - FileObject: + $ref: '#/components/schemas/ParquetFileObject' + ParquetFileObject: type: object properties: file: @@ -819,7 +832,7 @@ components: - id - partitionValues - size - ProtocolObject: + ParquetProtocolObject: type: object properties: protocol: @@ -828,15 +841,14 @@ components: minReaderVersion: type: integer format: int32 - FormatObject: + ParquetFormatObject: type: object properties: provider: type: string required: - provider - - MetadataObject: + ParquetMetadataObject: type: object properties: metaData: @@ -849,7 +861,7 @@ components: description: type: string format: - $ref: '#/components/schemas/FormatObject' + $ref: '#/components/schemas/ParquetFormatObject' schemaString: type: string partitionColumns: @@ -875,7 +887,240 @@ components: - format - schemaString - partitionColumns - + DeltaTableQueryResponseObject: + type: object + properties: + protocol: + # it refers to ./delta-sharing-protocol.md#protocol + $ref: '#/components/schemas/DeltaProtocolObject' + metadata: + # it refers to ./delta-sharing-protocol.md#metadata + $ref: '#/components/schemas/DeltaMetadataObject' + files: + type: array + items: + # it refers to ./delta-sharing-protocol.md#file + $ref: '#/components/schemas/DeltaFileObject' + DeltaProtocolObject: + type: object + properties: + protocol: + type: object + properties: + deltaProtocol: + type: object + properties: + minReaderVersion: + type: integer + format: int32 + minWriterVersion: + type: integer + format: int32 + DeltaFormatObject: + type: object + properties: + provider: + type: string + options: + type: object + additionalProperties: + type: string + required: + - provider + DeltaMetadata: + type: object + description: see https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata + required: + - id + - format + - schemaString + - partitionColumns + - configuration + properties: + id: + type: string + name: + type: string + description: + type: string + format: + $ref: '#/components/schemas/DeltaFormatObject' + schemaString: + type: string + partitionColumns: + type: array + items: + type: string + createdTime: + type: integer + format: int64 + configuration: + type: object + additionalProperties: + type: string + DeltaMetadataObject: + type: object + properties: + metaData: + type: object + properties: + version: + type: integer + format: int64 + size: + type: integer + format: int64 + numFiles: + type: integer + format: int64 + deltaMetadata: + $ref: '#/components/schemas/DeltaMetadata' + required: [ deltaMetadata ] + DeltaFileObject: + required: + - id + - deltaSingleAction + properties: + id: + type: string + deletionVectorFileId: + type: string + version: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + deltaSingleAction: + $ref: '#/components/schemas/DeltaSingleAction' + DeltaSingleAction: + type: object + description: only one field can be not null, container of delta actions such as file, add, cdf or remove see https://github.com/delta-io/delta/tree/master/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions + properties: + add: + $ref: '#/components/schemas/DeltaAddFileForCDFAction' + cdf: + $ref: '#/components/schemas/DeltaAddCDCFileAction' + file: + $ref: '#/components/schemas/DeltaAddFileAction' + remove: + $ref: '#/components/schemas/DeltaRemoveFileAction' + DeltaAddFileForCDFAction: + type: object + description: see io.delta.sharing.server.model.AddFileForCDF + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + version: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + stats: + type: string + DeltaAddCDCFileAction: + type: object + description: see io.delta.sharing.server.model.AddCDCFile + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + version: + type: integer + format: int64 + DeltaAddFileAction: + type: object + description: see io.delta.sharing.server.model.AddFile + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + stats: + type: string + expirationTimestamp: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + version: + type: integer + format: int64 + DeltaRemoveFileAction: + type: object + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + version: + type: integer + format: int64 + DeltaEndStreamAction: + description: An action that is returned as the last line of the streaming response. It allows the server to include additional data that might be dynamically generated while the streaming message is sent + type: object + properties: + refreshToken: + type: string + description: a token used to refresh pre-signed urls for a long running query + nextPageToken: + type: string + description: a token used to retrieve the subsequent page of a query + minUrlExpirationTimestamp: + description: the minimum url expiration timestamp of the urls returned in current response + type: integer + format: int64 responses: "400": description: The request is malformed From c58980a23a36b7800af56f023f7b4d1929082688 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Tue, 30 Jan 2024 16:55:21 +0100 Subject: [PATCH 2/5] Update the protocol and manually generate TableQuery/Metadata response for parquet still to do: do the same for Delta ones --- protocol/delta-sharing-protocol-api.yml | 119 ---------------- .../api/deltasharing/DeltaMappers.java | 68 ++++----- .../api/deltasharing/model/v1/Format.java | 10 ++ .../model/v1/TableMetadataResponse.java | 13 ++ .../model/v1/TableQueryResponse.java | 17 +++ .../model/v1/delta/DeltaProtocol.java | 45 ++++++ .../model/v1/parquet/ParquetAddFile.java | 85 ++++++++++++ .../model/v1/parquet/ParquetCDFFile.java | 76 ++++++++++ .../model/v1/parquet/ParquetFile.java | 89 ++++++++++++ .../model/v1/parquet/ParquetMetadata.java | 94 +++++++++++++ .../model/v1/parquet/ParquetProtocol.java | 28 ++++ .../model/v1/parquet/ParquetRemoveFile.java | 74 ++++++++++ .../serializers/TableMetadataSerializer.java | 10 +- .../TableQueryResponseSerializer.java | 14 +- .../server/DeltaSharesApiImpl.java | 3 +- .../whitefox/api/server/WhitefoxMappers.java | 42 +++--- .../api/deltasharing/SampleTables.java | 131 ++++++++++-------- .../model/v1/FormatSerializationTest.java | 24 ++++ .../delta/DeltaProtocolSerializationTest.java | 26 ++++ .../ParquetAddFileSerializationTest.java | 47 +++++++ .../ParquetCDFFileSerializationTest.java | 45 ++++++ .../parquet/ParquetFileSerializationTest.java | 44 ++++++ .../ParquetMetadataSerializationTest.java | 92 ++++++++++++ .../ParquetProtocolSerializationTest.java | 25 ++++ .../ParquetRemoveFileSerializationTest.java | 46 ++++++ .../server/DeltaSharesApiImplAwsTest.java | 41 +++--- .../server/DeltaSharesApiImplTest.java | 60 ++++---- 27 files changed, 1092 insertions(+), 276 deletions(-) create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/Format.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/TableMetadataResponse.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/TableQueryResponse.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaProtocol.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetAddFile.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetCDFFile.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetFile.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetMetadata.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetProtocol.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetRemoveFile.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/FormatSerializationTest.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaProtocolSerializationTest.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetAddFileSerializationTest.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetCDFFileSerializationTest.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetFileSerializationTest.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetMetadataSerializationTest.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetProtocolSerializationTest.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetRemoveFileSerializationTest.java diff --git a/protocol/delta-sharing-protocol-api.yml b/protocol/delta-sharing-protocol-api.yml index 3e6c55954..b88fc043a 100644 --- a/protocol/delta-sharing-protocol-api.yml +++ b/protocol/delta-sharing-protocol-api.yml @@ -768,125 +768,6 @@ components: message: type: string - # This is not used for the spec but comes handy for autogeneration - TableMetadataResponseObject: - type: object - properties: - protocol: - # it refers to ./delta-sharing-protocol.md#protocol - $ref: '#/components/schemas/ParquetProtocolObject' - metadata: - # it refers to ./delta-sharing-protocol.md#metadata - $ref: '#/components/schemas/ParquetMetadataObject' - - # This is not used for the spec but comes handy for autogeneration - TableQueryResponseObject: - oneOf: - - $ref: '#/components/schemas/ParquetTableQueryResponseObject' - - $ref: '#/components/schemas/DeltaTableQueryResponseObject' - ParquetTableQueryResponseObject: - type: object - properties: - protocol: - # it refers to ./delta-sharing-protocol.md#protocol - $ref: '#/components/schemas/ParquetProtocolObject' - metadata: - # it refers to ./delta-sharing-protocol.md#metadata - $ref: '#/components/schemas/ParquetMetadataObject' - files: - type: array - items: - # it refers to ./delta-sharing-protocol.md#file - $ref: '#/components/schemas/ParquetFileObject' - ParquetFileObject: - type: object - properties: - file: - type: object - properties: - url: - type: string - id: - type: string - partitionValues: - type: object - additionalProperties: - type: - string - size: - type: integer - format: int64 - stats: - type: string - version: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - expirationTimestamp: - type: integer - format: int64 - required: - - url - - id - - partitionValues - - size - ParquetProtocolObject: - type: object - properties: - protocol: - type: object - properties: - minReaderVersion: - type: integer - format: int32 - ParquetFormatObject: - type: object - properties: - provider: - type: string - required: - - provider - ParquetMetadataObject: - type: object - properties: - metaData: - type: object - properties: - id: - type: string - name: - type: string - description: - type: string - format: - $ref: '#/components/schemas/ParquetFormatObject' - schemaString: - type: string - partitionColumns: - type: array - items: - type: string - configuration: - type: object - additionalProperties: - type: - string - version: - type: integer - format: int64 - size: - type: integer - format: int64 - numFiles: - type: integer - format: int64 - required: - - id - - format - - schemaString - - partitionColumns DeltaTableQueryResponseObject: type: object properties: diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java b/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java index 47503f78e..406c65ea4 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java @@ -1,7 +1,13 @@ package io.whitefox.api.deltasharing; +import io.whitefox.api.deltasharing.model.v1.TableMetadataResponse; +import io.whitefox.api.deltasharing.model.v1.TableQueryResponse; import io.whitefox.api.deltasharing.model.v1.generated.*; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetFile; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol; import io.whitefox.api.server.CommonMappers; +import io.whitefox.api.server.WhitefoxMappers; import io.whitefox.core.*; import io.whitefox.core.Schema; import io.whitefox.core.Share; @@ -53,46 +59,46 @@ public static ReadTableRequest api2ReadTableRequest(QueryRequest request) { } } - public static TableQueryResponseObject readTableResult2api(ReadTableResult readTableResult) { - return new TableQueryResponseObject() - .metadata(metadata2Api(readTableResult.metadata())) - .protocol(protocol2Api(readTableResult.protocol())) - .files(readTableResult.files().stream() - .map(DeltaMappers::file2Api) - .collect(Collectors.toList())); + public static TableQueryResponse readTableResult2api(ReadTableResult readTableResult) { + return new TableQueryResponse( + protocol2Api(readTableResult.protocol()), + metadata2Api(readTableResult.metadata()), + readTableResult.files().stream().map(DeltaMappers::file2Api).collect(Collectors.toList())); } - private static MetadataObject metadata2Api(Metadata metadata) { - return new MetadataObject() - .metaData(new MetadataObjectMetaData() + private static ParquetMetadata metadata2Api(Metadata metadata) { + return ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() .id(metadata.id()) - .name(metadata.name().orElse(null)) - .description(metadata.description().orElse(null)) - .format(new FormatObject().provider(metadata.format().provider())) + .name(metadata.name()) + .description(metadata.description()) + .format(WhitefoxMappers.format2api(metadata.format())) .schemaString(metadata.tableSchema().structType().toJson()) .partitionColumns(metadata.partitionColumns()) - ._configuration(metadata.configuration()) - .version(metadata.version().orElse(null)) - .numFiles(metadata.numFiles().orElse(null))); + .configuration(Optional.of(metadata.configuration())) + .version(metadata.version()) + .numFiles(metadata.numFiles()) + .build()) + .build(); } - private static ProtocolObject protocol2Api(Protocol protocol) { - return new ProtocolObject() - .protocol(new ProtocolObjectProtocol() - .minReaderVersion(protocol.minReaderVersion().orElse(1))); + private static ParquetProtocol protocol2Api(Protocol protocol) { + return ParquetProtocol.ofMinReaderVersion(protocol.minReaderVersion().orElse(1)); } - private static FileObject file2Api(TableFile f) { - return new FileObject() - ._file(new FileObjectFile() + private static ParquetFile file2Api(TableFile f) { + return ParquetFile.builder() + .file(ParquetFile.File.builder() .id(f.id()) .url(f.url()) .partitionValues(f.partitionValues()) .size(f.size()) - .stats(f.stats().orElse(null)) - .version(f.version().orElse(null)) - .timestamp(f.timestamp().orElse(null)) - .expirationTimestamp(f.expirationTimestamp())); + .stats(f.stats()) + .version(f.version()) + .timestamp(f.timestamp()) + .expirationTimestamp(Optional.of(f.expirationTimestamp())) + .build()) + .build(); } public static TableReferenceAndReadRequest api2TableReferenceAndReadRequest( @@ -127,9 +133,9 @@ public static Map toHeaderCapabilitiesMap(String headerCapabilit .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - public static TableMetadataResponseObject toTableResponseMetadata(Metadata m) { - return new TableMetadataResponseObject() - .protocol(new ProtocolObject().protocol(new ProtocolObjectProtocol().minReaderVersion(1))) - .metadata(metadata2Api(m)); + public static TableMetadataResponse toTableResponseMetadata(Metadata m) { + return new TableMetadataResponse( + ParquetProtocol.ofMinReaderVersion(1), // smell + metadata2Api(m)); } } diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/Format.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/Format.java new file mode 100644 index 000000000..c83dcf540 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/Format.java @@ -0,0 +1,10 @@ +package io.whitefox.api.deltasharing.model.v1; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Value; + +@Value +public class Format { + @JsonProperty + String provider = "parquet"; +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/TableMetadataResponse.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/TableMetadataResponse.java new file mode 100644 index 000000000..7b99bb625 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/TableMetadataResponse.java @@ -0,0 +1,13 @@ +package io.whitefox.api.deltasharing.model.v1; + +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol; +import lombok.NonNull; +import lombok.Value; + +@Value +public class TableMetadataResponse { + @NonNull ParquetProtocol protocol; + + @NonNull ParquetMetadata metadata; +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/TableQueryResponse.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/TableQueryResponse.java new file mode 100644 index 000000000..52f7311af --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/TableQueryResponse.java @@ -0,0 +1,17 @@ +package io.whitefox.api.deltasharing.model.v1; + +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetFile; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol; +import java.util.List; +import lombok.NonNull; +import lombok.Value; + +@Value +public class TableQueryResponse { + @NonNull ParquetProtocol protocol; + + @NonNull ParquetMetadata metadata; + + @NonNull List files; +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaProtocol.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaProtocol.java new file mode 100644 index 000000000..e9cfcd875 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaProtocol.java @@ -0,0 +1,45 @@ +package io.whitefox.api.deltasharing.model.v1.delta; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +@SuperBuilder +@Jacksonized +@Value +public class DeltaProtocol { + + @JsonProperty + Protocol protocol; + + @SuperBuilder + @Jacksonized + @Value + private static class Protocol { + @JsonProperty + InternalDeltaProtocol deltaProtocol; + } + + @SuperBuilder + @Jacksonized + @Value + private static class InternalDeltaProtocol { + @JsonProperty + int minReaderVersion; + + @JsonProperty + int minWriterVersion; + } + + public static DeltaProtocol of(int minReaderVersion, int maxReaderVersion) { + return DeltaProtocol.builder() + .protocol(Protocol.builder() + .deltaProtocol(InternalDeltaProtocol.builder() + .minReaderVersion(minReaderVersion) + .minWriterVersion(maxReaderVersion) + .build()) + .build()) + .build(); + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetAddFile.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetAddFile.java new file mode 100644 index 000000000..8c2099347 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetAddFile.java @@ -0,0 +1,85 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import java.util.Optional; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +@Value +@SuperBuilder +@Jacksonized +public class ParquetAddFile { + + @JsonProperty + @NonNull Add add; + + @Value + @Builder + @Jacksonized + @JsonInclude(JsonInclude.Include.NON_ABSENT) + public static class Add { + /** + * A https url that a client can use to read the file directly. + * The same file in different responses may have different urls. + */ + @JsonProperty + @NonNull String url; + + /** + * A unique string for the file in a table. + * The same file is guaranteed to have the same id across multiple requests. + * A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + @JsonProperty + @NonNull String id; + + /** + * A map from partition column to value for this file. + * When the table doesn’t have partition columns, this will be an empty map. + * See Partition Value Serialization for how to parse the partition values. + */ + @JsonProperty + @NonNull Map partitionValues; + + /** + * The size of this file in bytes. + */ + @JsonProperty + long size; + + /** + * The timestamp of the file in milliseconds from epoch. + */ + @JsonProperty + long timestamp; + + /** + * The table version of this file. + */ + @JsonProperty + int version; + + /** + * Contains statistics (e.g., count, min/max values for columns) about the data in this file. + * This field may be missing. A file may or may not have stats. + * This is a serialized JSON string which can be deserialized to a Statistics Struct. + * A client can decide whether to use stats or drop it. + */ + @JsonProperty + @Builder.Default + Optional stats = Optional.empty(); + + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, + * returned when the server supports the feature. + */ + @JsonProperty + @Builder.Default + Optional expirationTimestamp = Optional.empty(); + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetCDFFile.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetCDFFile.java new file mode 100644 index 000000000..adf0acd84 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetCDFFile.java @@ -0,0 +1,76 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import java.util.Optional; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +@Value +@SuperBuilder +@Jacksonized +public class ParquetCDFFile { + + @JsonProperty + @NonNull CDF cdf; + + @Value + @Builder + @Jacksonized + @JsonInclude(JsonInclude.Include.NON_ABSENT) + public static class CDF { + + /** + * A https url that a client can use to read the file directly. + * The same file in different responses may have different urls. + */ + @JsonProperty + @NonNull String url; + + /** + * A unique string for the file in a table. + * The same file is guaranteed to have the same id across multiple requests. + * A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + @JsonProperty + @NonNull String id; + + /** + * A map from partition column to value for this file. + * When the table doesn’t have partition columns, this will be an empty map. + * See Partition Value Serialization for how to parse the partition values. + */ + @JsonProperty + @NonNull Map partitionValues; + + /** + * The size of this file in bytes. + */ + @JsonProperty + long size; + + /** + * The timestamp of the file in milliseconds from epoch. + */ + @JsonProperty + long timestamp; + + /** + * The table version of this file. + */ + @JsonProperty + int version; + + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, + * returned when the server supports the feature. + */ + @JsonProperty + @Builder.Default + Optional expirationTimestamp = Optional.empty(); + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetFile.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetFile.java new file mode 100644 index 000000000..92ff8a6df --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetFile.java @@ -0,0 +1,89 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import java.util.Optional; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +@Value +@SuperBuilder +@Jacksonized +public class ParquetFile { + + @JsonProperty + @NonNull ParquetFile.File file; + + @Value + @Builder + @Jacksonized + @JsonInclude(JsonInclude.Include.NON_ABSENT) + public static class File { + + /** + * A https url that a client can use to read the file directly. + * The same file in different responses may have different urls. + */ + @JsonProperty + @NonNull String url; + + /** + * A unique string for the file in a table. + * The same file is guaranteed to have the same id across multiple requests. + * A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + @JsonProperty + @NonNull String id; + + /** + * A map from partition column to value for this file. + * When the table doesn’t have partition columns, this will be an empty map. + * See Partition Value Serialization for how to parse the partition values. + */ + @JsonProperty + @NonNull Map partitionValues; + + /** + * The size of this file in bytes. + */ + @JsonProperty + long size; + + /** + * Contains statistics (e.g., count, min/max values for columns) about the data in this file. + * This field may be missing. A file may or may not have stats. + * This is a serialized JSON string which can be deserialized to a Statistics Struct. + * A client can decide whether to use stats or drop it. + */ + @JsonProperty + @Builder.Default + Optional stats = Optional.empty(); + + /** + * The table version of the file, returned when querying a table data with a version or timestamp parameter. + */ + @JsonProperty + @Builder.Default + Optional version = Optional.empty(); + + /** + * The unix timestamp corresponding to the table version of the file, in milliseconds, + * returned when querying a table data with a version or timestamp parameter. + */ + @JsonProperty + @Builder.Default + Optional timestamp = Optional.empty(); + + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, + * returned when the server supports the feature. + */ + @JsonProperty + @Builder.Default + Optional expirationTimestamp = Optional.empty(); + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetMetadata.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetMetadata.java new file mode 100644 index 000000000..2112f3f43 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetMetadata.java @@ -0,0 +1,94 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.whitefox.api.deltasharing.model.v1.Format; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +@Value +@SuperBuilder +@Jacksonized +public class ParquetMetadata { + @JsonProperty(value = "metaData") + @NonNull Metadata metadata; + + @Value + @SuperBuilder + @Jacksonized + @JsonInclude(JsonInclude.Include.NON_ABSENT) + public static class Metadata { + + /** + * Unique identifier for this table + */ + @JsonProperty + @NonNull String id; + + /** + * User-provided identifier for this table + */ + @JsonProperty + @Builder.Default + Optional name = Optional.empty(); + + /** + * User-provided description for this table + */ + @JsonProperty + @Builder.Default + Optional description = Optional.empty(); + + /** + * Specification of the encoding for the files stored in the table. + */ + @JsonProperty + @NonNull Format format; + + /** + * Schema of the table. This is a serialized JSON string which can be deserialized to a Schema Object. + */ + @JsonProperty + @NonNull String schemaString; + + /** + * An array containing the names of columns by which the data should be partitioned. When a table doesn’t have partition columns, this will be an empty array. + */ + @JsonProperty + @NonNull List partitionColumns; + + /** + * A map containing configuration options for the table + */ + @JsonProperty + @Builder.Default + Optional> configuration = Optional.empty(); + + /** + * The table version the metadata corresponds to, returned when querying table data with a version or timestamp parameter, or cdf query with includeHistoricalMetadata set to true. + */ + @JsonProperty + @Builder.Default + Optional version = Optional.empty(); + + /** + * The size of the table in bytes, will be returned if available in the delta log. + */ + @JsonProperty + @Builder.Default + Optional size = Optional.empty(); + + /** + * The number of files in the table, will be returned if available in the delta log. + */ + @JsonProperty + @Builder.Default + Optional numFiles = Optional.empty(); + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetProtocol.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetProtocol.java new file mode 100644 index 000000000..00ee8de72 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetProtocol.java @@ -0,0 +1,28 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +@SuperBuilder +@Jacksonized +@Value +public class ParquetProtocol { + @JsonProperty + Protocol protocol; + + @SuperBuilder + @Jacksonized + @Value + private static class Protocol { + @JsonProperty + int minReaderVersion; + } + + public static ParquetProtocol ofMinReaderVersion(int minReaderVersion) { + return ParquetProtocol.builder() + .protocol(Protocol.builder().minReaderVersion(minReaderVersion).build()) + .build(); + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetRemoveFile.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetRemoveFile.java new file mode 100644 index 000000000..4b1d28720 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetRemoveFile.java @@ -0,0 +1,74 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import java.util.Optional; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +@Value +@SuperBuilder +@Jacksonized +public class ParquetRemoveFile { + @JsonProperty + @NonNull Remove remove; + + @Value + @Builder + @Jacksonized + @JsonInclude(JsonInclude.Include.NON_ABSENT) + public static class Remove { + /** + * A https url that a client can use to read the file directly. + * The same file in different responses may have different urls. + */ + @JsonProperty + @NonNull String url; + + /** + * A unique string for the file in a table. + * The same file is guaranteed to have the same id across multiple requests. + * A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + @JsonProperty + @NonNull String id; + + /** + * A map from partition column to value for this file. + * When the table doesn’t have partition columns, this will be an empty map. + * See Partition Value Serialization for how to parse the partition values. + */ + @JsonProperty + @NonNull Map partitionValues; + + /** + * The size of this file in bytes. + */ + @JsonProperty + long size; + + /** + * The timestamp of the file in milliseconds from epoch. + */ + @JsonProperty + long timestamp; + + /** + * The table version of this file. + */ + @JsonProperty + int version; + + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, + * returned when the server supports the feature. + */ + @JsonProperty + @Builder.Default + Optional expirationTimestamp = Optional.empty(); + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/serializers/TableMetadataSerializer.java b/server/app/src/main/java/io/whitefox/api/deltasharing/serializers/TableMetadataSerializer.java index dcbd24903..7eb470fc9 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/serializers/TableMetadataSerializer.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/serializers/TableMetadataSerializer.java @@ -3,12 +3,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; -import io.whitefox.api.deltasharing.model.v1.generated.TableMetadataResponseObject; +import io.whitefox.api.deltasharing.model.v1.TableMetadataResponse; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @ApplicationScoped -public class TableMetadataSerializer implements Serializer { +public class TableMetadataSerializer implements Serializer { private final ObjectWriter objectWriter; private static final String LINE_FEED = "\n"; @@ -18,12 +18,12 @@ public TableMetadataSerializer(ObjectMapper objectMapper) { } @Override - public String serialize(TableMetadataResponseObject data) { + public String serialize(TableMetadataResponse data) { StringBuilder stringBuilder = new StringBuilder(); try { - stringBuilder.append(objectWriter.writeValueAsString(data.getProtocol())); + stringBuilder.append(objectWriter.writeValueAsString(data.protocol())); stringBuilder.append(LINE_FEED); - stringBuilder.append(objectWriter.writeValueAsString(data.getMetadata())); + stringBuilder.append(objectWriter.writeValueAsString(data.metadata())); return stringBuilder.toString(); } catch (JsonProcessingException e) { throw new RuntimeException(e); diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/serializers/TableQueryResponseSerializer.java b/server/app/src/main/java/io/whitefox/api/deltasharing/serializers/TableQueryResponseSerializer.java index 5fa6c74de..961a06c17 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/serializers/TableQueryResponseSerializer.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/serializers/TableQueryResponseSerializer.java @@ -3,13 +3,13 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; -import io.whitefox.api.deltasharing.model.v1.generated.FileObject; -import io.whitefox.api.deltasharing.model.v1.generated.TableQueryResponseObject; +import io.whitefox.api.deltasharing.model.v1.TableQueryResponse; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetFile; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @ApplicationScoped -public class TableQueryResponseSerializer implements Serializer { +public class TableQueryResponseSerializer implements Serializer { private final ObjectWriter objectWriter; private static final String LINE_FEED = "\n"; @@ -19,13 +19,13 @@ public TableQueryResponseSerializer(ObjectMapper objectMapper) { } @Override - public String serialize(TableQueryResponseObject data) { + public String serialize(TableQueryResponse data) { StringBuilder stringBuilder = new StringBuilder(); try { - stringBuilder.append(objectWriter.writeValueAsString(data.getProtocol())); + stringBuilder.append(objectWriter.writeValueAsString(data.protocol())); stringBuilder.append(LINE_FEED); - stringBuilder.append(objectWriter.writeValueAsString(data.getMetadata())); - for (FileObject line : data.getFiles()) { + stringBuilder.append(objectWriter.writeValueAsString(data.metadata())); + for (ParquetFile line : data.files()) { stringBuilder.append(LINE_FEED); stringBuilder.append(objectWriter.writeValueAsString(line)); } diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java index 31f77c4a1..e037bab94 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java @@ -60,7 +60,8 @@ public Response getTableChanges( Integer startingVersion, Integer endingVersion, String endingTimestamp, - Boolean includeHistoricalMetadata) { + Boolean includeHistoricalMetadata, + String deltaSharingCapabilities) { return Response.ok().build(); } diff --git a/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java b/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java index ac5dd4d36..08a656cb3 100644 --- a/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java +++ b/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java @@ -1,16 +1,15 @@ package io.whitefox.api.server; -import io.whitefox.api.deltasharing.model.v1.generated.*; +import io.whitefox.api.deltasharing.model.v1.Format; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol; import io.whitefox.api.model.v1.generated.*; import io.whitefox.core.*; import io.whitefox.core.Metastore; import io.whitefox.core.MetastoreProperties; -import io.whitefox.core.MetastoreType; import io.whitefox.core.Provider; -import io.whitefox.core.Share; import io.whitefox.core.Storage; import io.whitefox.core.StorageProperties; -import io.whitefox.core.StorageType; import io.whitefox.core.actions.*; import io.whitefox.core.actions.CreateMetastore; import io.whitefox.core.actions.CreateStorage; @@ -173,24 +172,33 @@ public static MetastoreType api2MetastoreType( } } - private static MetadataObject metadata2Api(Metadata metadata) { - return new MetadataObject() - .metaData(new MetadataObjectMetaData() + private static ParquetMetadata metadata2Api(Metadata metadata) { + return ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() .id(metadata.id()) - .name(metadata.name().orElse(null)) - .description(metadata.description().orElse(null)) - .format(new FormatObject().provider(metadata.format().provider())) + .name(metadata.name()) + .description(metadata.description()) + .format(format2api(metadata.format())) .schemaString(metadata.tableSchema().structType().toJson()) .partitionColumns(metadata.partitionColumns()) - ._configuration(metadata.configuration()) - .version(metadata.version().orElse(null)) - .numFiles(metadata.numFiles().orElse(null))); + .configuration(Optional.ofNullable(metadata.configuration())) + .version(metadata.version()) + .numFiles(metadata.numFiles()) + .build()) + .build(); } - private static ProtocolObject protocol2Api(Protocol protocol) { - return new ProtocolObject() - .protocol(new ProtocolObjectProtocol() - .minReaderVersion(protocol.minReaderVersion().orElse(1))); + public static Format format2api(Metadata.Format format) { + switch (format) { + case PARQUET: + return new Format(); + } + // never gonna happen, java is dumb + return null; + } + + private static ParquetProtocol protocol2Api(Protocol protocol) { + return ParquetProtocol.ofMinReaderVersion(protocol.minReaderVersion().orElse(1)); } public static CreateProvider api2CreateProvider( diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java b/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java index a7a069505..2312aeb41 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java @@ -8,7 +8,10 @@ import io.whitefox.S3TestConfig; import io.whitefox.api.deltasharing.model.FileObjectFileWithoutPresignedUrl; import io.whitefox.api.deltasharing.model.FileObjectWithoutPresignedUrl; -import io.whitefox.api.deltasharing.model.v1.generated.*; +import io.whitefox.api.deltasharing.model.v1.Format; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetFile; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol; import io.whitefox.core.InternalTable; import io.whitefox.core.Principal; import io.whitefox.core.SharedTable; @@ -16,6 +19,7 @@ import io.whitefox.persistence.memory.InMemoryStorageManager; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; public class SampleTables { @@ -69,46 +73,50 @@ public static StorageManager createStorageManager() { 0L))); } - public static final MetadataObject deltaTable1Metadata = new MetadataObject() - .metaData(new MetadataObjectMetaData() + public static final ParquetMetadata deltaTable1Metadata = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() .id("56d48189-cdbc-44f2-9b0e-2bded4c79ed7") - .name("table1") - .format(new FormatObject().provider("parquet")) + .name(Optional.of("table1")) + .format(new Format()) .schemaString( "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}") .partitionColumns(List.of()) - .version(0L) - ._configuration(Map.of())); + .version(Optional.of(0L)) + .configuration(Optional.of(Map.of())) + .build()) + .build(); - public static final MetadataObject s3DeltaTable1Metadata = new MetadataObject() - .metaData(new MetadataObjectMetaData() + public static final ParquetMetadata s3DeltaTable1Metadata = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() .id("ed2297c4-8bb8-4c74-963d-8fed6bebfd8b") - .name("s3Table1") - .format(new FormatObject().provider("parquet")) + .name(Optional.of("s3Table1")) + .format(new Format()) .schemaString( "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}") .partitionColumns(List.of()) - .version(0L) - ._configuration(Map.of())); - public static final MetadataObject deltaTableWithHistory1Metadata = new MetadataObject() - .metaData(new MetadataObjectMetaData() + .version(Optional.of(0L)) + .configuration(Optional.of(Map.of())) + .build()) + .build(); + public static final ParquetMetadata deltaTableWithHistory1Metadata = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() .id("56d48189-cdbc-44f2-9b0e-2bded4c79ed7") - .name("table-with-history") - .format(new FormatObject().provider("parquet")) + .name(Optional.of("table-with-history")) + .format(new Format()) .schemaString( "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}") .partitionColumns(List.of()) - .version(0L) - ._configuration(Map.of())); - public static final ProtocolObject deltaTable1Protocol = - new ProtocolObject().protocol(new ProtocolObjectProtocol().minReaderVersion(1)); + .version(Optional.of(0L)) + .configuration(Optional.of(Map.of())) + .build()) + .build(); + public static final ParquetProtocol deltaTable1Protocol = ParquetProtocol.ofMinReaderVersion(1); - public static final ProtocolObject s3DeltaTable1Protocol = - new ProtocolObject().protocol(new ProtocolObjectProtocol().minReaderVersion(1)); + public static final ParquetProtocol s3DeltaTable1Protocol = ParquetProtocol.ofMinReaderVersion(1); - public static final Set deltaTable1Files = Set.of( - new FileObject() - ._file(new FileObjectFile() + public static final Set deltaTable1Files = Set.of( + ParquetFile.builder() + .file(ParquetFile.File.builder() .url(deltaTable1Path + "part-00003-049d1c60-7ad6-45a3-af3f-65ffcabcc974-c000.snappy.parquet") .id(deltaTable1Path @@ -116,12 +124,15 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(478L) .stats( - "{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}") - .version(0L) - .timestamp(1695976443161L) - .expirationTimestamp(9223372036854775807L)), - new FileObject() - ._file(new FileObjectFile() + Optional.of( + "{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}")) + .version(Optional.of(0L)) + .timestamp(Optional.of(1695976443161L)) + .expirationTimestamp(Optional.of(9223372036854775807L)) + .build()) + .build(), + ParquetFile.builder() + .file(ParquetFile.File.builder() .url(deltaTable1Path + "part-00001-a67388a6-e20e-426e-a872-351c390779a5-c000.snappy.parquet") .id(deltaTable1Path @@ -129,12 +140,15 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(478L) .stats( - "{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}") - .version(0L) - .timestamp(1695976443161L) - .expirationTimestamp(9223372036854775807L)), - new FileObject() - ._file(new FileObjectFile() + Optional.of( + "{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}")) + .version(Optional.of(0L)) + .timestamp(Optional.of(1695976443161L)) + .expirationTimestamp(Optional.of(9223372036854775807L)) + .build()) + .build(), + ParquetFile.builder() + .file(ParquetFile.File.builder() .url(deltaTable1Path + "part-00007-3e861bbf-fe8b-44d0-bac7-712b8cf4608c-c000.snappy.parquet") .id(deltaTable1Path @@ -142,12 +156,15 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(478L) .stats( - "{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}") - .version(0L) - .timestamp(1695976443161L) - .expirationTimestamp(9223372036854775807L)), - new FileObject() - ._file(new FileObjectFile() + Optional.of( + "{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}")) + .version(Optional.of(0L)) + .timestamp(Optional.of(1695976443161L)) + .expirationTimestamp(Optional.of(9223372036854775807L)) + .build()) + .build(), + ParquetFile.builder() + .file(ParquetFile.File.builder() .url(deltaTable1Path + "part-00005-e7b9aad4-adf6-42ad-a17c-fbc93689b721-c000.snappy.parquet") .id(deltaTable1Path @@ -155,12 +172,15 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(478L) .stats( - "{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}") - .version(0L) - .timestamp(1695976443161L) - .expirationTimestamp(9223372036854775807L)), - new FileObject() - ._file(new FileObjectFile() + Optional.of( + "{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}")) + .version(Optional.of(0L)) + .timestamp(Optional.of(1695976443161L)) + .expirationTimestamp(Optional.of(9223372036854775807L)) + .build()) + .build(), + ParquetFile.builder() + .file(ParquetFile.File.builder() .url(deltaTable1Path + "part-00009-90280af8-7b24-4519-9e49-82db78a1651b-c000.snappy.parquet") .id(deltaTable1Path @@ -168,10 +188,13 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(478L) .stats( - "{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}") - .version(0L) - .timestamp(1695976443161L) - .expirationTimestamp(9223372036854775807L))); + Optional.of( + "{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}")) + .version(Optional.of(0L)) + .timestamp(Optional.of(1695976443161L)) + .expirationTimestamp(Optional.of(9223372036854775807L)) + .build()) + .build()); public static final Set s3DeltaTable1FilesWithoutPresignedUrl = Set.of( diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/FormatSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/FormatSerializationTest.java new file mode 100644 index 000000000..d0cd55f62 --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/FormatSerializationTest.java @@ -0,0 +1,24 @@ +package io.whitefox.api.deltasharing.model.v1; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class FormatSerializationTest { + ObjectMapper om = new ObjectMapper(); + + @Test + void serializationTest() throws JsonProcessingException { + var expected = "{\"provider\":\"parquet\"}"; + Assertions.assertEquals(expected, om.writer().writeValueAsString(new Format())); + } + + @Test + void deserializationTest() throws IOException { + var input = "{\"provider\":\"parquet\"}"; + var result = om.reader().readValue(input, Format.class); + Assertions.assertEquals(new Format(), result); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaProtocolSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaProtocolSerializationTest.java new file mode 100644 index 000000000..74359406b --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaProtocolSerializationTest.java @@ -0,0 +1,26 @@ +package io.whitefox.api.deltasharing.model.v1.delta; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DeltaProtocolSerializationTest { + ObjectMapper om = new ObjectMapper(); + + @Test + void serializationTest() throws JsonProcessingException { + var expected = + "{\"protocol\":{\"deltaProtocol\":{\"minReaderVersion\":3,\"minWriterVersion\":7}}}"; + Assertions.assertEquals(expected, om.writer().writeValueAsString(DeltaProtocol.of(3, 7))); + } + + @Test + void deserializationTest() throws IOException { + var input = + "{\"protocol\":{\"deltaProtocol\":{\"minReaderVersion\":3,\"minWriterVersion\":7}}}"; + var result = om.reader().readValue(input, DeltaProtocol.class); + Assertions.assertEquals(DeltaProtocol.of(3, 7), result); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetAddFileSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetAddFileSerializationTest.java new file mode 100644 index 000000000..dcfa20607 --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetAddFileSerializationTest.java @@ -0,0 +1,47 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ParquetAddFileSerializationTest { + String json = + "{\"add\":{\"url\":\"https://.s3.us-west-2.amazonaws.com/delta-exchange-test/table_cdf/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20210501T010655Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=AKIAISZRDL4Q4Q7AIONA%2F20210501%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Signature=dd5d3ba1a179dc7e239d257feed046dccc95000d1aa0479ea6ff36d10d90ec94\",\"id\":\"591723a8-6a27-4240-a90e-57426f4736d2\",\"partitionValues\":{\"date\":\"2021-04-28\"},\"size\":573,\"timestamp\":1652140800000,\"version\":1,\"stats\":\"{\\\"numRecords\\\":1,\\\"minValues\\\":{\\\"eventTime\\\":\\\"2021-04-28T23:33:48.719Z\\\"},\\\"maxValues\\\":{\\\"eventTime\\\":\\\"2021-04-28T23:33:48.719Z\\\"},\\\"nullCount\\\":{\\\"eventTime\\\":0}}\",\"expirationTimestamp\":1652144400000}}"; + + ParquetAddFile object = ParquetAddFile.builder() + .add(ParquetAddFile.Add.builder() + .url( + "https://.s3.us-west-2.amazonaws.com/delta-exchange-test/table_cdf/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20210501T010655Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=AKIAISZRDL4Q4Q7AIONA%2F20210501%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Signature=dd5d3ba1a179dc7e239d257feed046dccc95000d1aa0479ea6ff36d10d90ec94") + .id("591723a8-6a27-4240-a90e-57426f4736d2") + .size(573) + .timestamp(1652140800000L) + .version(1) + .partitionValues(Map.of("date", "2021-04-28")) + .stats( + Optional.of( + "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}")) + .expirationTimestamp(Optional.of(1652144400000L)) + .build()) + .build(); + ObjectMapper om; + + ParquetAddFileSerializationTest() { + om = new ObjectMapper(); + om.registerModule(new Jdk8Module()); + } + + @Test + void deserialize() throws IOException { + Assertions.assertEquals(object, om.reader().readValue(json, ParquetAddFile.class)); + } + + @Test + void serialize() throws JsonProcessingException { + Assertions.assertEquals(json, om.writer().writeValueAsString(object)); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetCDFFileSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetCDFFileSerializationTest.java new file mode 100644 index 000000000..80c580a1f --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetCDFFileSerializationTest.java @@ -0,0 +1,45 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ParquetCDFFileSerializationTest { + String json = + "{\"cdf\":{\"url\":\"https://.s3.us-west-2.amazonaws.com/delta-exchange-test/table_cdf/_change_data/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20210501T010655Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=AKIAISZRDL4Q4Q7AIONA%2F20210501%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Signature=dd5d3ba1a179dc7e239d257feed046dccc95000d1aa0479ea6ff36d10d90ec94\",\"id\":\"591723a8-6a27-4240-a90e-57426f4736d2\",\"partitionValues\":{\"date\":\"2021-04-28\"},\"size\":573,\"timestamp\":1652140800000,\"version\":1,\"expirationTimestamp\":1652144400000}}"; + + ParquetCDFFile object = ParquetCDFFile.builder() + .cdf(ParquetCDFFile.CDF + .builder() + .url( + "https://.s3.us-west-2.amazonaws.com/delta-exchange-test/table_cdf/_change_data/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20210501T010655Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=AKIAISZRDL4Q4Q7AIONA%2F20210501%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Signature=dd5d3ba1a179dc7e239d257feed046dccc95000d1aa0479ea6ff36d10d90ec94") + .id("591723a8-6a27-4240-a90e-57426f4736d2") + .size(573) + .partitionValues(Map.of("date", "2021-04-28")) + .timestamp(1652140800000L) + .version(1) + .expirationTimestamp(Optional.of(1652144400000L)) + .build()) + .build(); + ObjectMapper om; + + ParquetCDFFileSerializationTest() { + om = new ObjectMapper(); + om.registerModule(new Jdk8Module()); + } + + @Test + void deserialize() throws IOException { + Assertions.assertEquals(object, om.reader().readValue(json, ParquetCDFFile.class)); + } + + @Test + void serialize() throws JsonProcessingException { + Assertions.assertEquals(json, om.writer().writeValueAsString(object)); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetFileSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetFileSerializationTest.java new file mode 100644 index 000000000..41e6d2b4e --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetFileSerializationTest.java @@ -0,0 +1,44 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ParquetFileSerializationTest { + String json = + "{\"file\":{\"url\":\"https://.s3.us-west-2.amazonaws.com/delta-exchange-test/table2/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20210501T010655Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=AKIAISZRDL4Q4Q7AIONA%2F20210501%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Signature=dd5d3ba1a179dc7e239d257feed046dccc95000d1aa0479ea6ff36d10d90ec94\",\"id\":\"591723a8-6a27-4240-a90e-57426f4736d2\",\"partitionValues\":{\"date\":\"2021-04-28\"},\"size\":573,\"stats\":\"{\\\"numRecords\\\":1,\\\"minValues\\\":{\\\"eventTime\\\":\\\"2021-04-28T23:33:48.719Z\\\"},\\\"maxValues\\\":{\\\"eventTime\\\":\\\"2021-04-28T23:33:48.719Z\\\"},\\\"nullCount\\\":{\\\"eventTime\\\":0}}\",\"expirationTimestamp\":1652140800000}}"; + ParquetFile object = ParquetFile.builder() + .file(ParquetFile.File.builder() + .url( + "https://.s3.us-west-2.amazonaws.com/delta-exchange-test/table2/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20210501T010655Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=AKIAISZRDL4Q4Q7AIONA%2F20210501%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Signature=dd5d3ba1a179dc7e239d257feed046dccc95000d1aa0479ea6ff36d10d90ec94") + .id("591723a8-6a27-4240-a90e-57426f4736d2") + .size(573) + .partitionValues(Map.of("date", "2021-04-28")) + .stats( + Optional.of( + "{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}")) + .expirationTimestamp(Optional.of(1652140800000L)) + .build()) + .build(); + ObjectMapper om; + + ParquetFileSerializationTest() { + om = new ObjectMapper(); + om.registerModule(new Jdk8Module()); + } + + @Test + void deserialize() throws IOException { + Assertions.assertEquals(object, om.reader().readValue(json, ParquetFile.class)); + } + + @Test + void serialize() throws JsonProcessingException { + Assertions.assertEquals(json, om.writer().writeValueAsString(object)); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetMetadataSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetMetadataSerializationTest.java new file mode 100644 index 000000000..3cd7ff2b1 --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetMetadataSerializationTest.java @@ -0,0 +1,92 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import io.whitefox.api.deltasharing.model.v1.Format; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ParquetMetadataSerializationTest { + + ObjectMapper om; + + ParquetMetadataSerializationTest() { + om = new ObjectMapper(); + om.registerModule(new Jdk8Module()); + } + + @Test + void serializationWithDefaultValuesTest() throws JsonProcessingException { + var sut = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() + .id("abc") + .format(new Format()) + .partitionColumns(List.of()) + .schemaString("") + .build()) + .build(); + + Assertions.assertEquals( + "{\"metaData\":{\"id\":\"abc\",\"format\":{\"provider\":\"parquet\"},\"schemaString\":\"\",\"partitionColumns\":[]}}", + om.writer().writeValueAsString(sut)); + } + + @Test + void serializationValuesTest() throws JsonProcessingException { + var sut = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() + .id("f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2") + .format(new Format()) + .partitionColumns(List.of("date")) + .schemaString( + "{\"type\":\"struct\",\"fields\":[{\"name\":\"eventTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}") + .size(Optional.of(123456L)) + .numFiles(Optional.of(5L)) + .configuration(Optional.of(Map.of("enableChangeDataFeed", "true"))) + .build()) + .build(); + + var expected = + "{\"metaData\":{\"id\":\"f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2\",\"format\":{\"provider\":\"parquet\"},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"eventTime\\\",\\\"type\\\":\\\"timestamp\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"date\\\",\\\"type\\\":\\\"date\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[\"date\"],\"configuration\":{\"enableChangeDataFeed\":\"true\"},\"size\":123456,\"numFiles\":5}}"; + Assertions.assertEquals(expected, om.writer().writeValueAsString(sut)); + } + + @Test + void deserializationTest() throws IOException { + var input = + "{\"metaData\":{\"partitionColumns\":[\"date\"],\"format\":{\"provider\":\"parquet\"},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"eventTime\\\",\\\"type\\\":\\\"timestamp\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"date\\\",\\\"type\\\":\\\"date\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"id\":\"f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2\",\"configuration\":{\"enableChangeDataFeed\":\"true\"},\"size\":123456,\"numFiles\":5}}"; + var expected = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() + .id("f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2") + .format(new Format()) + .partitionColumns(List.of("date")) + .schemaString( + "{\"type\":\"struct\",\"fields\":[{\"name\":\"eventTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}") + .size(Optional.of(123456L)) + .numFiles(Optional.of(5L)) + .configuration(Optional.of(Map.of("enableChangeDataFeed", "true"))) + .build()) + .build(); + Assertions.assertEquals(expected, om.reader().readValue(input, ParquetMetadata.class)); + } + + @Test + void deserializationWithNullValuesTest() throws IOException { + var expected = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() + .id("abc") + .format(new Format()) + .partitionColumns(List.of()) + .schemaString("") + .build()) + .build(); + var input = + "{\"metaData\":{\"id\":\"abc\",\"format\":{\"provider\":\"parquet\"},\"schemaString\":\"\",\"partitionColumns\":[]}}"; + Assertions.assertEquals(expected, om.reader().readValue(input, ParquetMetadata.class)); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetProtocolSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetProtocolSerializationTest.java new file mode 100644 index 000000000..b23964b3d --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetProtocolSerializationTest.java @@ -0,0 +1,25 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ParquetProtocolSerializationTest { + ObjectMapper om = new ObjectMapper(); + + @Test + void serializeProtocol() throws JsonProcessingException { + Assertions.assertEquals( + "{\"protocol\":{\"minReaderVersion\":1}}", + om.writer().writeValueAsString(ParquetProtocol.ofMinReaderVersion(1))); + } + + @Test + void deserializeProtocol() throws IOException { + var result = + om.reader().readValue("{\"protocol\":{\"minReaderVersion\":1}}", ParquetProtocol.class); + Assertions.assertEquals(ParquetProtocol.ofMinReaderVersion(1), result); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetRemoveFileSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetRemoveFileSerializationTest.java new file mode 100644 index 000000000..3cae3e910 --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/parquet/ParquetRemoveFileSerializationTest.java @@ -0,0 +1,46 @@ +package io.whitefox.api.deltasharing.model.v1.parquet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ParquetRemoveFileSerializationTest { + + String json = + "{\"remove\":{\"url\":\"https://.s3.us-west-2.amazonaws.com/delta-exchange-test/table_cdf/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20210501T010655Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=AKIAISZRDL4Q4Q7AIONA%2F20210501%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Signature=dd5d3ba1a179dc7e239d257feed046dccc95000d1aa0479ea6ff36d10d90ec94\",\"id\":\"591723a8-6a27-4240-a90e-57426f4736d2\",\"partitionValues\":{\"date\":\"2021-04-28\"},\"size\":573,\"timestamp\":1652140800000,\"version\":1,\"expirationTimestamp\":1652144400000}}"; + + ParquetRemoveFile object = ParquetRemoveFile.builder() + .remove(ParquetRemoveFile.Remove.builder() + .url( + "https://.s3.us-west-2.amazonaws.com/delta-exchange-test/table_cdf/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20210501T010655Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=AKIAISZRDL4Q4Q7AIONA%2F20210501%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Signature=dd5d3ba1a179dc7e239d257feed046dccc95000d1aa0479ea6ff36d10d90ec94") + .id("591723a8-6a27-4240-a90e-57426f4736d2") + .size(573) + .partitionValues(Map.of("date", "2021-04-28")) + .timestamp(1652140800000L) + .version(1) + .expirationTimestamp(Optional.of(1652144400000L)) + .build()) + .build(); + + ObjectMapper om; + + ParquetRemoveFileSerializationTest() { + om = new ObjectMapper(); + om.registerModule(new Jdk8Module()); + } + + @Test + void deserialize() throws IOException { + Assertions.assertEquals(object, om.reader().readValue(json, ParquetRemoveFile.class)); + } + + @Test + void serialize() throws JsonProcessingException { + Assertions.assertEquals(json, om.writer().writeValueAsString(object)); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java index aa84edf03..1c0f3341d 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java @@ -14,17 +14,24 @@ import io.whitefox.api.OpenApiValidatorUtils; import io.whitefox.api.deltasharing.SampleTables; import io.whitefox.api.deltasharing.model.FileObjectWithoutPresignedUrl; -import io.whitefox.api.deltasharing.model.v1.generated.*; -import io.whitefox.core.*; +import io.whitefox.api.deltasharing.model.v1.Format; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol; +import io.whitefox.core.Principal; import io.whitefox.core.Share; +import io.whitefox.core.SharedTable; import io.whitefox.persistence.StorageManager; import jakarta.inject.Inject; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; @@ -122,20 +129,22 @@ public void icebergTableMetadata() throws IOException { .split("\n"); assertEquals(2, responseBodyLines.length); assertEquals( - new ProtocolObject().protocol(new ProtocolObjectProtocol().minReaderVersion(1)), - objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + ParquetProtocol.ofMinReaderVersion(1), + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); assertEquals( - new MetadataObject() - .metaData(new MetadataObjectMetaData() + ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() .id("7819530050735196523") - .name("metastore.test_glue_db.icebergtable1") - .format(new FormatObject().provider("parquet")) + .name(Optional.of("metastore.test_glue_db.icebergtable1")) + .format(new Format()) .schemaString( "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}") .partitionColumns(List.of()) - .version(1L) - ._configuration(Map.of("write.parquet.compression-codec", "zstd"))), - objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + .version(Optional.of(1L)) + .configuration(Optional.of(Map.of("write.parquet.compression-codec", "zstd"))) + .build()) + .build(), + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); } @DisabledOnOs(OS.WINDOWS) @@ -160,10 +169,10 @@ public void queryTableCurrentVersion() throws IOException { assertEquals( s3DeltaTable1Protocol, - objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); assertEquals( s3DeltaTable1Metadata, - objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); var files = Arrays.stream(responseBodyLines) .skip(2) .map(line -> { @@ -203,10 +212,10 @@ public void queryTableByVersion() throws IOException { assertEquals( s3DeltaTable1Protocol, - objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); assertEquals( s3DeltaTable1Metadata, - objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); var files = Arrays.stream(responseBodyLines) .skip(2) .map(line -> { diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java index 0b268b957..87ec825b4 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java @@ -15,7 +15,10 @@ import io.whitefox.api.deltasharing.SampleTables; import io.whitefox.api.deltasharing.encoders.DeltaPageTokenEncoder; import io.whitefox.api.deltasharing.model.FileObjectWithoutPresignedUrl; -import io.whitefox.api.deltasharing.model.v1.generated.*; +import io.whitefox.api.deltasharing.model.v1.Format; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetFile; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetMetadata; +import io.whitefox.api.deltasharing.model.v1.parquet.ParquetProtocol; import io.whitefox.core.services.ContentAndToken; import io.whitefox.persistence.StorageManager; import jakarta.inject.Inject; @@ -24,6 +27,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; @@ -193,20 +197,22 @@ public void deltaTableMetadata() throws IOException { .split("\n"); assertEquals(2, responseBodyLines.length); assertEquals( - new ProtocolObject().protocol(new ProtocolObjectProtocol().minReaderVersion(1)), - objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + ParquetProtocol.ofMinReaderVersion(1), + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); assertEquals( - new MetadataObject() - .metaData(new MetadataObjectMetaData() + ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() .id("56d48189-cdbc-44f2-9b0e-2bded4c79ed7") - .name("table1") - .format(new FormatObject().provider("parquet")) + .name(Optional.of("table1")) + .format(new Format()) .schemaString( "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}") .partitionColumns(List.of()) - .version(0L) - ._configuration(Map.of())), - objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + .version(Optional.of(0L)) + .configuration(Optional.of(Map.of())) + .build()) + .build(), + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); } @Test @@ -243,20 +249,22 @@ public void icebergTableMetadata() throws IOException { .split("\n"); assertEquals(2, responseBodyLines.length); assertEquals( - new ProtocolObject().protocol(new ProtocolObjectProtocol().minReaderVersion(1)), - objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + ParquetProtocol.ofMinReaderVersion(1), + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); assertEquals( - new MetadataObject() - .metaData(new MetadataObjectMetaData() + ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() .id("3369848726892806393") - .name("metastore.test_db.icebergtable1") - .format(new FormatObject().provider("parquet")) + .name(Optional.of("metastore.test_db.icebergtable1")) + .format(new Format()) .schemaString( "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}") .partitionColumns(List.of()) - .version(1L) - ._configuration(Map.of("write.parquet.compression-codec", "zstd"))), - objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + .version(Optional.of(1L)) + .configuration(Optional.of(Map.of("write.parquet.compression-codec", "zstd"))) + .build()) + .build(), + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); } @Test @@ -440,10 +448,10 @@ public void queryTableCurrentVersion() throws IOException { assertEquals( deltaTable1Protocol, - objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); assertEquals( deltaTable1Metadata, - objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); var files = Arrays.stream(responseBodyLines) .skip(2) .map(line -> { @@ -484,10 +492,10 @@ public void queryTableByVersion() throws IOException { assertEquals( deltaTable1Protocol, - objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); assertEquals( deltaTable1Metadata, - objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); var files = Arrays.stream(responseBodyLines) .skip(2) .map(line -> { @@ -528,15 +536,15 @@ public void queryTableByTs() throws IOException { assertEquals( deltaTable1Protocol, - objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); assertEquals( deltaTableWithHistory1Metadata, - objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); assertDoesNotThrow(() -> Arrays.stream(responseBodyLines) .skip(2) .map(line -> { try { - return objectMapper.reader().readValue(line, FileObject.class); + return objectMapper.reader().readValue(line, ParquetFile.class); } catch (IOException e) { throw new RuntimeException(e); } From 70df5b477af47afe17810d6c7734aba617e88288 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Wed, 31 Jan 2024 15:39:16 +0100 Subject: [PATCH 3/5] Add Delta format and protocol. still missing files --- protocol/delta-sharing-protocol-api.yml | 235 ------------------ .../api/deltasharing/model/v1/Format.java | 22 +- .../model/v1/delta/DeltaFiles.yaml | 146 +++++++++++ .../model/v1/delta/DeltaInternalFormat.java | 41 +++ .../model/v1/delta/DeltaInternalMetadata.java | 70 ++++++ .../model/v1/delta/DeltaMetadata.java | 53 ++++ .../server/DeltaSharesApiImpl.java | 1 + .../model/v1/FormatSerializationTest.java | 8 + .../DeltaInternalFormatSerializationTest.java | 39 +++ ...eltaInternalMetadataSerializationTest.java | 40 +++ .../delta/DeltaMetadataSerializationTest.java | 48 ++++ .../server/DeltaSharesApiImplTest.java | 4 +- 12 files changed, 467 insertions(+), 240 deletions(-) create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFiles.yaml create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalFormat.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalMetadata.java create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaMetadata.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalFormatSerializationTest.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalMetadataSerializationTest.java create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaMetadataSerializationTest.java diff --git a/protocol/delta-sharing-protocol-api.yml b/protocol/delta-sharing-protocol-api.yml index b88fc043a..2a57040c6 100644 --- a/protocol/delta-sharing-protocol-api.yml +++ b/protocol/delta-sharing-protocol-api.yml @@ -767,241 +767,6 @@ components: type: string message: type: string - - DeltaTableQueryResponseObject: - type: object - properties: - protocol: - # it refers to ./delta-sharing-protocol.md#protocol - $ref: '#/components/schemas/DeltaProtocolObject' - metadata: - # it refers to ./delta-sharing-protocol.md#metadata - $ref: '#/components/schemas/DeltaMetadataObject' - files: - type: array - items: - # it refers to ./delta-sharing-protocol.md#file - $ref: '#/components/schemas/DeltaFileObject' - DeltaProtocolObject: - type: object - properties: - protocol: - type: object - properties: - deltaProtocol: - type: object - properties: - minReaderVersion: - type: integer - format: int32 - minWriterVersion: - type: integer - format: int32 - DeltaFormatObject: - type: object - properties: - provider: - type: string - options: - type: object - additionalProperties: - type: string - required: - - provider - DeltaMetadata: - type: object - description: see https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata - required: - - id - - format - - schemaString - - partitionColumns - - configuration - properties: - id: - type: string - name: - type: string - description: - type: string - format: - $ref: '#/components/schemas/DeltaFormatObject' - schemaString: - type: string - partitionColumns: - type: array - items: - type: string - createdTime: - type: integer - format: int64 - configuration: - type: object - additionalProperties: - type: string - DeltaMetadataObject: - type: object - properties: - metaData: - type: object - properties: - version: - type: integer - format: int64 - size: - type: integer - format: int64 - numFiles: - type: integer - format: int64 - deltaMetadata: - $ref: '#/components/schemas/DeltaMetadata' - required: [ deltaMetadata ] - DeltaFileObject: - required: - - id - - deltaSingleAction - properties: - id: - type: string - deletionVectorFileId: - type: string - version: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - expirationTimestamp: - type: integer - format: int64 - deltaSingleAction: - $ref: '#/components/schemas/DeltaSingleAction' - DeltaSingleAction: - type: object - description: only one field can be not null, container of delta actions such as file, add, cdf or remove see https://github.com/delta-io/delta/tree/master/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions - properties: - add: - $ref: '#/components/schemas/DeltaAddFileForCDFAction' - cdf: - $ref: '#/components/schemas/DeltaAddCDCFileAction' - file: - $ref: '#/components/schemas/DeltaAddFileAction' - remove: - $ref: '#/components/schemas/DeltaRemoveFileAction' - DeltaAddFileForCDFAction: - type: object - description: see io.delta.sharing.server.model.AddFileForCDF - properties: - url: - type: string - id: - type: string - partitionValues: - type: object - additionalProperties: - type: string - size: - type: integer - format: int64 - expirationTimestamp: - type: integer - format: int64 - version: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - stats: - type: string - DeltaAddCDCFileAction: - type: object - description: see io.delta.sharing.server.model.AddCDCFile - properties: - url: - type: string - id: - type: string - partitionValues: - type: object - additionalProperties: - type: string - size: - type: integer - format: int64 - expirationTimestamp: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - version: - type: integer - format: int64 - DeltaAddFileAction: - type: object - description: see io.delta.sharing.server.model.AddFile - properties: - url: - type: string - id: - type: string - partitionValues: - type: object - additionalProperties: - type: string - size: - type: integer - format: int64 - stats: - type: string - expirationTimestamp: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - version: - type: integer - format: int64 - DeltaRemoveFileAction: - type: object - properties: - url: - type: string - id: - type: string - partitionValues: - type: object - additionalProperties: - type: string - size: - type: integer - format: int64 - expirationTimestamp: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - version: - type: integer - format: int64 - DeltaEndStreamAction: - description: An action that is returned as the last line of the streaming response. It allows the server to include additional data that might be dynamically generated while the streaming message is sent - type: object - properties: - refreshToken: - type: string - description: a token used to refresh pre-signed urls for a long running query - nextPageToken: - type: string - description: a token used to retrieve the subsequent page of a query - minUrlExpirationTimestamp: - description: the minimum url expiration timestamp of the urls returned in current response - type: integer - format: int64 responses: "400": description: The request is malformed diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/Format.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/Format.java index c83dcf540..04ba2d488 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/Format.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/Format.java @@ -1,10 +1,26 @@ package io.whitefox.api.deltasharing.model.v1; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.Value; +import lombok.EqualsAndHashCode; -@Value +@EqualsAndHashCode public class Format { + private static final String PARQUET = "parquet"; + @JsonProperty - String provider = "parquet"; + public String provider() { + return PARQUET; + } + + public Format() { + this(PARQUET); + } + + @JsonCreator + private Format(@JsonProperty("provider") String provider) { + if (!"parquet".equalsIgnoreCase(provider)) { + throw new IllegalArgumentException("Provider must be " + PARQUET); + } + } } diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFiles.yaml b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFiles.yaml new file mode 100644 index 000000000..89daab376 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFiles.yaml @@ -0,0 +1,146 @@ +DeltaFileObject: + required: + - id + - deltaSingleAction + properties: + id: + type: string + deletionVectorFileId: + type: string + version: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + deltaSingleAction: + $ref: '#/components/schemas/DeltaSingleAction' +DeltaSingleAction: + type: object + description: only one field can be not null, container of delta actions such as file, add, cdf or remove see https://github.com/delta-io/delta/tree/master/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions + properties: + add: + $ref: '#/components/schemas/DeltaAddFileForCDFAction' + cdf: + $ref: '#/components/schemas/DeltaAddCDCFileAction' + file: + $ref: '#/components/schemas/DeltaAddFileAction' + remove: + $ref: '#/components/schemas/DeltaRemoveFileAction' +DeltaAddFileForCDFAction: + type: object + description: see io.delta.sharing.server.model.AddFileForCDF + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + version: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + stats: + type: string +DeltaAddCDCFileAction: + type: object + description: see io.delta.sharing.server.model.AddCDCFile + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + version: + type: integer + format: int64 +DeltaAddFileAction: + type: object + description: see io.delta.sharing.server.model.AddFile + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + stats: + type: string + expirationTimestamp: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + version: + type: integer + format: int64 +DeltaRemoveFileAction: + type: object + properties: + url: + type: string + id: + type: string + partitionValues: + type: object + additionalProperties: + type: string + size: + type: integer + format: int64 + expirationTimestamp: + type: integer + format: int64 + timestamp: + type: integer + format: int64 + version: + type: integer + format: int64 +# This is undocumented, need to add to the protocol +#DeltaEndStreamAction: +# description: An action that is returned as the last line of the streaming response. It allows the server to include additional data that might be dynamically generated while the streaming message is sent +# type: object +# properties: +# refreshToken: +# type: string +# description: a token used to refresh pre-signed urls for a long running query +# nextPageToken: +# type: string +# description: a token used to retrieve the subsequent page of a query +# minUrlExpirationTimestamp: +# description: the minimum url expiration timestamp of the urls returned in current response +# type: integer +# format: int64 \ No newline at end of file diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalFormat.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalFormat.java new file mode 100644 index 000000000..00cb0287c --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalFormat.java @@ -0,0 +1,41 @@ +package io.whitefox.api.deltasharing.model.v1.delta; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import java.util.Optional; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode +@JsonInclude(JsonInclude.Include.NON_ABSENT) +public class DeltaInternalFormat { + private static final String PARQUET = "parquet"; + + private final Optional> options; + + @JsonProperty + public Optional> options() { + return options; + } + + @JsonProperty + public String provider() { + return PARQUET; + } + + @JsonCreator + private DeltaInternalFormat( + @JsonProperty("provider") String provider, + @JsonProperty("options") Map options) { + + if (!PARQUET.equalsIgnoreCase(provider)) { + throw new IllegalArgumentException("Provider must be " + PARQUET); + } + this.options = Optional.ofNullable(options).map(Map::copyOf); + } + + public DeltaInternalFormat(Optional> options) { + this(PARQUET, options.orElse(null)); + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalMetadata.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalMetadata.java new file mode 100644 index 000000000..da4549cc0 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalMetadata.java @@ -0,0 +1,70 @@ +package io.whitefox.api.deltasharing.model.v1.delta; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +@SuperBuilder +@Jacksonized +@Value +@JsonInclude(JsonInclude.Include.NON_ABSENT) +public class DeltaInternalMetadata { + + /** + * Unique identifier for this table. + */ + @JsonProperty + @NonNull String id; + + /** + * User-provided identifier for this table. + */ + @JsonProperty + @Builder.Default + Optional name = Optional.empty(); + + /** + * User-provided description for this table. + */ + @JsonProperty + @Builder.Default + Optional description = Optional.empty(); + + /** + * Specification of the encoding for the files stored in the table. + */ + @JsonProperty + @NonNull DeltaInternalFormat format; + + /** + * Schema of the table. + */ + @JsonProperty + @NonNull String schemaString; + + /** + * An array containing the names of columns by which the data should be partitioned. + */ + @JsonProperty + @NonNull List partitionColumns; + + /** + * The time when this metadata action is created, in milliseconds since the Unix epoch. + */ + @JsonProperty + @Builder.Default + Optional createdTime = Optional.empty(); + + /** + * A map containing configuration options for the metadata action. + */ + @JsonProperty + @NonNull Map configuration; +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaMetadata.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaMetadata.java new file mode 100644 index 000000000..bf09d1c88 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaMetadata.java @@ -0,0 +1,53 @@ +package io.whitefox.api.deltasharing.model.v1.delta; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Optional; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +@Value +@SuperBuilder +@Jacksonized +class DeltaMetadata { + @JsonProperty("metaData") + @NonNull Metadata metadata; + + @Value + @SuperBuilder + @Jacksonized + @JsonInclude(JsonInclude.Include.NON_ABSENT) + public static class Metadata { + + /** + * Need to be parsed by a delta library as delta metadata. + */ + @JsonProperty + @NonNull DeltaInternalMetadata deltaMetadata; + + /** + * The table version the metadata corresponds to, returned when querying table data with a version or timestamp parameter, + * or cdf query with includeHistoricalMetadata set to true. + */ + @JsonProperty + @Builder.Default + Optional version = Optional.empty(); + + /** + * The size of the table in bytes, will be returned if available in the delta log. + */ + @JsonProperty + @Builder.Default + Optional size = Optional.empty(); + + /** + * The number of files in the table, will be returned if available in the delta log. + */ + @JsonProperty + @Builder.Default + Optional numFiles = Optional.empty(); + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java index e037bab94..feb1366ac 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java @@ -45,6 +45,7 @@ public DeltaSharesApiImpl( @Override public Response getShare(String share) { + // TODO split core and http models return wrapExceptions( () -> optionalToNotFound(shareService.getShare(share), s -> Response.ok(s).build()), diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/FormatSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/FormatSerializationTest.java index d0cd55f62..c59f2f861 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/FormatSerializationTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/FormatSerializationTest.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.ValueInstantiationException; import java.io.IOException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -21,4 +22,11 @@ void deserializationTest() throws IOException { var result = om.reader().readValue(input, Format.class); Assertions.assertEquals(new Format(), result); } + + @Test + void deserializationFailTest() throws IOException { + var input = "{\"provider\":\"pippo\"}"; + Assertions.assertThrows( + ValueInstantiationException.class, () -> om.reader().readValue(input, Format.class)); + } } diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalFormatSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalFormatSerializationTest.java new file mode 100644 index 000000000..761b1a0e2 --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalFormatSerializationTest.java @@ -0,0 +1,39 @@ +package io.whitefox.api.deltasharing.model.v1.delta; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DeltaInternalFormatSerializationTest { + String json = "{\"provider\":\"parquet\",\"options\":{\"key\":\"value\"}}"; + DeltaInternalFormat object = new DeltaInternalFormat(Optional.of(Map.of("key", "value"))); + + ObjectMapper om; + + DeltaInternalFormatSerializationTest() { + om = new ObjectMapper(); + om.registerModule(new Jdk8Module()); + } + + @Test + void deserialize() throws IOException { + Assertions.assertEquals(object, om.reader().readValue(json, DeltaInternalFormat.class)); + } + + @Test + void deserializeWithoutOptions() throws IOException { + var json = "{\"provider\":\"parquet\"}"; + var object = new DeltaInternalFormat(Optional.empty()); + Assertions.assertEquals(object, om.reader().readValue(json, DeltaInternalFormat.class)); + } + + @Test + void serialize() throws JsonProcessingException { + Assertions.assertEquals(json, om.writer().writeValueAsString(object)); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalMetadataSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalMetadataSerializationTest.java new file mode 100644 index 000000000..ae919f34a --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaInternalMetadataSerializationTest.java @@ -0,0 +1,40 @@ +package io.whitefox.api.deltasharing.model.v1.delta; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DeltaInternalMetadataSerializationTest { + String json = + "{\"id\":\"f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2\",\"format\":{\"provider\":\"parquet\"},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"eventTime\\\",\\\"type\\\":\\\"timestamp\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"date\\\",\\\"type\\\":\\\"date\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[\"date\"],\"configuration\":{\"enableChangeDataFeed\":\"true\"}}"; + DeltaInternalMetadata object = DeltaInternalMetadata.builder() + .partitionColumns(List.of("date")) + .format(new DeltaInternalFormat(Optional.empty())) + .schemaString( + "{\"type\":\"struct\",\"fields\":[{\"name\":\"eventTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}") + .id("f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2") + .configuration(Map.of("enableChangeDataFeed", "true")) + .build(); + ObjectMapper om; + + DeltaInternalMetadataSerializationTest() { + om = new ObjectMapper(); + om.registerModule(new Jdk8Module()); + } + + @Test + void deserialize() throws IOException { + Assertions.assertEquals(object, om.reader().readValue(json, DeltaInternalMetadata.class)); + } + + @Test + void serialize() throws JsonProcessingException { + Assertions.assertEquals(json, om.writer().writeValueAsString(object)); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaMetadataSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaMetadataSerializationTest.java new file mode 100644 index 000000000..ea6a355bb --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaMetadataSerializationTest.java @@ -0,0 +1,48 @@ +package io.whitefox.api.deltasharing.model.v1.delta; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DeltaMetadataSerializationTest { + + String json = + "{\"metaData\":{\"deltaMetadata\":{\"id\":\"f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2\",\"format\":{\"provider\":\"parquet\"},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"eventTime\\\",\\\"type\\\":\\\"timestamp\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"date\\\",\\\"type\\\":\\\"date\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[\"date\"],\"configuration\":{\"enableChangeDataFeed\":\"true\"}},\"version\":20,\"size\":123456,\"numFiles\":5}}"; + DeltaMetadata object = DeltaMetadata.builder() + .metadata(DeltaMetadata.Metadata.builder() + .version(Optional.of(20L)) + .size(Optional.of(123456L)) + .numFiles(Optional.of(5L)) + .deltaMetadata(DeltaInternalMetadata.builder() + .partitionColumns(List.of("date")) + .format(new DeltaInternalFormat(Optional.empty())) + .schemaString( + "{\"type\":\"struct\",\"fields\":[{\"name\":\"eventTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}") + .id("f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2") + .configuration(Map.of("enableChangeDataFeed", "true")) + .build()) + .build()) + .build(); + ObjectMapper om; + + DeltaMetadataSerializationTest() { + om = new ObjectMapper(); + om.registerModule(new Jdk8Module()); + } + + @Test + void deserialize() throws IOException { + Assertions.assertEquals(object, om.reader().readValue(json, DeltaMetadata.class)); + } + + @Test + void serialize() throws JsonProcessingException { + Assertions.assertEquals(json, om.writer().writeValueAsString(object)); + } +} diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java index 87ec825b4..b0c9ea4e1 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java @@ -404,10 +404,10 @@ public void queryTableCurrentVersionWithPredicates() throws IOException { assertEquals( deltaTable1Protocol, - objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); assertEquals( deltaTable1Metadata, - objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); var files = Arrays.stream(responseBodyLines) .skip(2) .map(line -> { From ec01c35f1704ffe84a8f5ca550e957a379eda42b Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Fri, 2 Feb 2024 16:23:09 +0100 Subject: [PATCH 4/5] Add DeltaFile --- .../model/v1/delta/DeltaFile.java | 73 +++++++++ .../model/v1/delta/DeltaFiles.yaml | 146 ------------------ .../v1/delta/DeltaFileSerializationTest.java | 44 ++++++ 3 files changed, 117 insertions(+), 146 deletions(-) create mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFile.java delete mode 100644 server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFiles.yaml create mode 100644 server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFileSerializationTest.java diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFile.java b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFile.java new file mode 100644 index 000000000..b0426f406 --- /dev/null +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFile.java @@ -0,0 +1,73 @@ +package io.whitefox.api.deltasharing.model.v1.delta; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Optional; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +@Value +@SuperBuilder +@Jacksonized +public class DeltaFile { + + @JsonProperty + @NonNull File file; + + @Value + @SuperBuilder + @Jacksonized + @JsonInclude(JsonInclude.Include.NON_ABSENT) + public static class File { + + /** + * A unique string for the file in a table. + * The same file is guaranteed to have the same id across multiple requests. + * A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + @JsonProperty + @NonNull String id; + + /** + * A unique string for the deletion vector file in a table. + * The same deletion vector file is guaranteed to have the same id across multiple requests. + * A client may cache the file content and use this id as a key to decide whether to use the cached file content. + */ + @JsonProperty + @Builder.Default + Optional deletionVectorFileId = Optional.empty(); + + /** + * The table version of the file, returned when querying a table data with a version or timestamp parameter. + */ + @JsonProperty + @Builder.Default + Optional version = Optional.empty(); + + /** + * The unix timestamp corresponding to the table version of the file, in milliseconds, + * returned when querying a table data with a version or timestamp parameter. + */ + @JsonProperty + @Builder.Default + Optional timestamp = Optional.empty(); + + /** + * The unix timestamp corresponding to the expiration of the url, in milliseconds, + * returned when the server supports the feature. + */ + @JsonProperty + @Builder.Default + Optional expirationTimestamp = Optional.empty(); + + /** + * Need to be parsed by a delta library as a delta single action, the path field is replaced by pr-signed url. + */ + @JsonProperty + @NonNull JsonNode deltaSingleAction; + } +} diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFiles.yaml b/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFiles.yaml deleted file mode 100644 index 89daab376..000000000 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFiles.yaml +++ /dev/null @@ -1,146 +0,0 @@ -DeltaFileObject: - required: - - id - - deltaSingleAction - properties: - id: - type: string - deletionVectorFileId: - type: string - version: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - expirationTimestamp: - type: integer - format: int64 - deltaSingleAction: - $ref: '#/components/schemas/DeltaSingleAction' -DeltaSingleAction: - type: object - description: only one field can be not null, container of delta actions such as file, add, cdf or remove see https://github.com/delta-io/delta/tree/master/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions - properties: - add: - $ref: '#/components/schemas/DeltaAddFileForCDFAction' - cdf: - $ref: '#/components/schemas/DeltaAddCDCFileAction' - file: - $ref: '#/components/schemas/DeltaAddFileAction' - remove: - $ref: '#/components/schemas/DeltaRemoveFileAction' -DeltaAddFileForCDFAction: - type: object - description: see io.delta.sharing.server.model.AddFileForCDF - properties: - url: - type: string - id: - type: string - partitionValues: - type: object - additionalProperties: - type: string - size: - type: integer - format: int64 - expirationTimestamp: - type: integer - format: int64 - version: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - stats: - type: string -DeltaAddCDCFileAction: - type: object - description: see io.delta.sharing.server.model.AddCDCFile - properties: - url: - type: string - id: - type: string - partitionValues: - type: object - additionalProperties: - type: string - size: - type: integer - format: int64 - expirationTimestamp: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - version: - type: integer - format: int64 -DeltaAddFileAction: - type: object - description: see io.delta.sharing.server.model.AddFile - properties: - url: - type: string - id: - type: string - partitionValues: - type: object - additionalProperties: - type: string - size: - type: integer - format: int64 - stats: - type: string - expirationTimestamp: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - version: - type: integer - format: int64 -DeltaRemoveFileAction: - type: object - properties: - url: - type: string - id: - type: string - partitionValues: - type: object - additionalProperties: - type: string - size: - type: integer - format: int64 - expirationTimestamp: - type: integer - format: int64 - timestamp: - type: integer - format: int64 - version: - type: integer - format: int64 -# This is undocumented, need to add to the protocol -#DeltaEndStreamAction: -# description: An action that is returned as the last line of the streaming response. It allows the server to include additional data that might be dynamically generated while the streaming message is sent -# type: object -# properties: -# refreshToken: -# type: string -# description: a token used to refresh pre-signed urls for a long running query -# nextPageToken: -# type: string -# description: a token used to retrieve the subsequent page of a query -# minUrlExpirationTimestamp: -# description: the minimum url expiration timestamp of the urls returned in current response -# type: integer -# format: int64 \ No newline at end of file diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFileSerializationTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFileSerializationTest.java new file mode 100644 index 000000000..a233f1ce7 --- /dev/null +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/v1/delta/DeltaFileSerializationTest.java @@ -0,0 +1,44 @@ +package io.whitefox.api.deltasharing.model.v1.delta; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import java.io.IOException; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DeltaFileSerializationTest { + String singleActionJson; + String json; + + DeltaFile object; + ObjectMapper om; + + public DeltaFileSerializationTest() throws JsonProcessingException { + om = new ObjectMapper(); + om.registerModule(new Jdk8Module()); + singleActionJson = + "{\"add\":{\"dataChange\":false,\"modificationTime\":0,\"partitionValues\":{\"date\":\"2021-04-28\"},\"path\":\"https://s3-bucket-name.s3.us-west-2.amazonaws.com/delta-exchange-test/table2/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?...\",\"pathAsUri\":\"https://s3-bucket-name.s3.us-west-2.amazonaws.com/delta-exchange-test/table2/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?...\",\"size\":0,\"stats\":\"{\\\"numRecords\\\":1,\\\"minValues\\\":{\\\"eventTime\\\":\\\"2021-04-28T23:33:48.719Z\\\"},\\\"maxValues\\\":{\\\"eventTime\\\":\\\"2021-04-28T23:33:48.719Z\\\"},\\\"nullCount\\\":{\\\"eventTime\\\":0}}\"}}"; + json = + "{\"file\":{\"id\":\"591723a8-6a27-4240-a90e-57426f4736d2\",\"expirationTimestamp\":1652140800000,\"deltaSingleAction\":" + + singleActionJson + "}}"; + object = DeltaFile.builder() + .file(DeltaFile.File.builder() + .id("591723a8-6a27-4240-a90e-57426f4736d2") + .expirationTimestamp(Optional.of(1652140800000L)) + .deltaSingleAction(om.readTree(singleActionJson)) + .build()) + .build(); + } + + @Test + void serialize() throws IOException { + Assertions.assertEquals(json, om.writer().writeValueAsString(object)); + } + + @Test + void deserialize() throws IOException { + Assertions.assertEquals(object, om.reader().readValue(json, DeltaFile.class)); + } +} From f10e484d13413a1bfe239b61565c29da773e0f64 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Fri, 9 Feb 2024 10:49:03 +0100 Subject: [PATCH 5/5] Remove TODO --- .../io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java index feb1366ac..e037bab94 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java @@ -45,7 +45,6 @@ public DeltaSharesApiImpl( @Override public Response getShare(String share) { - // TODO split core and http models return wrapExceptions( () -> optionalToNotFound(shareService.getShare(share), s -> Response.ok(s).build()),