diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index 9e88bdc..497747a 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -410,6 +410,7 @@ private static List<DataFile> processFile(
       throw new BadRequestException(
           file + " cannot be added to catalog without copy"); // TODO: explain
     }
+
     long dataFileSizeInBytes;
 
     var start = System.currentTimeMillis();
@@ -419,6 +420,14 @@ private static List<DataFile> processFile(
         return Collections.emptyList();
       }
       dataFileSizeInBytes = inputFile.getLength();
+      // check if partition spec is defined.
+      if (tableSpec.isPartitioned()) {
+        // For partitioned tables with no-copy, we need to create DataFile objects
+        // that reference the original file with the table's partition spec
+        MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+        Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig);
+        return createNoCopyPartitionedDataFiles(dataFile, tableSpec, inputFile, metrics);
+      }
     } else if (options.s3CopyObject()) {
       if (!dataFile.startsWith("s3://") || !table.location().startsWith("s3://")) {
         throw new BadRequestException("--s3-copy-object is only supported between s3:// buckets");
@@ -486,13 +495,132 @@ private static List<DataFile> processFile(
     DataFile dataFileObj =
         new DataFiles.Builder(tableSpec)
             .withPath(dataFile)
-            .withFormat("PARQUET")
+            .withFormat(FileFormat.PARQUET)
             .withFileSizeInBytes(dataFileSizeInBytes)
             .withMetrics(metrics)
             .build();
     return Collections.singletonList(dataFileObj);
   }
 
+  /**
+   * Creates DataFile objects for no-copy scenarios with partitioning. This function extracts
+   * partition keys from the InputFile and creates DataFile objects with partition metadata.
+   */
+  private static List<DataFile> createNoCopyPartitionedDataFiles(
+      String file, PartitionSpec tableSpec, InputFile inputFile, Metrics metrics) {
+
+    List<DataFile> dataFiles = new ArrayList<>();
+
+    try {
+      // Extract partition keys from the InputFile
+      PartitionKey partitionKey = extractPartitionKeyFromFile(inputFile, tableSpec);
+
+      // Validate that we have a valid partition key
+      if (partitionKey != null && tableSpec.fields().size() > 0) {
+        DataFile dataFile =
+            DataFiles.builder(tableSpec)
+                .withPartition(partitionKey)
+                .withPath(file)
+                .withFormat(FileFormat.PARQUET)
+                .withFileSizeInBytes(inputFile.getLength())
+                .withMetrics(metrics)
+                .build();
+
+        dataFiles.add(dataFile);
+        logger.info("{}: created partitioned data file with partition key: {}", file, partitionKey);
+      } else {
+        throw new IOException("Invalid partition key extracted from file");
+      }
+
+    } catch (Exception e) {
+      logger.warn(
+          "{}: could not extract partition key from file, creating unpartitioned data file",
+          file,
+          e);
+
+      // Fallback to unpartitioned data file
+      DataFile dataFile =
+          DataFiles.builder(tableSpec)
+              .withPath(file)
+              .withFormat(FileFormat.PARQUET)
+              .withFileSizeInBytes(inputFile.getLength())
+              .withMetrics(metrics)
+              .build();
+
+      dataFiles.add(dataFile);
+    }
+
+    return dataFiles;
+  }
+
+  /** Extracts partition key from InputFile by reading a sample of records. */
+  private static PartitionKey extractPartitionKeyFromFile(
+      InputFile inputFile, PartitionSpec tableSpec) throws IOException {
+    // Read the first record to extract partition values
+    Parquet.ReadBuilder readBuilder =
+        Parquet.read(inputFile)
+            .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSpec.schema(), s))
+            .project(tableSpec.schema());
+
+    try (CloseableIterable<Record> records = readBuilder.build()) {
+      for (Record record : records) {
+        // Create partition key from the first record
+        PartitionKey partitionKey = new PartitionKey(tableSpec, tableSpec.schema());
+
+        logger.debug(
+            "Extracting partition key from record with {} partition fields",
+            tableSpec.fields().size());
+
+        for (PartitionField field : tableSpec.fields()) {
+          try {
+            String fieldName = tableSpec.schema().findField(field.sourceId()).name();
+            Object value = record.getField(fieldName);
+
+            logger.debug(
+                "Field: {}, SourceId: {}, Value: {}", field.name(), field.sourceId(), value);
+
+            if (value != null) {
+              // Convert value based on partition transform
+              Object convertedValue = convertValueForPartition(value, field);
+              partitionKey.set(field.sourceId(), convertedValue);
+              logger.debug("Set partition value for field {}: {}", field.name(), convertedValue);
+            }
+          } catch (Exception e) {
+            logger.warn(
+                "Could not extract partition value for field {}: {}", field.name(), e.getMessage());
+            // Continue with other fields instead of failing completely
+          }
+        }
+
+        return partitionKey;
+      }
+    }
+
+    throw new IOException("No records found in file to extract partition key");
+  }
+
+  /** Converts a field value to the appropriate type for partition key based on transform. */
+  private static Object convertValueForPartition(Object value, PartitionField field) {
+    String transformName = field.transform().toString();
+
+    switch (transformName) {
+      case "identity":
+        return value;
+      case "day":
+      case "month":
+      case "hour":
+        // For time-based partitions, convert to micros
+        try {
+          return toPartitionMicros(value);
+        } catch (Exception e) {
+          logger.warn("Could not convert value '{}' for field '{}' to micros", value, field.name());
+          return value;
+        }
+      default:
+        return value;
+    }
+  }
+
   private static List<DataFile> copyParquetWithPartition(
       String file,
       Schema tableSchema,
@@ -680,7 +808,7 @@ private static DataFile copyParquetWithSortOrder(
 
     return new DataFiles.Builder(tableSpec)
         .withPath(dstDataFile)
-        .withFormat("PARQUET")
+        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(fileSizeInBytes)
         .withMetrics(metrics)
         .build();
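Note (reviewer sketch, not part of the diff above): the hand-written transform handling in convertValueForPartition and the per-field partitionKey.set(...) loop can probably be replaced by Iceberg's own partition-key machinery. PartitionKey.partition(...) applies every transform declared in the spec (identity, day, month, hour, bucket, truncate, ...), and InternalRecordWrapper first coerces generic in-memory values (e.g. LocalDateTime for timestamp columns) into the internal representations the transforms expect. The sketch below is illustrative only: the class and method names (PartitionKeys, fromRecord) are made up here, and it assumes records are read as org.apache.iceberg.data.Record, exactly as extractPartitionKeyFromFile already does.

import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;

final class PartitionKeys {

  private PartitionKeys() {}

  /**
   * Derives the partition tuple for a single record by letting the partition spec apply its
   * own transforms, instead of converting field values by hand.
   */
  static PartitionKey fromRecord(PartitionSpec spec, Schema schema, Record record) {
    // InternalRecordWrapper converts generic Java values (LocalDate, LocalDateTime, ...) into
    // Iceberg's internal representations before the transforms are applied.
    InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
    PartitionKey key = new PartitionKey(spec, schema);
    key.partition(wrapper.wrap(record)); // fills each partition position via the field's transform
    return key;
  }
}

With such a helper, extractPartitionKeyFromFile would reduce to reading the first record and returning PartitionKeys.fromRecord(tableSpec, tableSpec.schema(), record). Either way, deriving the partition tuple from a single record assumes the whole Parquet file belongs to one partition; a file spanning several partition values would be misregistered in the catalog.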