@jqin61 jqin61 commented Mar 20, 2024

Section 1: Background
When doing a static overwrite, we can choose to overwrite the full table or only some partitions of the table.

The Spark SQL counterpart for a full-table static overwrite in Iceberg is:

INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid

And the Spark SQL counterpart for overwriting specified partitions is:

INSERT OVERWRITE prod.my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid

Section 2: Goal of the Pull Request
When overwriting the table, we can provide an expression as overwrite_filter to cover these two cases. The filter expression has to conform to certain rules: it must reference only partition columns that do not use hidden partitioning, and the fields in the filter must relate to the input Arrow table in a specific way so that the partition values of the new data can be auto-filled from the filter, matching the partitions being removed. This pull request implements the logic to (a usage sketch follows the list below):

  1. Validate the filter, and the relationship between the filter and the Arrow table, against the Iceberg table schema.
  2. Populate the partition column(s) of the Arrow table with the partition values specified in the filter.
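
As a usage sketch, this is how such a filter could be passed in, assuming an overwrite API that accepts an overwrite_filter argument (the catalog/table names come from Section 1; the exact call shape is part of this PR and may differ):

import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

catalog = load_catalog("default")
table = catalog.load_table("prod.my_app.logs")

# The Arrow table carries every non-filtered column; the filtered partition
# column ("level") is auto-filled from the filter value by this PR's logic.
# Data values are illustrative only.
df = pa.table({
    "uuid": ["a1", "b2"],
    "ts": [1710892800000, 1710979200000],
    "message": ["started", "stopped"],
})

# Hypothetical call shape: overwrite only the partitions matched by the filter.
table.overwrite(df, overwrite_filter=EqualTo("level", "INFO"))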

Section 3: Rules and Test Cases

  1. Rule: The expression may only use IsNull or EqualTo as building blocks, concatenated by And.
    Tests: test__validate_static_overwrite_filter_expr_type parametrize cases 1-8

  2. Rule: The building-block predicates (IsNull and EqualTo) must not have conflicting values.
    Tests: test__validate_static_overwrite_filter_expr_type parametrize cases 9-11

  3. Rule: The terms (fields) must refer to existing fields in the Iceberg schema, and any literal in a predicate must match the Iceberg field type. In other words, the expression can be bound to the table schema successfully.
    Tests :
    test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_schema_fields_in_filter
    test__bind_and_validate_static_overwrite_filter_predicate_fails_to_bind_due_to_incompatible_predicate_value

  4. Rule: If the expression specifies a field that is required in the Iceberg schema, that field must not appear as IsNull in the expression.
    Tests : test__bind_and_validate_static_overwrite_filter_predicate_fails_to_bind_due_to_non_nullable

  5. Rule: The fields in the expression must be partition columns.
    Tests : test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_part_fields_in_filter

  6. Rule: The Iceberg table fields specified in the expression must not use hidden partitioning; the non-specified partition fields may.
    Tests :
    test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_identity_transorm_filter
    test__bind_and_validate_static_overwrite_filter_predicate_succeeds_on_an_identity_transform_field_although_table_has_other_hidden_partition_fields

  7. Rule: Three-way relationship between the filter, the Arrow table, and the Iceberg table schema.
    The fields in the filter must not appear in the input Arrow table. However, when these fields are removed from the Iceberg schema, the remaining schema must match the Arrow table schema exactly in terms of field names, nullability, and type (a sketch of this check follows the list below).
    [Figure: three-set Venn diagram of the filter, Arrow table, and Iceberg schema fields]

    Tests :
    case 2: test__check_schema_with_filter_fails_on_missing_field
    case 3: test__check_schema_with_filter_fails_due_to_filter_and_dataframe_holding_same_field
    case 4: test__check_schema_with_filter_fails_on_nullability_mismatch
    case 4: test__check_schema_with_filter_fails_on_type_mismatch
    case 5: test__check_schema_with_filter_succeed
    case 5: test__check_schema_with_filter_succeed_on_pyarrow_table_with_random_column_order for this change https://github.com/jqin61/iceberg-python/pull/4/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR1739
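
As a sketch of this three-way check, operating on plain field-name sets (the helper name mirrors the tests but the body is illustrative, not the PR's exact code):

import pyarrow as pa

from pyiceberg.schema import Schema

def _check_schema_with_filter(iceberg_schema: Schema, arrow_schema: pa.Schema, filter_fields: set) -> None:
    arrow_names = set(arrow_schema.names)

    # Case 3: a field must not be supplied both by the filter and by the dataframe.
    overlap = filter_fields & arrow_names
    if overlap:
        raise ValueError(f"Fields appear in both the filter and the dataframe: {overlap}")

    # Cases 1 and 2: after removing the filter fields from the Iceberg schema, the
    # remaining field names must match the Arrow schema exactly (the type and
    # nullability checks of case 4 are elided here).
    expected = {f.name for f in iceberg_schema.fields} - filter_fields
    if expected != arrow_names:
        raise ValueError(f"Missing: {expected - arrow_names}; unexpected: {arrow_names - expected}")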

Section 4: Flow and Pseudocode Logic

  1. Entire flow:
    [Figure: Static Overwrite Filter Validation Flow flowchart]

  2. Rule 7 elaboration:
    [Figure: Rule 7 flowchart]
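
Since the flowchart images do not survive in text form, here is a hedged pseudocode sketch of the overall flow; the helper names echo the test names in Section 3 but are illustrative, not the PR's exact signatures:

def static_overwrite(table, arrow_table, overwrite_filter):
    # Rule 1: the expression tree may only contain And, EqualTo and IsNull.
    predicates = _validate_static_overwrite_filter_expr_type(overwrite_filter)

    # Rule 2: reject conflicting values on the same field, collapse duplicates.
    predicates = _dedup_and_check_conflicts(predicates)

    # Rules 3-6: bind every predicate against the Iceberg schema, rejecting unknown
    # fields, incompatible literals, IsNull on required fields, non-partition fields,
    # and fields whose partition transform is not identity.
    bound = [_bind_and_validate_static_overwrite_filter_predicate(p, table) for p in predicates]

    # Rule 7: three-way check between the filter fields, the Arrow schema and the
    # Iceberg schema (see the sketch above).
    filter_fields = {b.term.ref().field.name for b in bound}
    _check_schema_with_filter(table.schema(), arrow_table.schema, filter_fields)

    # Goal 2: populate the Arrow table's partition columns with the filter values,
    # then proceed with deleting the matched partitions and appending the new data.
    arrow_table = _fill_static_partition_columns(arrow_table, bound)
    ...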

Section 5: Rule Necessity Justification - Spark Counterparts
To better understand these rules, let us look at the corresponding Spark static overwrite failures. We use the following setup:

# Create Spark Dataframe
from pyspark.sql.types import StructType, StructField, StringType, LongType
data_multicols = [(2, "Flamingo", "red"), (4, "Horse", "white"), (4, "Pig", "pink")]
schema = StructType([
    StructField("n_legs", LongType(), nullable=True),
    StructField("animals", StringType(), nullable=True),
    StructField("color", StringType(), nullable=True)  # Mark as non-nullable
])
df_multicols = spark.createDataFrame(data_multicols, schema)

# Create Iceberg Table
create_sql = """CREATE TABLE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols ( 
    n_legs bigint, 
    animals string,
    color string) 
USING iceberg
PARTITIONED BY (n_legs, color)

"""
spark.sql(create_sql)

# Insert Initial data
df_multicols.createOrReplaceTempView("tmp_view")
sql_cmd = f"""INSERT INTO
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    SELECT * FROM  tmp_view
    """
spark.sql(sql_cmd)

This gives the following table schema:

| col_name     | data_type | comment |
|--------------|-----------|---------|
| n_legs       | bigint    |         |
| animals      | string    |         |
| color        | string    |         |
| Partitioning |           |         |
| Part 0       | n_legs    |         |
| Part 1       | color     |         |

and the following data:

| n_legs | animals  | color |
|--------|----------|-------|
| 2      | Flamingo | red   |
| 4      | Horse    | white |
| 4      | Pig      | pink  |

Now let us walk through the rules.
Rule 1. The expression may only use IsNull or EqualTo as building blocks, concatenated by And.
For example:

And(EqualTo(Reference("foo"), "hello"), And(IsNull(Reference("baz")), EqualTo(Reference("boo"), "hello")))

or
"foo = 'hello' AND (baz IS NULL AND boo = 'hello')

Spark counterpart example:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (n_legs > 2)
    SELECT color,animals FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:

mismatched input '>' expecting {')', ','}(line 3, pos 22)

== SQL ==
INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (n_legs > 2)
----------------------^^^
    SELECT color,animals FROM  tmp_view

Other predicates such as 'in' and '!=', and other expressions such as 'Or', give similar errors.

Rule 2. The building-block predicates (IsNull and EqualTo) must not have conflicting values.
This means

And(EqualTo(Reference("foo"), "hello"), EqualTo(Reference("foo"), "bye"))

and

And(EqualTo(Reference("foo"), "hello"), IsNull(Reference("foo"))

are not allowed.

However,

And(EqualTo(Reference("foo"), "hello"), EqualTo(Reference("foo"), "hello"))

is allowed and shall be deduplicated.
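
A minimal sketch of this conflict/deduplication check, assuming each building block is an unbound EqualTo or IsNull on a plain Reference (not the PR's exact code):

from pyiceberg.expressions import EqualTo, IsNull

def dedup_and_check_conflicts(predicates):
    seen = {}
    for pred in predicates:
        name = pred.term.name                                          # Reference("foo").name -> "foo"
        value = pred.literal if isinstance(pred, EqualTo) else None    # None marks IsNull
        if name in seen and seen[name] != value:
            raise ValueError(f"Conflicting predicates on field '{name}'")
        seen[name] = value                                             # identical duplicates collapse here
    return [EqualTo(n, v.value) if v is not None else IsNull(n) for n, v in seen.items()]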

Spark counterpart example:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (color='red', color='green')
    SELECT animals,n_legs FROM  tmp_view
    """
spark.sql(sql_cmd)

gives

ParseException: 
Found duplicate keys 'color'.(line 3, pos 4)

== SQL ==
INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (color='red', color='green')
----^^^
    SELECT animals,n_legs FROM  tmp_view

Rule 3. The terms (fields) must refer to existing fields in the Iceberg schema, and any literal in a predicate must match the Iceberg field type. In other words, the expression can be bound to the table schema successfully.

Spark counterpart example:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (not_a_field='red')
    SELECT animals,n_legs FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:

AnalysisException: PARTITION clause cannot contain a non-partition column name: not_a_field
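
The same failure modes can be reproduced directly with PyIceberg's expression binding; a small sketch (the schema below is hand-built for illustration):

from pyiceberg.expressions import EqualTo
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

schema = Schema(
    NestedField(1, "n_legs", LongType(), required=False),
    NestedField(2, "animals", StringType(), required=False),
    NestedField(3, "color", StringType(), required=False),
)

EqualTo("color", "red").bind(schema)            # binds successfully
EqualTo("not_a_field", "red").bind(schema)      # raises: field not found in schema
EqualTo("n_legs", "not_a_number").bind(schema)  # raises: literal cannot be converted to bigint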

Rule 4. If the expression specifies a field that is required in the Iceberg schema, that field must not appear as IsNull in the expression.

Spark counterpart example:

# Create Spark Dataframe with non-nullable column
from pyspark.sql.types import StructType, StructField, StringType, LongType
data_multicols = [(2, "Flamingo", "red"), (4, "Horse", "white"), (4, "Pig", "pink")]
schema = StructType([
    StructField("n_legs", LongType(), nullable=True),
    StructField("animals", StringType(), nullable=True),
    StructField("color", StringType(), nullable=False)  # Mark as non-nullable
])
df_multicols = spark.createDataFrame(data_multicols, schema)

# Create Iceberg Table with non-nullable column
create_sql = """CREATE TABLE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols ( 
    n_legs bigint, 
    animals string,
    color string not NULL) 
USING iceberg
PARTITIONED BY (n_legs, color)

"""
spark.sql(create_sql)

# Insert Initial data
df_multicols.createOrReplaceTempView("tmp_view")
sql_cmd = f"""INSERT INTO
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    SELECT * FROM  tmp_view
    """
spark.sql(sql_cmd)

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (color=null)
    SELECT animals, n_legs FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:

AnalysisException: Cannot write incompatible data to table 'lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols':
- Cannot safely cast 'n_legs': string to bigint
- Cannot write nullable values to non-null column 'color'

Rule 5. The fields in the expression must be partition columns.
Spark counterpart example:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (animals='pig')
    SELECT n_legs, color FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:

AnalysisException: PARTITION clause cannot contain a non-partition column name: animals

Rule 6. The Iceberg table fields specified in the expression must not use hidden partitioning; the non-specified partition fields may.

Spark counterpart example:

create_sql = """CREATE TABLE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms ( 
    n_legs bigint, 
    animals string,
    color string
) 
USING iceberg
PARTITIONED BY (n_legs, truncate(color, 1))
"""
spark.sql(create_sql)

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms
    PARTITION (color='red')
    SELECT n_legs, animals FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:

AnalysisException: PARTITION clause cannot contain a non-partition column name: color

However, if we specify the other partition column with identity transform, it works:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms
    PARTITION (n_legs=1)
    SELECT color, animals FROM  tmp_view
    """
spark.sql(sql_cmd)

Rule 7. Three-way relationship between the filter, the Arrow table, and the Iceberg table schema.

Rule 7 Case 1

from pyspark.sql.types import StructType, StructField, StringType, LongType
data_multicols_extra_col = [(2, "Flamingo", "red", "dummy"), (4, "Horse", "white", "dummy"), (4, "Pig", "pink", "dummy")]
schema_extra_col = StructType([
    StructField("n_legs", LongType(), nullable=True),
    StructField("animals", StringType(), nullable=True),
    StructField("color", StringType(), nullable=True),
    StructField("extra_col", StringType(), nullable=True)  
])
df_multicols_extra_col = spark.createDataFrame(data_multicols_extra_col, schema_extra_col)
df_multicols_extra_col.createOrReplaceTempView("tmp_view_extra_col")
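
The INSERT OVERWRITE statement that triggers the error below is not shown in the snippet above; it is presumably of the following shape (a reconstruction, the selected columns and their order are assumptions):

# Assumed command: overwrite from the temp view that carries the extra column
sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (n_legs = 2)
    SELECT extra_col, color, animals FROM  tmp_view_extra_col
    """
spark.sql(sql_cmd)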

gives:

AnalysisException: Cannot write to 'lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols', too many data columns:
Table columns: 'n_legs', 'animals', 'color'
Data columns: 'n_legs', 'extra_col', 'color', 'animals'

Rule 7 Case 2:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (n_legs = 2)
    SELECT animals FROM  tmp_view_extra_col
    """
spark.sql(sql_cmd)

gives

AnalysisException: Cannot write to 'lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols', not enough data columns:
Table columns: 'n_legs', 'animals', 'color'
Data columns: 'n_legs', 'animals'

Rule 7 Case 3

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (n_legs = 2)
    SELECT n_legs, color,animals FROM  tmp_view
    """
spark.sql(sql_cmd)

gives

AnalysisException: Cannot write to 'lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols', too many data columns:
Table columns: 'n_legs', 'animals', 'color'
Data columns: 'n_legs', 'n_legs', 'color', 'animals'

Rule 7 Case 4

from pyspark.sql.types import StructType, StructField, StringType, LongType
data_multicols_type_mismatch = [("dummy", "Flamingo", "red"), ("dummy", "Horse", "white"), ("dummy", "Pig", "pink")]
schema_type_mismatch = StructType([
    StructField("n_legs", StringType(), nullable=True),
    StructField("animals", StringType(), nullable=True),
    StructField("color", StringType(), nullable=True), 
])
df_multicols_type_mismatch = spark.createDataFrame(data_multicols_type_mismatch, schema_type_mismatch)
df_multicols_type_mismatch.createOrReplaceTempView("tmp_view_type_mismatch")

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (color = 'red')
    SELECT n_legs, animals FROM  tmp_view_type_mismatch
    """
spark.sql(sql_cmd)

gives:

AnalysisException: Cannot write incompatible data to table 'lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols':
- Cannot safely cast 'n_legs': string to bigint

Special Case of Rule 7
Please note: static overwrite with a filter includes a more complex scenario in which the filter specifies only a subset of the partition columns and the Arrow table carries the rest, so the full partition key is discovered dynamically for the non-specified partition fields.

# Static overwrite with a subset of the partition columns
sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (n_legs = 2)
    SELECT animals,color FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:
[Screenshot of the resulting table contents]
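
On the PyIceberg side, this is exactly the case where the partition value fixed by the filter has to be appended to the Arrow table before writing (goal 2 in Section 2). A minimal sketch of that auto-fill in plain pyarrow (the function name is illustrative):

import pyarrow as pa

def fill_static_partition_column(tbl: pa.Table, name: str, value) -> pa.Table:
    """Append a constant column for the partition field fixed by the overwrite filter."""
    column = pa.array([value] * len(tbl)) if value is not None else pa.nulls(len(tbl))
    return tbl.append_column(name, column)

# The Arrow counterpart of `SELECT animals, color ... PARTITION (n_legs = 2)`:
df = pa.table({"animals": ["Flamingo"], "color": ["red"]})
df = fill_static_partition_column(df, "n_legs", 2)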

For the three-way comparison, we need to print the mismatches clearly. A sample of the Rule 7 error printing:
https://github.com/jqin61/iceberg-python/pull/4/files#diff-f3df6bef7f6c6d2b05df9a09e0039ff4391ebb66725d41bb0f27f26bb2eb4047R1213

@Fokko Fokko commented Mar 24, 2024

Hey Adrian, thanks for working on this! I've gone over the PR, and would like to thank you for the comprehensive writeup.

Looking at the PR, my main question would be: do we want to copy the old Hive behavior one-to-one? I'm questioning that for the following reason: with Hive you have a single partitioning scheme at a time. With Iceberg this can be different, since you may have evolved the partitioning along the way. Let's say you have an orders table that quickly grows because your business is doing very well. You might want to change the monthly partitioning into a daily one, and now you end up with partitions like: 2024-02, 2024-03, (evolve partition from month to day), 2024-03-22, 2024-03-23, 2024-03-24, ....

On the bright side, there is one important concept in Iceberg that will greatly simplify the code: you can atomically create multiple snapshots during a single write operation. This means that you can do a DELETE+APPEND in an atomic way. I was playing around with this on the Java side, and it looks a bit odd there, but I think we should use this on the Python side.

My suggestion would be to decouple the delete and the write, and let PyIceberg decide whether to evaluate against the partitions or also against the data files themselves. We could add flags to indicate that you don't want to rewrite Parquet files (only delete whole files from the Iceberg metadata).

Last week I added the StrictMetricsEvaluator that can be used to drop whole data files based on the column metrics, and this week I added the strict projection that allows dropping whole manifests based on both the record predicate and the column metrics. Once apache#518 is in, I can wire everything up and allow for DELETE operations. The static overwrite would then be possible by first doing the DELETE and then an APPEND operation.

This would mean that partitioned writes would only do the write part (the APPEND operation), and we can re-use the DELETE logic to efficiently remove the data that is being overwritten.
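
For illustration, a hedged sketch of what that decoupled DELETE+APPEND could look like from the user's side, assuming a transaction API that exposes delete and append (not something this PR implements):

import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

catalog = load_catalog("default")
table = catalog.load_table("prod.my_app.logs")
df = pa.table({"uuid": ["a1"], "ts": [1710892800000], "message": ["started"], "level": ["INFO"]})

# One atomic commit containing two snapshots: first drop everything matching the
# filter (deleting whole files via metadata where possible), then append the new data.
with table.transaction() as tx:
    tx.delete(EqualTo("level", "INFO"))
    tx.append(df)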
