Manual deduction of partitions #1743

Open
wants to merge 3 commits into main

Conversation

@afiodorov commented Feb 28, 2025

I want to

a) be able to add files that are partitioned by the filename convention, e.g. s3://bucket/table/year=2025/month=12
b) add files even if they have extra columns, without having to migrate the table

This comes from a common pattern of having existing Hive tables and needing to migrate them to Iceberg.

I propose we can achieve this along the following lines:

import re

from pyiceberg.typedef import Record

pattern = re.compile(r"([^/]+)=([^/]+)")

def deduct_partition(path: str) -> Record:
    return Record(**dict(pattern.findall(path)))

table.add_files(['s3://bucket/table/year=2025/month=12/file.parquet'], check_schema=False, partition_deductor=deduct_partition)
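For illustration, applying the regex above to the example path yields the partition values as strings (a quick sketch; casting the values to the types in the partition spec is left out here):

print(dict(pattern.findall("s3://bucket/table/year=2025/month=12/file.parquet")))
# {'year': '2025', 'month': '12'}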

* Sometimes partition values have to be deduced from the Hive S3 file path
* Allow adding Parquet files with additional columns that are not
  in the Glue schema
@kevinjqliu (Contributor)

Hi @afiodorov, thanks for the PR!

For adding hive-partitioned files to Iceberg, there's a specific way we can do so using column projection: https://iceberg.apache.org/spec/#column-projection. We've implemented the read side in #1443. We'd want to implement the write side as well by overriding the partition field in the data_file object in the manifest.

I think we need to define an API that does not involve regex. The example above is confusing from a user's perspective:

pattern = re.compile(r"([^/]+)=([^/]+)")

def deduct_partition(path: str) -> Record:
    return Record(**dict(pattern.findall(path)))

table.add_files(['s3://bucket/table/year=2025/month=12/file.parquet'], check_schema=False, partition_deductor=deduct_partition)

IMO the add_files API should not infer the hive-style partition scheme. Perhaps we can create a different API to "migrate" a hive-style partitioned table to an Iceberg table. As long as we can create the proper DataFile object with the right partition field set, we can register these with the Iceberg table.

PyArrow has a helper, pyarrow.dataset.HivePartitioning, which can parse the hive-style partition scheme without relying on regex.
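
For example (a sketch, assuming partition columns year and month; in practice the schema would be derived from the table's partition spec, and extracting the partition segments from the full S3 path is left out here):

import pyarrow as pa
import pyarrow.dataset as ds

# Declare the expected partition columns (assumed here; derive them from the table's spec).
partitioning = ds.HivePartitioning(pa.schema([("year", pa.int32()), ("month", pa.int32())]))

# parse() turns hive-style path segments into a typed expression, no regex involved,
# e.g. ((year == 2025) and (month == 12)).
print(partitioning.parse("/year=2025/month=12"))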

Please let me know what you think!

@afiodorov (Author) commented Mar 2, 2025

Hey! The regex was just an example; it's not part of the proposed API, but the partition deduction function is. Thanks for pointing me to HivePartitioning; it makes sense to switch to that where possible.

The issue I am having at work is that our pipelines keep writing hive-partitioned Parquet files to S3; we have hundreds of tables, and I don't maintain many of the pipelines. However, we need a quick conversion (and subsequent upkeep) of those tables to Iceberg.

add_files is almost what we need for both the initial migration and the subsequent upkeep; however, it assumes the partition columns are written in the Parquet files, which isn't the case. We don't want to rewrite all the Parquet files or touch the pipelines at all; we just need a quick, hassle-free in-place migration of as many hive-partitioned tables to Iceberg as possible. The current idea is to add the add_files step to our scheduler to register new partitions after regular runs.

If you add another API that deals with DataFile directly, as opposed to add_files, which makes certain assumptions, that would work for us. Until then I might just have to deploy my fork to solve our problem.

@Fokko (Contributor) left a comment

Thanks for the PR @afiodorov. I think there is room to make this more flexible; however, I left some concerns in the comments.

Comment on lines +2487 to +2488
if check_schema:
    _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema())

At Iceberg, we're pretty concerned with making sure that everything is compatible at write time. Instead, we could change _check_pyarrow_schema_compatible to allow for additional columns in the Parquet file.

It is okay to skip optional columns but not required ones.
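
A minimal sketch of that relaxed check, not the actual _check_pyarrow_schema_compatible (the function name and shape here are hypothetical): extra Parquet columns are tolerated, but every required Iceberg field must still be present.

import pyarrow as pa
from pyiceberg.schema import Schema

def _check_required_fields_present(iceberg_schema: Schema, parquet_schema: pa.Schema) -> None:
    parquet_names = set(parquet_schema.names)
    missing = [f.name for f in iceberg_schema.fields if f.required and f.name not in parquet_names]
    if missing:
        raise ValueError(f"Parquet file is missing required columns: {missing}")
    # Columns present in the file but unknown to the table schema are simply ignored.
    # (Type compatibility checks are omitted in this sketch.)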

if partition_deductor is None:
    partition = statistics.partition(table_metadata.spec(), table_metadata.schema())
else:
    partition = partition_deductor(file_path)

While you can add keys to the Record, it is looked up by position, based on the Schema that belongs to it (in this case, that of the active PartitionSpec).
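
To illustrate (a sketch, assuming a spec partitioned by year and then month): the partition Record is read positionally in the spec's field order, so it should not be built from a dict whose key order may vary.

from pyiceberg.typedef import Record

# Values must follow the partition spec's field order, (year, month) in this example.
partition = Record(2025, 12)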

Fokko added a commit to Fokko/iceberg-python that referenced this pull request Mar 5, 2025
This aligns the implementation with Java.

We had the keywords there mostly for the tests, but they should
not be used, and it seems like that's already the case :'(
I was undecided whether the costs of this PR (all the changes) are worth
it, but I see more PRs using the Record in a bad way (example apache#1743)
that might lead to very subtle bugs where the position might sometimes
change based on the ordering of the dict.
Fokko added a commit that referenced this pull request Apr 22, 2025

Blocked by Eventual-Inc/Daft#3917