diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index f7e3c7c082..3c077cdfc1 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2254,6 +2254,7 @@ def data_file_statistics_from_parquet_metadata(
     parquet_metadata: pq.FileMetaData,
     stats_columns: Dict[int, StatisticsCollector],
     parquet_column_mapping: Dict[str, int],
+    check_schema: bool = True,
 ) -> DataFileStatistics:
     """
     Compute and return DataFileStatistics that includes the following.
@@ -2299,7 +2300,15 @@
         for pos in range(parquet_metadata.num_columns):
             column = row_group.column(pos)
-            field_id = parquet_column_mapping[column.path_in_schema]
+            try:
+                field_id = parquet_column_mapping[column.path_in_schema]
+            except KeyError:
+                if check_schema:
+                    raise
+                else:
+                    logger.warning("PyArrow column %d missing in current schema", pos)
+                    continue
+
             stats_col = stats_columns[field_id]
@@ -2464,7 +2473,7 @@ def _check_pyarrow_schema_compatible(
     _check_schema_compatible(requested_schema, provided_schema)


-def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
+def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str], check_schema: bool = True, partition_deductor: Optional[Callable[[str], Record]] = None) -> Iterator[DataFile]:
     for file_path in file_paths:
         input_file = io.new_input(file_path)
         with input_file.open() as input_stream:
@@ -2475,18 +2484,25 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_
                     f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
                 )
             schema = table_metadata.schema()
-            _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())
+            if check_schema:
+                _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())

             statistics = data_file_statistics_from_parquet_metadata(
                 parquet_metadata=parquet_metadata,
                 stats_columns=compute_statistics_plan(schema, table_metadata.properties),
                 parquet_column_mapping=parquet_path_to_id_mapping(schema),
+                check_schema=check_schema,
             )
+
+            if partition_deductor is None:
+                partition = statistics.partition(table_metadata.spec(), table_metadata.schema())
+            else:
+                partition = partition_deductor(file_path)
+
             data_file = DataFile(
                 content=DataFileContent.DATA,
                 file_path=file_path,
                 file_format=FileFormat.PARQUET,
-                partition=statistics.partition(table_metadata.spec(), table_metadata.schema()),
+                partition=partition,
                 file_size_in_bytes=len(input_file),
                 sort_order_id=None,
                 spec_id=table_metadata.default_spec_id,
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index ee50a8b5bf..a80d8589be 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -686,7 +686,7 @@ def delete(
             warnings.warn("Delete operation did not match any records")

     def add_files(
-        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
+        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True, check_schema: bool = True, partition_deductor: Optional[Callable[[str], Record]] = None,
     ) -> None:
         """
         Shorthand API for adding files as data files to the table transaction.
@@ -717,7 +717,7 @@ def add_files(
             )
         with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
             data_files = _parquet_files_to_data_files(
-                table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
+                table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io, check_schema=check_schema, partition_deductor=partition_deductor
             )
             for data_file in data_files:
                 update_snapshot.append_data_file(data_file)
@@ -1283,7 +1283,7 @@ def delete(
         tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)

     def add_files(
-        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
+        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True, check_schema: bool = True, partition_deductor: Optional[Callable[[str], Record]] = None,
     ) -> None:
         """
         Shorthand API for adding files as data files to the table.
@@ -1296,8 +1296,7 @@
         """
         with self.transaction() as tx:
             tx.add_files(
-                file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
-            )
+                file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files, check_schema=check_schema, partition_deductor=partition_deductor)

     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
@@ -1883,7 +1882,7 @@ class AddFileTask:
     partition_field_value: Record


-def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
+def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO, check_schema: bool = True, partition_deductor: Optional[Callable[[str], Record]] = None) -> Iterable[DataFile]:
     """Convert a list files into DataFiles.

     Returns:
@@ -1891,4 +1890,4 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
     """
     from pyiceberg.io.pyarrow import parquet_files_to_data_files

-    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))
+    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths), check_schema=check_schema, partition_deductor=partition_deductor)
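A minimal usage sketch of the new `check_schema` and `partition_deductor` options. The catalog name, table identifier, file path, and the `partition_from_path` helper below are placeholders, not part of the patch; the sketch assumes a table partitioned by identity on a string column `dt`, with data files laid out as `.../dt=<value>/<file>.parquet`.

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.typedef import Record

catalog = load_catalog("default")          # placeholder catalog name
table = catalog.load_table("db.events")    # placeholder table identifier


def partition_from_path(file_path: str) -> Record:
    # Hypothetical partition_deductor: read the partition value out of the
    # dt=<value> path segment instead of deriving it from Parquet statistics.
    dt = next(seg.split("=", 1)[1] for seg in file_path.split("/") if seg.startswith("dt="))
    return Record(dt)  # positional value for the single partition field


table.add_files(
    file_paths=["s3://bucket/db/events/data/dt=2024-01-01/file-0.parquet"],
    check_schema=False,                      # skip the PyArrow schema compatibility check
    partition_deductor=partition_from_path,  # use path-derived partition values
)
```

With `check_schema=False`, Parquet columns that are missing from the current table schema are logged and skipped during statistics collection instead of raising a `KeyError`.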