
Commit 06fd2da

ARROW-6077: [C++][Parquet] Build Arrow "schema tree" from Parquet schema to help with nested data implementation
Introduces auxiliary internal `SchemaManifest` and `SchemaField` data structures. This also permits dictionary-encoded subfields in a slightly more principled way (the dictionary type creation is resolved one time, so this removes the `FixSchema` hacks that were there before). I rewrote the nested schema conversion logic to hopefully be slightly easier to follow, though it could still use some work. I added comments within to explain the 3 different styles of list encoding.

There are a couple of API changes:

* The `FileReader::GetSchema(indices, &schema)` method has been removed. The way that "projected" schemas were being constructed was pretty hacky, and this function is non-essential to the operation of the class. I had to remove bindings in the GLib and R libraries for this function, but as far as I can tell these bindings were non-essential to operation and were added only because the function was there to wrap.
* Added `FileWriter::Make` factory method, making the constructor private.

This patch was pretty unpleasant to do -- it removes some hacky functions used to create Arrow fields with leaf nodes trimmed. There is little functional change; it is an attempt to bring a cleaner structure for full-fledged nested data reading. I'm going to get on with seeing through user-facing dictionary-encoding functionality in Python.

Closes #4971 from wesm/parquet-arrow-schema-tree and squashes the following commits:

e1f19c0 <Wes McKinney> Code review feedback
e2c117a <Wes McKinney> Factor out list nesting into helper function

Authored-by: Wes McKinney <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
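To make the two API changes above concrete, here is a minimal usage sketch, not part of the patch: `FileReader::GetSchema` now takes only an output schema pointer, and a `FileWriter` is obtained through the new `FileWriter::Make` factory rather than the now-private constructor. The sketch sits inside `namespace parquet::arrow`, mirroring the test file changed below, so unqualified names such as `default_arrow_writer_properties()` resolve the same way they do there; the `OpenFile` entry point, the helper name, and the variable names are assumptions, not taken from the diff.

// Sketch only -- illustrative, not part of the commit. The Make and GetSchema
// signatures match the call sites shown in the diffs below.
#include <memory>
#include <utility>

#include "arrow/api.h"
#include "arrow/io/api.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
#include "parquet/file_writer.h"

namespace parquet {
namespace arrow {

::arrow::Status ReadSchemaAndWriteTable(
    const std::shared_ptr<::arrow::io::RandomAccessFile>& infile,
    std::unique_ptr<ParquetFileWriter> parquet_writer,
    const std::shared_ptr<::arrow::Table>& table) {
  auto* pool = ::arrow::default_memory_pool();

  // Reader side: GetSchema no longer takes a vector of column indices.
  std::unique_ptr<FileReader> reader;
  ARROW_RETURN_NOT_OK(OpenFile(infile, pool, &reader));
  std::shared_ptr<::arrow::Schema> schema;
  ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));

  // Writer side: the constructor is now private, so go through Make.
  std::unique_ptr<FileWriter> writer;
  ARROW_RETURN_NOT_OK(FileWriter::Make(pool, std::move(parquet_writer),
                                       table->schema(),
                                       default_arrow_writer_properties(),
                                       &writer));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table, table->num_rows()));
  return writer->Close();
}

}  // namespace arrow
}  // namespace parquet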
1 parent e4febfb commit 06fd2da

23 files changed: +1408 / -1429 lines

c_glib/parquet-glib/arrow-file-reader.cpp

Lines changed: 1 addition & 44 deletions
@@ -231,15 +231,8 @@ gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
 {
   auto parquet_arrow_file_reader = gparquet_arrow_file_reader_get_raw(reader);
 
-  const auto n_columns =
-    parquet_arrow_file_reader->parquet_reader()->metadata()->num_columns();
-  std::vector<int> indices(n_columns);
-  for (int i = 0; i < n_columns; ++i) {
-    indices[i] = i;
-  }
-
   std::shared_ptr<arrow::Schema> arrow_schema;
-  auto status = parquet_arrow_file_reader->GetSchema(indices, &arrow_schema);
+  auto status = parquet_arrow_file_reader->GetSchema(&arrow_schema);
   if (garrow_error_check(error,
                          status,
                          "[parquet][arrow][file-reader][get-schema]")) {
@@ -249,42 +242,6 @@ gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
   }
 }
 
-/**
- * gparquet_arrow_file_reader_select_schema:
- * @reader: A #GParquetArrowFileReader.
- * @column_indexes: (array length=n_column_indexes):
- *   The array of column indexes to be selected.
- * @n_column_indexes: The length of `column_indexes`.
- * @error: (nullable): Return location for a #GError or %NULL.
- *
- * Returns: (transfer full) (nullable): A selected #GArrowSchema.
- *
- * Since: 0.12.0
- */
-GArrowSchema *
-gparquet_arrow_file_reader_select_schema(GParquetArrowFileReader *reader,
-                                         gint *column_indexes,
-                                         gsize n_column_indexes,
-                                         GError **error)
-{
-  auto parquet_arrow_file_reader = gparquet_arrow_file_reader_get_raw(reader);
-
-  std::vector<int> indices(n_column_indexes);
-  for (gsize i = 0; i < n_column_indexes; ++i) {
-    indices[i] = column_indexes[i];
-  }
-
-  std::shared_ptr<arrow::Schema> arrow_schema;
-  auto status = parquet_arrow_file_reader->GetSchema(indices, &arrow_schema);
-  if (garrow_error_check(error,
-                         status,
-                         "[parquet][arrow][file-reader][select-schema]")) {
-    return garrow_schema_new_raw(&arrow_schema);
-  } else {
-    return NULL;
-  }
-}
-
 /**
  * gparquet_arrow_file_reader_read_column_data:
  * @reader: A #GParquetArrowFileReader.
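The removed `gparquet_arrow_file_reader_select_schema` wrapped the now-deleted `GetSchema(indices, &schema)` overload. A caller that still wants a column-projected schema can derive one from the full schema; the sketch below is illustrative only, limited to top-level fields, and the `ProjectTopLevel` helper is hypothetical. Projecting through nested leaves is exactly the mapping the new internal `SchemaManifest` keeps track of.

// Sketch only: projects a schema by top-level field index. This illustrates
// what callers can do themselves; it is not an API added by the commit.
#include <memory>
#include <vector>

#include "arrow/api.h"
#include "parquet/arrow/reader.h"

::arrow::Status ProjectTopLevel(parquet::arrow::FileReader* reader,
                                const std::vector<int>& field_indices,
                                std::shared_ptr<::arrow::Schema>* out) {
  std::shared_ptr<::arrow::Schema> full;
  ARROW_RETURN_NOT_OK(reader->GetSchema(&full));
  std::vector<std::shared_ptr<::arrow::Field>> fields;
  for (int i : field_indices) {
    fields.push_back(full->field(i));  // keep only the selected top-level fields
  }
  *out = ::arrow::schema(fields);
  return ::arrow::Status::OK();
}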

c_glib/parquet-glib/arrow-file-reader.h

Lines changed: 0 additions & 5 deletions
@@ -48,11 +48,6 @@ gparquet_arrow_file_reader_read_table(GParquetArrowFileReader *reader,
 GArrowSchema *
 gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
                                       GError **error);
-GArrowSchema *
-gparquet_arrow_file_reader_select_schema(GParquetArrowFileReader *reader,
-                                         gint *column_indexes,
-                                         gsize n_column_indexes,
-                                         GError **error);
 
 GARROW_AVAILABLE_IN_1_0
 GArrowChunkedArray *

c_glib/test/parquet/test-arrow-file-reader.rb

Lines changed: 0 additions & 13 deletions
@@ -39,19 +39,6 @@ def test_schema
     SCHEMA
   end
 
-  def test_select_schema
-    assert_equal(<<-SCHEMA.chomp, @reader.select_schema([0]).to_s)
-a: string
-    SCHEMA
-    assert_equal(<<-SCHEMA.chomp, @reader.select_schema([1]).to_s)
-b: int32
-    SCHEMA
-    assert_equal(<<-SCHEMA.chomp, @reader.select_schema([0, 1]).to_s)
-a: string
-b: int32
-    SCHEMA
-  end
-
   def test_read_column
     assert_equal([
                    Arrow::ChunkedArray.new([@a_array]),

cpp/src/parquet/arrow/arrow-reader-writer-test.cc

Lines changed: 34 additions & 18 deletions
@@ -40,6 +40,7 @@
 #include "parquet/api/writer.h"
 
 #include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/arrow/test-util.h"
 #include "parquet/arrow/writer.h"
@@ -597,12 +598,16 @@ class TestParquetIO : public ::testing::Test {
     std::shared_ptr<::arrow::Schema> arrow_schema;
     ArrowReaderProperties props;
     ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
-    FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema), arrow_schema);
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*values));
-    ASSERT_OK_NO_THROW(writer.Close());
-    // writer.Close() should be idempotent
-    ASSERT_OK_NO_THROW(writer.Close());
+
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                        MakeWriter(schema), arrow_schema,
+                                        default_arrow_writer_properties(), &writer));
+    ASSERT_OK_NO_THROW(writer->NewRowGroup(values->length()));
+    ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*values));
+    ASSERT_OK_NO_THROW(writer->Close());
+    // writer->Close() should be idempotent
+    ASSERT_OK_NO_THROW(writer->Close());
   }
 
   void ResetSink() { sink_ = CreateOutputStream(); }
@@ -789,13 +794,17 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
   std::shared_ptr<::arrow::Schema> arrow_schema;
   ArrowReaderProperties props;
   ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
-  FileWriter writer(default_memory_pool(), this->MakeWriter(schema), arrow_schema);
+
+  std::unique_ptr<FileWriter> writer;
+  ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                      this->MakeWriter(schema), arrow_schema,
+                                      default_arrow_writer_properties(), &writer));
   for (int i = 0; i < 4; i++) {
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(writer->NewRowGroup(chunk_size));
     std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
+    ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*sliced_array));
   }
-  ASSERT_OK_NO_THROW(writer.Close());
+  ASSERT_OK_NO_THROW(writer->Close());
 
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
 }
@@ -859,14 +868,17 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
   std::shared_ptr<::arrow::Schema> arrow_schema;
   ArrowReaderProperties props;
   ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
-  FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema),
-                    arrow_schema);
+
+  std::unique_ptr<FileWriter> writer;
+  ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                      this->MakeWriter(schema), arrow_schema,
+                                      default_arrow_writer_properties(), &writer));
   for (int i = 0; i < 4; i++) {
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(writer->NewRowGroup(chunk_size));
     std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
+    ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*sliced_array));
   }
-  ASSERT_OK_NO_THROW(writer.Close());
+  ASSERT_OK_NO_THROW(writer->Close());
 
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
 }
@@ -2624,11 +2636,15 @@ TEST(TestArrowReaderAdHoc, DISABLED_LargeStringColumn) {
       GroupNode::Make("schema", Repetition::REQUIRED, {schm->group_node()->field(0)}));
 
   auto writer = ParquetFileWriter::Open(sink, schm_node);
-  FileWriter arrow_writer(default_memory_pool(), std::move(writer), table->schema());
+
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(), std::move(writer),
+                                      table->schema(), default_arrow_writer_properties(),
+                                      &arrow_writer));
   for (int i : {0, 1}) {
-    ASSERT_OK_NO_THROW(arrow_writer.WriteTable(*table, table->num_rows())) << i;
+    ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table, table->num_rows())) << i;
   }
-  ASSERT_OK_NO_THROW(arrow_writer.Close());
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
 
   std::shared_ptr<Buffer> tables_buffer;
   ASSERT_OK_NO_THROW(sink->Finish(&tables_buffer));
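Complementing the `WriteTable` sketch near the top, the row-group / column-chunk path these tests exercise looks roughly as follows with the factory in place. This is a sketch under the same assumptions: `WriteInChunks` and its parameters are hypothetical, while `Make`, `NewRowGroup`, `WriteColumnChunk`, and the idempotent `Close` match the call sites in the diff above.

// Sketch of the row-group / column-chunk write path after the change.
// The ParquetFileWriter and the Arrow schema are assumed to be prepared by
// the caller, as they are in the test fixture.
#include <algorithm>
#include <memory>
#include <utility>

#include "arrow/api.h"
#include "parquet/arrow/writer.h"
#include "parquet/file_writer.h"

namespace parquet {
namespace arrow {

::arrow::Status WriteInChunks(std::unique_ptr<ParquetFileWriter> parquet_writer,
                              const std::shared_ptr<::arrow::Schema>& schema,
                              const std::shared_ptr<::arrow::Array>& values,
                              int64_t chunk_size) {
  std::unique_ptr<FileWriter> writer;
  ARROW_RETURN_NOT_OK(FileWriter::Make(::arrow::default_memory_pool(),
                                       std::move(parquet_writer), schema,
                                       default_arrow_writer_properties(),
                                       &writer));
  // Write one row group per chunk of the input array.
  for (int64_t offset = 0; offset < values->length(); offset += chunk_size) {
    const int64_t num = std::min(chunk_size, values->length() - offset);
    ARROW_RETURN_NOT_OK(writer->NewRowGroup(num));
    ARROW_RETURN_NOT_OK(writer->WriteColumnChunk(*values->Slice(offset, num)));
  }
  // Close() is idempotent, as the updated test asserts.
  return writer->Close();
}

}  // namespace arrow
}  // namespace parquet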
