diff --git a/patterns/parquet.hexpat b/patterns/parquet.hexpat new file mode 100644 index 00000000..6306c728 --- /dev/null +++ b/patterns/parquet.hexpat @@ -0,0 +1,709 @@ +/* +Apache Parquet File Format +With help from Claude AI + +Known Limits: +- Not all metadata fields named in Pattern Data +- DataPageHeaderV2 not supported + +References: +https://parquet.apache.org/docs/file-format/ß +https://issues.apache.org/jira/secure/attachment/12399869/compact-proto-spec-2.txt +https://issues.apache.org/jira/secure/attachment/12399879/thrift-110-v12.patch +https://raw.githubusercontent.com/apache/parquet-format/refs/heads/master/src/main/thrift/parquet.thrift +*/ + +#pragma description Apache Parquet File Format +#pragma endian little +#pragma MIME application/x-thrift-compact + +import std.mem; +import std.sys; +import std.core; +import type.leb128; + +s16 last_field_id = 0; +std::mem::Section last_field_id_stack = + std::mem::create_section("last_field_id_stack"); +u16 last_field_id_stack_size = 0 [[export]]; // Should be 0 at end of parse +u16 last_field_id_stack_size_max = 0; +fn push_last_field_id() { + s16 last_field_id_stack_top @ + last_field_id_stack_size * sizeof(s16) in last_field_id_stack; + last_field_id_stack_top = last_field_id; + last_field_id_stack_size += 1; + if (last_field_id_stack_size_max < last_field_id_stack_size) + last_field_id_stack_size_max = last_field_id_stack_size; +}; +fn pop_last_field_id() { + last_field_id_stack_size -= 1; + s16 last_field_id_stack_top @ + last_field_id_stack_size * sizeof(s16) in last_field_id_stack; + last_field_id = last_field_id_stack_top; +}; + +std::mem::Section column_offset_list = std::mem::create_section("column_offset_list"); +auto column_offset_list_size = 0; +fn push_column_offset(s64 page_offset) { + s64 column_offset_list_end @ column_offset_list_size * sizeof(s64) in column_offset_list; + column_offset_list_end = page_offset; + column_offset_list_size += 1; +}; + +using CompactI16; +using CompactI32; +using CompactI64; +using CompactBinary; +using CompactList; +using CompactMap; +using ThriftStruct; + +// TCompactProtocol Type Constants +enum TCompactType : u8 { + CT_STOP = 0x00, + CT_BOOLEAN_TRUE = 0x01, + CT_BOOLEAN_FALSE = 0x02, + CT_BYTE = 0x03, + CT_I16 = 0x04, + CT_I32 = 0x05, + CT_I64 = 0x06, + CT_DOUBLE = 0x07, + CT_BINARY = 0x08, + CT_LIST = 0x09, + CT_SET = 0x0A, +// CT_MAP = 0x0B, // Thrift Map not used in Parquet Metadata + CT_STRUCT = 0x0C, + CT_EXTENDED = 0x0F +}; + +using VarInt = type::LEB128; + +// ZigZag decode for signed integers +fn zigzag_decode_32(u32 n) { + return s32((n >> 1) ^ (-(n & 1))); +}; + +fn zigzag_decode_64(u64 n) { + return s64((n >> 1) ^ (-(n & 1))); +}; + +/// Field header structure +/// Do not place ThriftFieldHeader directly +/// Always place ThriftStruct +/// Because ThriftFieldHeader depends on global last_field_id_stack +struct ThriftFieldHeader { + u8 type_and_delta; + // Extract type (lower 4 bits) + TCompactType field_type = type_and_delta & 0x0F [[export]]; + if (type_and_delta == 0x0) break; + + // Extract field ID delta (upper 4 bits) + u8 field_id_delta = (type_and_delta & 0xF0) >> 4 [[export]]; + + // If delta is 0, field ID follows as varint + if (field_id_delta == 0) { + VarInt field_id_varint; + s16 field_id = s16(zigzag_decode_32(u32(field_id_varint))) [[export]]; + last_field_id = field_id; + } else { + // Field ID is previous_field_id + delta + s16 field_id = last_field_id + field_id_delta [[export]]; + last_field_id = field_id; + } +} [[format("field_header_format")]]; + +fn field_header_format(ThriftFieldHeader header) { + if (header.type_and_delta == 0) return "STOP field"; + if (header.field_id_delta == 0) { + return std::format("Field ID: {}, Type: {:#02x} {}", + header.field_id, u8(header.field_type), header.field_type); + } else { + return std::format("Field ID Delta: {}, Type: {:#02x} {}", + header.field_id_delta, u8(header.field_type), header.field_type); + } +}; + +// Variable-length string/binary +struct CompactBinary { + VarInt length; + char data[length]; +} [[format("compact_binary_format")]]; + +fn compact_binary_format(CompactBinary bin) { + return std::format("Length: {}, Data: {}", bin.length, bin.data); +}; + +// Variable-length integer types +struct CompactI32 { + VarInt raw_varint; + s32 value = zigzag_decode_32(u32(raw_varint)) [[export]]; +} [[format("compact_i32_format")]]; + +fn compact_i32_format(CompactI32 val) { + return std::format("I32: {}", val.value); +}; + +struct CompactI64 { + VarInt raw_varint; + s64 value = zigzag_decode_64(raw_varint) [[export]]; +} [[format("compact_i64_format")]]; + +fn compact_i64_format(CompactI64 val) { + return std::format("I64: {}", val.value); +}; + +struct CompactI16 { + VarInt raw_varint; + s16 value = s16(zigzag_decode_32(u32(raw_varint.value))) [[export]]; +} [[format("compact_i16_format")]]; + +fn compact_i16_format(CompactI16 val) { + return std::format("I16: {}", val.value); +}; + +// List/Set structure +struct CompactList { + u8 size_and_type; + + TCompactType element_type = size_and_type & 0x0F [[export]]; + u8 size_info = (size_and_type & 0xF0) >> 4; + + // If size_info >= 15, actual size follows as varint + u32 size = 0 [[export]]; + if (size_info == 0x0F) { + VarInt size_varint; + size = u32(size_varint); + } else { + size = size_info; + } + + match (element_type) { + (TCompactType::CT_BOOLEAN_TRUE): { + bool value = true [[export]]; + } + (TCompactType::CT_BOOLEAN_FALSE): { + bool value = false [[export]]; + } + (TCompactType::CT_BYTE): { + s8 value[size]; + } + (TCompactType::CT_I16): { + CompactI16 value[size]; + } + (TCompactType::CT_I32): { + CompactI32 value[size]; + } + (TCompactType::CT_I64): { + CompactI64 value[size]; + } + (TCompactType::CT_DOUBLE): { + double value[size]; + } + (TCompactType::CT_BINARY): { + CompactBinary value[size]; + } + (TCompactType::CT_LIST): { + CompactList value[size]; + } + (TCompactType::CT_SET): { + CompactList value[size]; // Same encoding as list + } + (TCompactType::CT_STRUCT): { + ThriftStruct value[size]; + } + } +} [[format("compact_list_format")]]; + +fn compact_list_format(ref CompactList list) { + return std::format("List: {} elements of type {:#02x}", + list.size, u8(list.element_type)); +}; + +/// Thrift field structure +/// Do not place ThriftField directly +/// Either place ThriftStruct or place its value +/// Because FieldHeader depends on global last_field_id_stack +struct ThriftField { + ThriftFieldHeader header; + if (header.field_type == TCompactType::CT_STOP) break; + + // Only parse value if not STOP + if (header.field_type != TCompactType::CT_STOP) { + match (header.field_type) { + (TCompactType::CT_BOOLEAN_TRUE): { + bool value; + } + (TCompactType::CT_BOOLEAN_FALSE): { + bool value; + } + (TCompactType::CT_BYTE): { + s8 value; + } + (TCompactType::CT_I16): { + CompactI16 value; + } + (TCompactType::CT_I32): { + CompactI32 value; + } + (TCompactType::CT_I64): { + CompactI64 value; + } + (TCompactType::CT_DOUBLE): { + double value; + } + (TCompactType::CT_BINARY): { + CompactBinary value; + } + (TCompactType::CT_LIST): { + CompactList value; + } + (TCompactType::CT_SET): { + CompactList value; // Same encoding as list + } + (TCompactType::CT_STRUCT): { + ThriftStruct value; + } + } + } +} [[format("thrift_field_format")]]; + +fn thrift_field_format(ref ThriftField field) { + if (field.header.field_type == TCompactType::CT_STOP) { + return "STOP field"; + } else { + return std::format("Field ID: {}, Value: {}", + field.header.field_id, field.value); + } +}; + +// Thrift struct +struct ThriftStruct { + push_last_field_id(); + last_field_id = 0; + + ThriftField fields[while(!std::mem::eof())]; + + pop_last_field_id(); +} [[format("thrift_struct_format")]]; + +fn thrift_struct_format(ref ThriftStruct thrift_struct) { + return std::format("Thrift Struct with {} fields", + std::core::member_count(thrift_struct.fields)); +}; + +fn ptr_field_value_by_id(ref ThriftStruct s, s16 field_id) { + for (auto i = 0, i < std::core::member_count(s.fields), i += 1) { + if (s.fields[i].header.field_id == field_id) { + return addressof(s.fields[i].value); + } + } + std::error("Cannot find field with id {} in {}", field_id, s); +}; + +fn idx_field_by_id(ref ThriftStruct s, s16 field_id, s16 since_idx = 0) { + for (auto i = since_idx, i < std::core::member_count(s.fields), i += 1) { + if (s.fields[i].header.type_and_delta == 0x0) { + // std::print("is STOP field"); + continue; + } + if (s.fields[i].header.field_id == field_id) { + return i; + } + } + std::error(std::format("Cannot find field with id {} in {}", field_id, s)); +}; + + +/* +struct SchemaElement { + 1: optional Type type; + 2: optional i32 type_length; + 3: optional FieldRepetitionType repetition_type; + 4: required string name; + 5: optional i32 num_children; + 6: optional ConvertedType converted_type; + 7: optional i32 scale + 8: optional i32 precision + 9: optional i32 field_id; + 10: optional LogicalType logicalType +} +*/ +fn set_field_names_SchemaElement(ref auto fields) { + for (auto i = 0, i < std::core::member_count(fields), i += 1) { + if (fields[i].header.type_and_delta == 0) { + std::core::set_display_name(fields[i], "STOP"); + break; + } + match (fields[i].header.field_id) { + (1): std::core::set_display_name(fields[i], "type"); + (2): std::core::set_display_name(fields[i], "type_length"); + (3): std::core::set_display_name(fields[i], "repetition_type"); + (4): std::core::set_display_name(fields[i], "name"); + (5): std::core::set_display_name(fields[i], "num_children"); + (6): std::core::set_display_name(fields[i], "converted_type"); + (7): std::core::set_display_name(fields[i], "scale"); + (8): std::core::set_display_name(fields[i], "precision"); + (9): std::core::set_display_name(fields[i], "field_id"); + (10): std::core::set_display_name(fields[i], "logicalType"); + } + } +}; + +/* +struct ColumnMetaData { + 1: required Type type + 2: required list encodings + 3: required list path_in_schema + 4: required CompressionCodec codec + 5: required i64 num_values + 6: required i64 total_uncompressed_size + 7: required i64 total_compressed_size + 8: optional list key_value_metadata + 9: required i64 data_page_offset + 10: optional i64 index_page_offset + 11: optional i64 dictionary_page_offset + 12: optional Statistics statistics; + 13: optional list encoding_stats; + 14: optional i64 bloom_filter_offset; + 15: optional i32 bloom_filter_length; + 16: optional SizeStatistics size_statistics; + 17: optional GeospatialStatistics geospatial_statistics; +} +*/ +fn set_field_names_ColumnMetaData(ref auto fields) { + for (auto i = 0, i < std::core::member_count(fields), i += 1) { + if (fields[i].header.type_and_delta == 0) { + std::core::set_display_name(fields[i], "STOP"); + break; + } + match (fields[i].header.field_id) { + (1): std::core::set_display_name(fields[i], "type"); + (2): std::core::set_display_name(fields[i], "encodings"); + (3): std::core::set_display_name(fields[i], "path_in_schema"); + (4): std::core::set_display_name(fields[i], "codec"); + (5): std::core::set_display_name(fields[i], "num_values"); + (6): std::core::set_display_name(fields[i], "total_uncompressed_size"); + (7): std::core::set_display_name(fields[i], "total_compressed_size"); + (8): std::core::set_display_name(fields[i], "key_value_metadata"); + (9): std::core::set_display_name(fields[i], "data_page_offset"); + (10): std::core::set_display_name(fields[i], "index_page_offset"); + (11): std::core::set_display_name(fields[i], "dictionary_page_offset"); + (12): std::core::set_display_name(fields[i], "statistics"); + (13): std::core::set_display_name(fields[i], "encoding_stats"); + (14): std::core::set_display_name(fields[i], "bloom_filter_offset"); + (15): std::core::set_display_name(fields[i], "bloom_filter_length"); + (16): std::core::set_display_name(fields[i], "size_statistics"); + (17): std::core::set_display_name(fields[i], "geospatial_statistics"); + } + } +}; + +/* +struct ColumnChunk { + 1: optional string file_path + 2: required i64 file_offset = 0 + 3: optional ColumnMetaData meta_data // actually required + 4: optional i64 offset_index_offset + 5: optional i32 offset_index_length + 6: optional i64 column_index_offset + 7: optional i32 column_index_length + 8: optional ColumnCryptoMetaData crypto_metadata + 9: optional binary encrypted_column_metadata +} +*/ +fn set_field_names_ColumnChunk(ref auto fields) { + for (auto i = 0, i < std::core::member_count(fields), i += 1) { + if (fields[i].header.type_and_delta == 0) { + std::core::set_display_name(fields[i], "STOP"); + break; + } + match (fields[i].header.field_id) { + (1): std::core::set_display_name(fields[i], "file_path"); + (2): std::core::set_display_name(fields[i], "file_offset"); + (3): std::core::set_display_name(fields[i], "meta_data"); + (4): std::core::set_display_name(fields[i], "offset_index_offset"); + (5): std::core::set_display_name(fields[i], "offset_index_length"); + (6): std::core::set_display_name(fields[i], "column_index_offset"); + (7): std::core::set_display_name(fields[i], "column_index_length"); + (8): std::core::set_display_name(fields[i], "crypto_metadata"); + (9): std::core::set_display_name(fields[i], "encrypted_column_metadata"); + } + if (fields[i].header.field_id == 3) { + set_field_names_ColumnMetaData(fields[i].value.fields); + } + } +}; + +/* +struct RowGroup { + 1: required list columns + 2: required i64 total_byte_size + 3: required i64 num_rows + 4: optional list sorting_columns + 5: optional i64 file_offset + 6: optional i64 total_compressed_size + 7: optional i16 ordinal +} +*/ +fn set_field_names_RowGroup(ref auto fields) { + for (auto i = 0, i < std::core::member_count(fields), i += 1) { + if (fields[i].header.type_and_delta == 0) { + std::core::set_display_name(fields[i], "STOP"); + break; + } + match (fields[i].header.field_id) { + (1): std::core::set_display_name(fields[i], "columns"); + (2): std::core::set_display_name(fields[i], "total_byte_size"); + (3): std::core::set_display_name(fields[i], "num_rows"); + (4): std::core::set_display_name(fields[i], "sorting_columns"); + (5): std::core::set_display_name(fields[i], "file_offset"); + (6): std::core::set_display_name(fields[i], "total_compressed_size"); + (7): std::core::set_display_name(fields[i], "ordinal"); + } + if (fields[i].header.field_id == 1) { + auto n_fields = std::core::member_count(fields[i].value.value); + for (auto j = 0, j < n_fields, j += 1) { + set_field_names_ColumnChunk(fields[i].value.value[j].fields); + } + } + } +}; + +/* +struct FileMetaData { + 1: required i32 version + 2: required list schema; + 3: required i64 num_rows + 4: required list row_groups + 5: optional list key_value_metadata + 6: optional string created_by + 7: optional list column_orders; + 8: optional EncryptionAlgorithm encryption_algorithm + 9: optional binary footer_signing_key_metadata +} +*/ +fn set_field_names_FileMetadata(ref auto fields) { + for (auto i = 0, i < std::core::member_count(fields), i += 1) { + if (fields[i].header.type_and_delta == 0) { + std::core::set_display_name(fields[i], "STOP"); + break; + } + // STOP should always be the last field + match (fields[i].header.field_id) { + (1): std::core::set_display_name(fields[i], "version"); + (2): std::core::set_display_name(fields[i], "schema"); + (3): std::core::set_display_name(fields[i], "num_rows"); + (4): std::core::set_display_name(fields[i], "row_groups"); + (5): std::core::set_display_name(fields[i], "key_value_metadata"); + (6): std::core::set_display_name(fields[i], "created_by"); + (7): std::core::set_display_name(fields[i], "column_orders"); + (8): std::core::set_display_name(fields[i], "encryption_algorithm"); + (9): std::core::set_display_name(fields[i], "footer_signing_key_metadata"); + } + if (fields[i].header.field_id == 2) { + auto n_fields = std::core::member_count(fields[i].value.value); + for (auto j = 0, j < n_fields, j += 1) { + set_field_names_SchemaElement(fields[i].value.value[j].fields); + } + } + if (fields[i].header.field_id == 4) { + auto n_fields = std::core::member_count(fields[i].value.value); + for (auto j = 0, j < n_fields, j += 1) { + set_field_names_RowGroup(fields[i].value.value[j].fields); + } + } + } +}; + +struct FileMetadata : ThriftStruct { + //std::core::set_display_name(fields[0], "version"); + set_field_names_FileMetadata(fields); +}; + +fn extract_column_offset_list(ref ThriftStruct file_metadata_struct) { + // Get index for row_groups id 4 + auto idx_row_groups = idx_field_by_id(file_metadata_struct, 4); + //std::print("idx_row_groups: {}", idx_row_groups); + + // For each RowGroup in row_groups + auto n_row_groups = std::core::member_count( + file_metadata_struct + .fields[idx_row_groups].value.value); + //std::print("n_row_groups: {}", n_row_groups); + for (u32 i = 0, i < n_row_groups, i += 1) { + // Get index for columns id 1 + auto idx_columns = idx_field_by_id( + file_metadata_struct + .fields[idx_row_groups].value.value[i], + 1); + //std::print("idx_columns: {}", idx_columns); + + // For each ColumnChunk in columns + auto n_columns = std::core::member_count( + file_metadata_struct + .fields[idx_row_groups].value.value[i] + .fields[idx_columns].value.value); + //std::print("n_columns: {}", n_columns); + for (u32 j = 0, j < n_columns, j += 1) { + + // Get index for meta_data id 3 + auto idx_meta_data = idx_field_by_id( + file_metadata_struct + .fields[idx_row_groups].value.value[i] + .fields[idx_columns].value.value[j], + 3); + //std::print("idx_meta_data: {}", idx_meta_data); + + // For ColumnMetadata in meta_data + // First PageHeader is at: + // dictionary_page_offset if present + // else data_page_offset + + try { + // Get index for dictionary_page_offset id 11 + auto idx_dictionary_page_offset = idx_field_by_id( + file_metadata_struct + .fields[idx_row_groups].value.value[i] + .fields[idx_columns].value.value[j] + .fields[idx_meta_data].value, + 11); + + auto dictionary_page_offset = + file_metadata_struct + .fields[idx_row_groups].value.value[i] + .fields[idx_columns].value.value[j] + .fields[idx_meta_data].value + .fields[idx_dictionary_page_offset].value.value; + + push_column_offset(dictionary_page_offset); + } catch { + + // Get index for data_page_offset id 9 + auto idx_data_page_offset = idx_field_by_id( + file_metadata_struct + .fields[idx_row_groups].value.value[i] + .fields[idx_columns].value.value[j] + .fields[idx_meta_data].value, + 9); + + auto data_page_offset = + file_metadata_struct + .fields[idx_row_groups].value.value[i] + .fields[idx_columns].value.value[j] + .fields[idx_meta_data].value + .fields[idx_data_page_offset].value.value; + + push_column_offset(data_page_offset); + } + } + } +}; + +/* +struct DataPageHeader { + 1: required i32 num_values + 2: required Encoding encoding + 3: required Encoding definition_level_encoding; + 4: required Encoding repetition_level_encoding; + 5: optional Statistics statistics; +} +*/ +fn set_field_names_DataPageHeader(ref auto fields) { + for (auto i = 0, i < std::core::member_count(fields), i += 1) { + if (fields[i].header.type_and_delta == 0) { + std::core::set_display_name(fields[i], "STOP"); + break; + } + match (fields[i].header.field_id) { + (1): std::core::set_display_name(fields[i], "num_values"); + (2): std::core::set_display_name(fields[i], "encoding"); + (3): std::core::set_display_name(fields[i], "definition_level_encoding"); + (4): std::core::set_display_name(fields[i], "repetition_level_encoding"); + (5): std::core::set_display_name(fields[i], "statistics"); + } + } +}; + +/* +struct PageHeader { + 1: required PageType type + 2: required i32 uncompressed_page_size + 3: required i32 compressed_page_size + 4: optional i32 crc + 5: optional DataPageHeader data_page_header; + 6: optional IndexPageHeader index_page_header; + 7: optional DictionaryPageHeader dictionary_page_header; + 8: optional DataPageHeaderV2 data_page_header_v2; +} +*/ +fn set_field_names_PageHeader(ref auto fields) { + for (auto i = 0, i < std::core::member_count(fields), i += 1) { + if (fields[i].header.type_and_delta == 0) { + std::core::set_display_name(fields[i], "STOP"); + break; + } + match (fields[i].header.field_id) { + (1): std::core::set_display_name(fields[i], "type"); + (2): std::core::set_display_name(fields[i], "uncompressed_page_size"); + (3): std::core::set_display_name(fields[i], "compressed_page_size"); + (4): std::core::set_display_name(fields[i], "crc"); + (5): std::core::set_display_name(fields[i], "data_page_header"); + (6): std::core::set_display_name(fields[i], "index_page_header"); + (7): std::core::set_display_name(fields[i], "dictionary_page_header"); + (8): std::core::set_display_name(fields[i], "data_page_header_v2"); + } + if (fields[i].header.field_id == 5) { + // std::print("{}", fields[i].value); + set_field_names_DataPageHeader(fields[i].value.fields); + } + } +}; + + +fn get_compressed_page_size(ref ThriftStruct page_header) { + auto idx = idx_field_by_id(page_header, 3); + return page_header.fields[idx].value.value; +}; +struct DataPage { + ThriftStruct page_header; + try { + auto compressed_page_size = get_compressed_page_size(page_header); + u8 page_data[compressed_page_size]; + } + set_field_names_PageHeader(page_header.fields); +}; + +struct ColumnChunk { + DataPage data_pages[while($ column_chunk @ column_offset_list_cur; +}; + +struct ParquetFile { + char header_magic[4]; + char footer_magic[4] @ sizeof($) - 4; + s32 footer_length @ sizeof($) - 8; + auto footer_begin = sizeof($) - 8 - footer_length; + + FileMetadata file_metadata_struct @ footer_begin; + + extract_column_offset_list(file_metadata_struct); + + ColumnChunkPlacer column_chunks[column_offset_list_size] @ 0x0; + + s16 last_field_id_stack_view[last_field_id_stack_size_max] @ 0x0 in last_field_id_stack; +}; +ParquetFile parquet_file @ 0x0; +//std::print("{}", parquet_file); diff --git a/tests/patterns/test_data/parquet.hexpat.parquet b/tests/patterns/test_data/parquet.hexpat.parquet new file mode 100644 index 00000000..993956ca Binary files /dev/null and b/tests/patterns/test_data/parquet.hexpat.parquet differ