 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.schema import Schema
-from pyiceberg.table import TableProperties, _dataframe_to_data_files
+from pyiceberg.table import SetPropertiesUpdate, TableProperties, _dataframe_to_data_files
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -356,31 +356,50 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w


 @pytest.mark.integration
-@pytest.mark.parametrize("format_version", [1, 2])
-def test_write_multiple_data_files(
-    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
-) -> None:
-    identifier = "default.write_multiple_arrow_data_files"
-    tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, [])
+def test_write_bin_pack_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.write_bin_pack_data_files"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [])

     def get_data_files_count(identifier: str) -> int:
         return spark.sql(
             f"""
             SELECT *
-            FROM {identifier}.all_data_files
+            FROM {identifier}.files
             """
         ).count()

-    # writes to 1 data file since the table is small
+    def set_table_properties(tbl: Table, properties: Properties) -> Table:
+        with tbl.transaction() as transaction:
+            transaction._apply((SetPropertiesUpdate(updates=properties),))
+        return tbl
+
+    # writes 1 data file since the table is smaller than default target file size
+    assert arrow_table_with_null.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
     tbl.overwrite(arrow_table_with_null)
     assert get_data_files_count(identifier) == 1

-    # writes to 1 data file as long as table is smaller than default target file size
+    # writes 1 data file as long as table is smaller than default target file size
     bigger_arrow_tbl = pa.concat_tables([arrow_table_with_null] * 10)
-    tbl.overwrite(bigger_arrow_tbl)
     assert bigger_arrow_tbl.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+    tbl.overwrite(bigger_arrow_tbl)
     assert get_data_files_count(identifier) == 1

+    # writes multiple data files once target file size is overridden
+    target_file_size = arrow_table_with_null.nbytes
+    tbl = set_table_properties(tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)})
+    assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES)
+    assert target_file_size < bigger_arrow_tbl.nbytes
+    tbl.overwrite(bigger_arrow_tbl)
+    assert get_data_files_count(identifier) == 10
+
+    # writes half the number of data files when target file size doubles
+    target_file_size = arrow_table_with_null.nbytes * 2
+    tbl = set_table_properties(tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)})
+    assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES)
+    assert target_file_size < bigger_arrow_tbl.nbytes
+    tbl.overwrite(bigger_arrow_tbl)
+    assert get_data_files_count(identifier) == 5
+

 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
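
Note on the set_table_properties helper above: it deliberately goes through the private Transaction._apply path so the test exercises SetPropertiesUpdate directly. The sketch below is not part of the diff; it shows how the same "write.target-file-size-bytes" override could be applied through the public Transaction API, assuming the installed pyiceberg version exposes Transaction.set_properties (set_table_properties_public is a hypothetical name used only for this illustration).

from pyiceberg.table import Table, TableProperties
from pyiceberg.typedef import Properties


def set_table_properties_public(tbl: Table, properties: Properties) -> Table:
    # Same effect as the test helper, but via the public set_properties call
    # instead of Transaction._apply with an explicit SetPropertiesUpdate.
    with tbl.transaction() as transaction:
        transaction.set_properties(properties)
    return tbl


# Example (hypothetical values): roll over to a new data file at roughly 64 MiB.
# TableProperties.WRITE_TARGET_FILE_SIZE_BYTES is the "write.target-file-size-bytes" key.
# tbl = set_table_properties_public(tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(64 * 1024 * 1024)})

Either path leaves the override visible in tbl.properties, which is what the assertions in the test check after each write before counting the resulting data files.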