@@ -46,6 +46,7 @@
 import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
 import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId;
 import io.trino.plugin.iceberg.util.DataFileWithDeleteFiles;
+import io.trino.spi.ErrorCode;
 import io.trino.spi.TrinoException;
 import io.trino.spi.block.Block;
 import io.trino.spi.connector.Assignment;
@@ -374,7 +375,9 @@ public ConnectorTableHandle getTableHandle(
             return null;
         }
         catch (TrinoException e) {
-            if (e.getErrorCode().equals(ICEBERG_MISSING_METADATA.toErrorCode())) {
+            ErrorCode errorCode = e.getErrorCode();
+            if (errorCode.equals(ICEBERG_MISSING_METADATA.toErrorCode())
+                    || errorCode.equals(ICEBERG_INVALID_METADATA.toErrorCode())) {
                 return new CorruptedIcebergTableHandle(tableName, e);
             }
             throw e;
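An aside on the `errorCode.equals(...)` comparison above: Trino SPI error codes are enum constants implementing `ErrorCodeSupplier`, and `TrinoException.getErrorCode()` returns the `ErrorCode` value they supply, so matching a failure is a value comparison rather than an exception-type check. A rough sketch of that pattern, using a hypothetical `DemoErrorCode` enum and an arbitrary code-space offset (not the actual `IcebergErrorCode` source):

```java
import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.ErrorType;

import static io.trino.spi.ErrorType.EXTERNAL;

public enum DemoErrorCode
        implements ErrorCodeSupplier
{
    DEMO_MISSING_METADATA(0, EXTERNAL),
    DEMO_INVALID_METADATA(1, EXTERNAL);

    private final ErrorCode errorCode;

    DemoErrorCode(int code, ErrorType type)
    {
        // 0x0100_0000 is an arbitrary connector code space picked for this sketch
        errorCode = new ErrorCode(code + 0x0100_0000, name(), type);
    }

    @Override
    public ErrorCode toErrorCode()
    {
        return errorCode;
    }
}
```

With this shape, `e.getErrorCode().equals(DEMO_INVALID_METADATA.toErrorCode())` identifies a specific failure carried by any `TrinoException`, which is what lets `getTableHandle` treat both metadata error codes as a corrupted-table signal.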
@@ -24,6 +24,7 @@
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFile;
@@ -45,6 +46,7 @@
 import static io.trino.plugin.hive.util.HiveClassNames.FILE_INPUT_FORMAT_CLASS;
 import static io.trino.plugin.hive.util.HiveClassNames.FILE_OUTPUT_FORMAT_CLASS;
 import static io.trino.plugin.hive.util.HiveClassNames.LAZY_SIMPLE_SERDE_CLASS;
+import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_METADATA;
 import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME;
 import static io.trino.plugin.iceberg.IcebergUtil.fixBrokenMetadataLocation;
@@ -235,14 +237,17 @@ protected void refreshFromMetadataLocation(String newLocation)
                         .withMaxRetries(20)
                         .withBackoff(100, 5000, MILLIS, 4.0)
                         .withMaxDuration(Duration.ofMinutes(10))
-                        .abortOn(AbstractIcebergTableOperations::isNotFoundException)
+                        .abortOn(failure -> failure instanceof ValidationException || isNotFoundException(failure))
                         .build())
                 .get(() -> TableMetadataParser.read(fileIo, io().newInputFile(newLocation)));
         }
         catch (Throwable failure) {
             if (isNotFoundException(failure)) {
                 throw new TrinoException(ICEBERG_MISSING_METADATA, "Metadata not found in metadata location for table " + getSchemaTableName(), failure);
             }
+            if (failure instanceof ValidationException) {
+                throw new TrinoException(ICEBERG_INVALID_METADATA, "Invalid metadata file for table " + getSchemaTableName(), failure);
+            }
             throw failure;
         }
 
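For context on the `abortOn` change above: the retry loop is built with Failsafe, and a failure matching the `abortOn` predicate is rethrown immediately instead of being retried under the backoff schedule (up to 20 retries over 10 minutes). A minimal self-contained sketch of that behavior, assuming Failsafe 3.x (`dev.failsafe`) and substituting a plain `IllegalStateException` for Iceberg's `ValidationException`:

```java
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import static java.time.temporal.ChronoUnit.MILLIS;

public class AbortOnDemo
{
    public static void main(String[] args)
    {
        AtomicInteger attempts = new AtomicInteger();
        RetryPolicy<Object> policy = RetryPolicy.builder()
                // Same schedule as refreshFromMetadataLocation above
                .withMaxRetries(20)
                .withBackoff(100, 5000, MILLIS, 4.0)
                .withMaxDuration(Duration.ofMinutes(10))
                // Abort on failures that retrying can never fix
                .abortOn(failure -> failure instanceof IllegalStateException)
                .build();
        try {
            Failsafe.with(policy).get(() -> {
                attempts.incrementAndGet();
                throw new IllegalStateException("invalid metadata");
            });
        }
        catch (IllegalStateException e) {
            // Aborted on the first attempt; without abortOn, Failsafe would have
            // kept retrying this supplier under the backoff schedule
            System.out.println("attempts = " + attempts.get()); // prints: attempts = 1
        }
    }
}
```

That immediate abort is also why the new test below can use a 10-second timeout: with the old behavior, an invalid metadata file kept the query retrying for up to 10 minutes.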
@@ -13,6 +13,10 @@
  */
 package io.trino.plugin.iceberg;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.airlift.units.DataSize;
@@ -58,6 +62,7 @@
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.util.JsonUtil;
 import org.intellij.lang.annotations.Language;
 import org.testng.SkipException;
 import org.testng.annotations.BeforeClass;
@@ -135,6 +140,7 @@
 import static io.trino.transaction.TransactionBuilder.transaction;
 import static java.lang.String.format;
 import static java.lang.String.join;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.time.ZoneOffset.UTC;
 import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 import static java.util.Collections.nCopies;
@@ -6829,6 +6835,35 @@ public void testDropCorruptedTableWithHiveRedirection()
         assertFalse(fileSystem.listFiles(tableLocation).hasNext(), "Table location should not exist");
     }
 
+    @Test(timeOut = 10_000)
+    public void testNoRetryWhenMetadataFileInvalid()
+            throws Exception
+    {
+        String tableName = "test_no_retry_when_metadata_file_invalid_" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 id", 1);
+
+        String tableLocation = getTableLocation(tableName);
+        String metadataFileLocation = getLatestMetadataLocation(fileSystem, tableLocation);
+
+        ObjectMapper mapper = JsonUtil.mapper();
+        JsonNode jsonNode = mapper.readValue(fileSystem.newInputFile(Location.of(metadataFileLocation)).newStream(), JsonNode.class);
+        ArrayNode fieldsNode = (ArrayNode) jsonNode.get("schemas").get(0).get("fields");
+        ObjectNode newFieldNode = fieldsNode.get(0).deepCopy();
+        // Add a duplicate field to produce a validation error while reading the metadata file
+        fieldsNode.add(newFieldNode);
+
+        String modifiedJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode);
+        try (OutputStream outputStream = fileSystem.newOutputFile(Location.of(metadataFileLocation)).createOrOverwrite()) {
+            // Corrupt the metadata file by overwriting it with the modified, invalid content
+            outputStream.write(modifiedJson.getBytes(UTF_8));
+        }
+        assertThatThrownBy(() -> query("SELECT * FROM " + tableName))
+                .hasMessage("Invalid metadata file for table tpch.%s".formatted(tableName));
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
Review comment (Contributor):

> // try to drop table
> assertUpdate("DROP TABLE " + tableName);
> assertFalse(getQueryRunner().tableExists(getSession(), tableName));
> assertFalse(fileSystem.listFiles(tableLocation).hasNext(), "Table location should not exist");

Do you happen to remember why we added the test for this situation in the smoke test and not in BaseIcebergConnectorTest? I'm not necessarily requesting that the checks be done here as well.

Reply (Contributor Author):

The tests are also added in BaseIcebergConnectorTest. We wanted to verify the corrupted table against all catalogs, and since the smoke test runs against all catalogs, we added the smoke test.

Reply (Contributor Author, @krvikash, Jun 14, 2023):

One more reason to add the smoke connector test is to verify against the different file systems.
     @Override
     protected void verifyTableNameLengthFailurePermissible(Throwable e)
     {
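For readers unfamiliar with the JSON the test mutates: Iceberg table metadata stores schemas under a top-level `schemas` array, each with a `fields` list. A standalone sketch of the same duplicate-field corruption over a hand-written minimal metadata fragment (real metadata files carry many more properties than shown here):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class DuplicateFieldDemo
{
    public static void main(String[] args)
            throws Exception
    {
        // Minimal, hand-written stand-in for a real table metadata file
        String metadata = """
                {
                  "format-version": 2,
                  "schemas": [{
                    "schema-id": 0,
                    "type": "struct",
                    "fields": [{"id": 1, "name": "id", "required": false, "type": "int"}]
                  }]
                }
                """;
        ObjectMapper mapper = new ObjectMapper();
        JsonNode jsonNode = mapper.readTree(metadata);
        // Same mutation as the test: duplicate the first schema field
        ArrayNode fieldsNode = (ArrayNode) jsonNode.get("schemas").get(0).get("fields");
        ObjectNode newFieldNode = fieldsNode.get(0).deepCopy();
        fieldsNode.add(newFieldNode);
        System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode));
    }
}
```

Reading the mutated file back through Iceberg's metadata parser fails schema validation on the duplicated field, and that `ValidationException` is what the connector now maps to `ICEBERG_INVALID_METADATA` instead of retrying.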