Skip to content

Commit eba1ae8

Browse files
committed
protocol not compliant
1 parent ceea491 commit eba1ae8

File tree

15 files changed

+286
-189
lines changed

15 files changed

+286
-189
lines changed

server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.whitefox.core.Schema;
77
import io.whitefox.core.Share;
88
import java.util.Arrays;
9+
import java.util.List;
910
import java.util.Map;
1011
import java.util.Optional;
1112
import java.util.stream.Collectors;
@@ -31,16 +32,17 @@ public static ReadTableRequest api2ReadTableRequest(QueryRequest request) {
3132
if (request.getVersion() != null && request.getTimestamp() == null) {
3233
return new ReadTableRequest.ReadTableVersion(
3334
request.getPredicateHints(),
34-
jsonPredicateHints, Optional.ofNullable(request.getLimitHint()),
35+
List.of(), Optional.ofNullable(request.getLimitHint()),
3536
request.getVersion());
3637
} else if (request.getVersion() == null && request.getTimestamp() != null) {
3738
return new ReadTableRequest.ReadTableAsOfTimestamp(
3839
request.getPredicateHints(),
40+
List.of(),
3941
Optional.ofNullable(request.getLimitHint()),
4042
CommonMappers.parseTimestamp(request.getTimestamp()));
4143
} else if (request.getVersion() == null && request.getTimestamp() == null) {
4244
return new ReadTableRequest.ReadTableCurrentVersion(
43-
request.getPredicateHints(), Optional.ofNullable(request.getLimitHint()));
45+
request.getPredicateHints(), List.of(), Optional.ofNullable(request.getLimitHint()));
4446
} else {
4547
throw new IllegalArgumentException("Cannot specify both version and timestamp");
4648
}

server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,56 @@ public void getTableVersionBadTimestamp() {
292292
.statusCode(502);
293293
}
294294

295+
@DisabledOnOs(OS.WINDOWS)
296+
@Test
297+
public void queryTableCurrentVersionWithPredicates() throws IOException {
298+
var responseBodyLines = given()
299+
.when()
300+
.filter(deltaFilter)
301+
.body("{\"jsonPredicateHints\": {\"op\":\"and\",\"children\":\"[" +
302+
" {op:not,children:[" +
303+
" {op:isNull\",\"children\":[\n" +
304+
" {\"op\":\"column\",\"name\":\"birthday\",\"valueType\":\"date\"}]}]},\n" +
305+
" {\"op\":\"equal\",\"children\":[\n" +
306+
" {\"op\":\"column\",\"name\":\"birthday\",\"valueType\":\"date\"},\n" +
307+
" {\"op\":\"literal\",\"value\":\"2020-01-01\",\"valueType\":\"date\"}]}\n" +
308+
"]\"}}")
309+
.header(new Header("Content-Type", "application/json"))
310+
.post(
311+
"delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query",
312+
"name",
313+
"default",
314+
"table1")
315+
.then()
316+
.statusCode(200)
317+
.extract()
318+
.body()
319+
.asString()
320+
.split("\n");
321+
322+
assertEquals(
323+
deltaTable1Protocol,
324+
objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class));
325+
assertEquals(
326+
deltaTable1Metadata,
327+
objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class));
328+
var files = Arrays.stream(responseBodyLines)
329+
.skip(2)
330+
.map(line -> {
331+
try {
332+
return objectMapper
333+
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
334+
.reader()
335+
.readValue(line, FileObjectWithoutPresignedUrl.class);
336+
} catch (IOException e) {
337+
throw new RuntimeException(e);
338+
}
339+
})
340+
.collect(Collectors.toSet());
341+
assertEquals(7, responseBodyLines.length);
342+
assertEquals(deltaTable1FilesWithoutPresignedUrl, files); // TOD
343+
}
344+
295345
@DisabledOnOs(OS.WINDOWS)
296346
@Test
297347
public void queryTableCurrentVersion() throws IOException {

server/core/src/main/java/io/whitefox/core/PredicateUtils.java

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,24 @@
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.fasterxml.jackson.databind.ObjectMapper;
5-
import io.delta.standalone.Snapshot;
65
import io.delta.standalone.actions.AddFile;
76
import io.whitefox.core.types.DataType;
87
import io.whitefox.core.types.predicates.*;
9-
108
import java.util.List;
119
import java.util.Optional;
12-
1310
import net.sf.jsqlparser.JSQLParserException;
1411
import net.sf.jsqlparser.expression.BinaryExpression;
1512
import net.sf.jsqlparser.expression.Expression;
1613
import net.sf.jsqlparser.expression.StringValue;
1714
import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
1815
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
1916
import org.apache.commons.lang3.tuple.Pair;
17+
import org.apache.log4j.Logger;
2018

2119
public class PredicateUtils {
2220

21+
private static final Logger logger = Logger.getLogger(PredicateUtils.class);
22+
2323
private static final ObjectMapper objectMapper = DeltaObjectMapper.getInstance();
2424

2525
public static BaseOp parseJsonPredicate(String predicate) throws PredicateParsingException {
@@ -30,49 +30,71 @@ public static BaseOp parseJsonPredicate(String predicate) throws PredicateParsin
3030
}
3131
}
3232

33-
public static BaseOp parseSqlPredicate(String predicate, EvalContext ctx, Metadata metadata) throws PredicateException {
33+
public static boolean evaluateJsonPredicate(String predicate, EvalContext ctx, AddFile f) {
34+
try {
35+
var parsedPredicate = PredicateUtils.parseJsonPredicate(predicate);
36+
return parsedPredicate.evalExpectBoolean(ctx);
37+
} catch (PredicateException e) {
38+
logger.debug("Caught exception for predicate: " + predicate + " - " + e.getMessage());
39+
logger.info("File: " + f.getPath()
40+
+ " will be used in processing due to failure in parsing or processing the predicate: "
41+
+ predicate);
42+
return true;
43+
}
44+
}
45+
46+
public static boolean evaluateSqlPredicate(
47+
String predicate, EvalContext ctx, AddFile f, Metadata metadata) {
48+
try {
49+
var parsedPredicate = PredicateUtils.parseSqlPredicate(predicate, ctx, metadata);
50+
return parsedPredicate.evalExpectBoolean(ctx);
51+
} catch (PredicateException e) {
52+
logger.debug("Caught exception for predicate: " + predicate + " - " + e.getMessage());
53+
logger.info("File: " + f.getPath()
54+
+ " will be used in processing due to failure in parsing or processing the predicate: "
55+
+ predicate);
56+
return true;
57+
}
58+
}
59+
60+
public static BaseOp parseSqlPredicate(String predicate, EvalContext ctx, Metadata metadata)
61+
throws PredicateException {
3462
try {
3563
var expression = CCJSqlParserUtil.parseCondExpression(predicate);
36-
if (expression instanceof IsNullExpression){
64+
if (expression instanceof IsNullExpression) {
3765
var isNullExpression = (IsNullExpression) expression;
38-
String column = isNullExpression.getLeftExpression().getASTNode().jjtGetFirstToken().toString();
66+
String column =
67+
isNullExpression.getLeftExpression().getASTNode().jjtGetFirstToken().toString();
3968
var dataType = metadata.tableSchema().structType().get(column).getDataType();
4069
var colOp = new ColumnOp(column, dataType);
4170
var children = List.of((LeafOp) colOp);
4271
var operator = "isnull";
4372
return NonLeafOp.createPartitionFilter(children, operator);
44-
}
45-
else if (expression instanceof BinaryExpression) {
73+
} else if (expression instanceof BinaryExpression) {
4674
BinaryExpression binaryExpression = (BinaryExpression) expression;
4775
String column = binaryExpression.getLeftExpression().toString();
4876
String operator = binaryExpression.getStringExpression();
4977
Expression value = binaryExpression.getRightExpression();
5078
if (value instanceof StringValue) {
5179
StringValue stringValue = (StringValue) value;
52-
var dataType = metadata.tableSchema().structType().get(column).getDataType();
53-
var colOp = new ColumnOp(column, dataType);
54-
var litOp = new LiteralOp(stringValue.getValue(), dataType);
55-
var children = List.of(colOp, litOp);
56-
return NonLeafOp.createPartitionFilter(children, operator);
57-
}
58-
else {
80+
var dataType = metadata.tableSchema().structType().get(column).getDataType();
81+
var colOp = new ColumnOp(column, dataType);
82+
var litOp = new LiteralOp(stringValue.getValue(), dataType);
83+
var children = List.of(colOp, litOp);
84+
return NonLeafOp.createPartitionFilter(children, operator);
85+
} else {
5986
var dataType = metadata.tableSchema().structType().get(column).getDataType();
6087
var colOp = new ColumnOp(column, dataType);
6188
var litOp = new LiteralOp(value.toString(), dataType);
6289
var children = List.of(colOp, litOp);
6390
return NonLeafOp.createPartitionFilter(children, operator);
6491
}
65-
}
66-
else
67-
throw new ExpressionNotSupportedException(predicate);
68-
}
69-
catch (JSQLParserException e) {
92+
} else throw new ExpressionNotSupportedException(predicate);
93+
} catch (JSQLParserException e) {
7094
throw new PredicateParsingException(e);
7195
}
7296
}
7397

74-
75-
7698
public static ColumnRange createColumnRange(String name, EvalContext ctx, DataType valueType)
7799
throws NonExistingColumnException {
78100
var fileStats = ctx.getStatsValues();

server/core/src/main/java/io/whitefox/core/ReadTableRequest.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ public static class ReadTableVersion implements ReadTableRequest {
1515
private final Long version;
1616

1717
public ReadTableVersion(
18-
List<String> predicateHints, List<String> jsonPredicateHints, Optional<Integer> limitHint, Long version) {
18+
List<String> predicateHints,
19+
List<String> jsonPredicateHints,
20+
Optional<Integer> limitHint,
21+
Long version) {
1922
this.predicateHints = predicateHints;
2023
this.jsonPredicateHints = jsonPredicateHints;
2124
this.limitHint = limitHint;
@@ -74,7 +77,10 @@ public static class ReadTableAsOfTimestamp implements ReadTableRequest {
7477
private final Long timestamp;
7578

7679
public ReadTableAsOfTimestamp(
77-
List<String> predicateHints, List<String> jsonPredicateHints, Optional<Integer> limitHint, Long timestamp) {
80+
List<String> predicateHints,
81+
List<String> jsonPredicateHints,
82+
Optional<Integer> limitHint,
83+
Long timestamp) {
7884
this.predicateHints = predicateHints;
7985
this.jsonPredicateHints = jsonPredicateHints;
8086
this.limitHint = limitHint;
@@ -131,7 +137,8 @@ public static class ReadTableCurrentVersion implements ReadTableRequest {
131137
private final List<String> jsonPredicateHints;
132138
private final Optional<Integer> limitHint;
133139

134-
public ReadTableCurrentVersion(List<String> predicateHints, List<String> jsonPredicateHints, Optional<Integer> limitHint) {
140+
public ReadTableCurrentVersion(
141+
List<String> predicateHints, List<String> jsonPredicateHints, Optional<Integer> limitHint) {
135142
this.predicateHints = predicateHints;
136143
this.jsonPredicateHints = jsonPredicateHints;
137144
this.limitHint = limitHint;

server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java

Lines changed: 13 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package io.whitefox.core.services;
22

3+
import static io.whitefox.core.PredicateUtils.evaluateJsonPredicate;
4+
import static io.whitefox.core.PredicateUtils.evaluateSqlPredicate;
5+
36
import io.delta.standalone.DeltaLog;
47
import io.delta.standalone.Snapshot;
58
import io.delta.standalone.actions.AddFile;
69
import io.whitefox.core.*;
710
import io.whitefox.core.Metadata;
811
import io.whitefox.core.TableSchema;
9-
import io.whitefox.core.types.predicates.EvalContext;
1012
import io.whitefox.core.types.predicates.PredicateException;
1113
import java.sql.Timestamp;
1214
import java.time.OffsetDateTime;
@@ -86,33 +88,8 @@ public Optional<Long> getTableVersion(Optional<String> startingTimestamp) {
8688
return getSnapshot(startingTimestamp).map(Snapshot::getVersion);
8789
}
8890

89-
private boolean evaluateJsonPredicate(String predicate, EvalContext ctx, AddFile f) {
90-
try {
91-
var parsedPredicate = PredicateUtils.parseJsonPredicate(predicate);
92-
return parsedPredicate.evalExpectBoolean(ctx);
93-
} catch (PredicateException e) {
94-
logger.debug("Caught exception for predicate: " + predicate + " - " + e.getMessage());
95-
logger.info("File: " + f.getPath()
96-
+ " will be used in processing due to failure in parsing or processing the predicate: "
97-
+ predicate);
98-
return true;
99-
}
100-
}
101-
102-
private boolean evaluateSqlPredicate(String predicate, EvalContext ctx, AddFile f, Metadata metadata) {
103-
try {
104-
var parsedPredicate = PredicateUtils.parseSqlPredicate(predicate, ctx, metadata);
105-
return parsedPredicate.evalExpectBoolean(ctx);
106-
} catch (PredicateException e) {
107-
logger.debug("Caught exception for predicate: " + predicate + " - " + e.getMessage());
108-
logger.info("File: " + f.getPath()
109-
+ " will be used in processing due to failure in parsing or processing the predicate: "
110-
+ predicate);
111-
return true;
112-
}
113-
}
114-
115-
public boolean filterFilesBasedOnSqlPredicates(List<String> predicates, AddFile f, Metadata metadata) {
91+
public boolean filterFilesBasedOnSqlPredicates(
92+
List<String> predicates, AddFile f, Metadata metadata) {
11693
// if there are no predicates return all possible files
11794
if (predicates == null) {
11895
return true;
@@ -123,7 +100,7 @@ public boolean filterFilesBasedOnSqlPredicates(List<String> predicates, AddFile
123100
} catch (PredicateException e) {
124101
logger.debug("Caught exception: " + e.getMessage());
125102
logger.info("File: " + f.getPath()
126-
+ " will be used in processing due to failure in parsing or processing the predicate");
103+
+ " will be used in processing due to failure in parsing or processing the predicate");
127104
return true;
128105
}
129106
}
@@ -150,12 +127,15 @@ public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) {
150127
Snapshot snapshot;
151128
if (readTableRequest instanceof ReadTableRequest.ReadTableCurrentVersion) {
152129
snapshot = deltaLog.snapshot();
153-
predicates = ((ReadTableRequest.ReadTableCurrentVersion) readTableRequest).jsonPredicateHints();
154-
sqlPredicates = ((ReadTableRequest.ReadTableCurrentVersion) readTableRequest).predicateHints();
130+
predicates =
131+
((ReadTableRequest.ReadTableCurrentVersion) readTableRequest).jsonPredicateHints();
132+
sqlPredicates =
133+
((ReadTableRequest.ReadTableCurrentVersion) readTableRequest).predicateHints();
155134
} else if (readTableRequest instanceof ReadTableRequest.ReadTableAsOfTimestamp) {
156135
snapshot = deltaLog.getSnapshotForTimestampAsOf(
157136
((ReadTableRequest.ReadTableAsOfTimestamp) readTableRequest).timestamp());
158-
predicates = ((ReadTableRequest.ReadTableAsOfTimestamp) readTableRequest).jsonPredicateHints();
137+
predicates =
138+
((ReadTableRequest.ReadTableAsOfTimestamp) readTableRequest).jsonPredicateHints();
159139
sqlPredicates = ((ReadTableRequest.ReadTableAsOfTimestamp) readTableRequest).predicateHints();
160140
} else if (readTableRequest instanceof ReadTableRequest.ReadTableVersion) {
161141
snapshot = deltaLog.getSnapshotForVersionAsOf(
@@ -168,7 +148,7 @@ public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) {
168148
var metadata = metadataFromSnapshot(snapshot);
169149
return new ReadTableResultToBeSigned(
170150
new Protocol(Optional.of(1)),
171-
metadata,
151+
metadata,
172152
snapshot.getAllFiles().stream()
173153
.filter(f -> filterFilesBasedOnJsonPredicates(predicates, f))
174154
.filter(f -> filterFilesBasedOnSqlPredicates(sqlPredicates, f, metadata))

0 commit comments

Comments
 (0)