
Added logic to include partition metadata for no-copy #51

Closed
132 changes: 130 additions & 2 deletions ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -410,6 +410,7 @@ private static List<DataFile> processFile(
throw new BadRequestException(
file + " cannot be added to catalog without copy"); // TODO: explain
}

long dataFileSizeInBytes;

var start = System.currentTimeMillis();
@@ -419,6 +420,14 @@ private static List<DataFile> processFile(
return Collections.emptyList();
}
dataFileSizeInBytes = inputFile.getLength();
// check if partition spec is defined.
if (tableSpec.isPartitioned()) {
// For partitioned tables with no-copy, we need to create DataFile objects
// that reference the original file with the table's partition spec
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig);
return createNoCopyPartitionedDataFiles(dataFile, tableSpec, inputFile, metrics);
}
} else if (options.s3CopyObject()) {
if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) {
throw new BadRequestException("--s3-copy-object is only supported between s3:// buckets");
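The no-copy branch above never rewrites the Parquet file: column metrics are taken from the footer that has already been parsed, and the DataFile handed back to the caller points at the original location. A minimal sketch of the same idea as a standalone helper, assuming a Table handle, a FileIO-resolvable path, and an already-computed partition tuple; here ParquetUtil.fileMetrics re-reads the footer instead of reusing parsed metadata as the CLI code does:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.ParquetUtil;

static DataFile noCopyDataFile(Table table, String path, StructLike partition) {
  InputFile inputFile = table.io().newInputFile(path);
  // Column-level metrics come straight from the Parquet footer; no data is copied.
  Metrics metrics = ParquetUtil.fileMetrics(inputFile, MetricsConfig.forTable(table));
  DataFiles.Builder builder =
      DataFiles.builder(table.spec())
          .withPath(path)
          .withFormat(FileFormat.PARQUET)
          .withFileSizeInBytes(inputFile.getLength())
          .withMetrics(metrics);
  if (table.spec().isPartitioned()) {
    builder.withPartition(partition); // tuple must match the table's partition spec
  }
  return builder.build();
}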
@@ -486,13 +495,132 @@ private static List<DataFile> processFile(
DataFile dataFileObj =
new DataFiles.Builder(tableSpec)
.withPath(dataFile)
.withFormat("PARQUET")
.withFormat(FileFormat.PARQUET)
.withFileSizeInBytes(dataFileSizeInBytes)
.withMetrics(metrics)
.build();
return Collections.singletonList(dataFileObj);
}

/**
* Creates DataFile objects for no-copy scenarios with partitioning. This function extracts
* partition keys from the InputFile and creates DataFile objects with partition metadata.
*/
private static List<DataFile> createNoCopyPartitionedDataFiles(
String file, PartitionSpec tableSpec, InputFile inputFile, Metrics metrics) {

List<DataFile> dataFiles = new ArrayList<>();

try {
// Extract partition keys from the InputFile
PartitionKey partitionKey = extractPartitionKeyFromFile(inputFile, tableSpec);

// Validate that we have a valid partition key
if (partitionKey != null && tableSpec.fields().size() > 0) {
DataFile dataFile =
DataFiles.builder(tableSpec)
.withPartition(partitionKey)
.withPath(file)
.withFormat(FileFormat.PARQUET)
.withFileSizeInBytes(inputFile.getLength())
.withMetrics(metrics)
.build();

dataFiles.add(dataFile);
logger.info("{}: created partitioned data file with partition key: {}", file, partitionKey);
} else {
throw new IOException("Invalid partition key extracted from file");
}

} catch (Exception e) {
logger.warn(
"{}: could not extract partition key from file, creating unpartitioned data file",
file,
e);

// Fallback to unpartitioned data file
DataFile dataFile =
DataFiles.builder(tableSpec)
.withPath(file)
.withFormat(FileFormat.PARQUET)
.withFileSizeInBytes(inputFile.getLength())
.withMetrics(metrics)
.build();

dataFiles.add(dataFile);
}

return dataFiles;
}

/** Extracts partition key from InputFile by reading a sample of records. */
private static PartitionKey extractPartitionKeyFromFile(
InputFile inputFile, PartitionSpec tableSpec) throws IOException {
// Read the first record to extract partition values
Parquet.ReadBuilder readBuilder =
Parquet.read(inputFile)
.createReaderFunc(s -> GenericParquetReaders.buildReader(tableSpec.schema(), s))
.project(tableSpec.schema());

try (CloseableIterable<Record> records = readBuilder.build()) {
for (Record record : records) {
// Create partition key from the first record
PartitionKey partitionKey = new PartitionKey(tableSpec, tableSpec.schema());

logger.debug(
"Extracting partition key from record with {} partition fields",
tableSpec.fields().size());

for (PartitionField field : tableSpec.fields()) {
try {
String fieldName = tableSpec.schema().findField(field.sourceId()).name();
Object value = record.getField(fieldName);

logger.debug(
"Field: {}, SourceId: {}, Value: {}", field.name(), field.sourceId(), value);

if (value != null) {
// Convert value based on partition transform
Object convertedValue = convertValueForPartition(value, field);
partitionKey.set(field.sourceId(), convertedValue);
logger.debug("Set partition value for field {}: {}", field.name(), convertedValue);
}
} catch (Exception e) {
logger.warn(
"Could not extract partition value for field {}: {}", field.name(), e.getMessage());
// Continue with other fields instead of failing completely
}
}

return partitionKey;
}
}

throw new IOException("No records found in file to extract partition key");
}

/** Converts a field value to the appropriate type for partition key based on transform. */
private static Object convertValueForPartition(Object value, PartitionField field) {
String transformName = field.transform().toString();

switch (transformName) {
case "identity":
return value;
case "day":
case "month":
case "hour":
// For time-based partitions, convert to micros
try {
return toPartitionMicros(value);
} catch (Exception e) {
logger.warn("Could not convert value '{}' for field '{}' to micros", value, field.name());
return value;
}
default:
return value;
}
}

private static List<DataFile> copyParquetWithPartition(
String file,
Schema tableSchema,
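extractPartitionKeyFromFile above fills the PartitionKey field by field from the first record. Iceberg's PartitionKey can also apply the spec's transforms itself via partition(StructLike); a minimal sketch of that variant, assuming a generic Record read with GenericParquetReaders as in the helper (InternalRecordWrapper converts values such as LocalDate into the internal representation the transforms expect):

import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;

static PartitionKey partitionKeyFor(Record firstRecord, PartitionSpec spec, Schema schema) {
  PartitionKey key = new PartitionKey(spec, schema);
  // Wrap the generic record so date/timestamp values are seen in Iceberg's internal
  // form before identity/day/month/hour transforms are evaluated.
  InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
  key.partition(wrapper.wrap(firstRecord));
  return key;
}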
@@ -680,7 +808,7 @@ private static DataFile copyParquetWithSortOrder(

return new DataFiles.Builder(tableSpec)
.withPath(dstDataFile)
.withFormat("PARQUET")
.withFormat(FileFormat.PARQUET)
.withFileSizeInBytes(fileSizeInBytes)
.withMetrics(metrics)
.build();
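processFile only builds the DataFile objects; they still have to be committed to the table. A minimal usage sketch, assuming the caller holds the Table and appends everything in one snapshot:

import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

static void commit(Table table, List<DataFile> dataFiles) {
  AppendFiles append = table.newAppend();
  for (DataFile dataFile : dataFiles) {
    append.appendFile(dataFile);
  }
  append.commit(); // single atomic snapshot covering all appended files
}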