Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 199 additions & 1 deletion arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,15 @@ impl RowConverter {
columns.len()
)));
}
for colum in columns.iter().skip(1) {
if colum.len() != columns[0].len() {
return Err(ArrowError::InvalidArgumentError(format!(
"RowConverter columns must all have the same length, expected {} got {}",
columns[0].len(),
colum.len()
)));
}
}
Comment on lines +693 to +701
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another check added: all columns passed to the RowConverter must have the same length. This guards against ill-formed input from the caller.
Without this check, mismatched column lengths can cause a panic inside the RowEncoder.


let encoders = columns
.iter()
Expand Down Expand Up @@ -758,7 +767,20 @@ impl RowConverter {
// SAFETY
// We have validated that the rows came from this [`RowConverter`]
// and therefore must be valid
unsafe { self.convert_raw(&mut rows, validate_utf8) }
let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;

if cfg!(test) {
for (i, row) in rows.iter().enumerate() {
if !row.is_empty() {
return Err(ArrowError::InvalidArgumentError(format!(
"Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
codecs = &self.codecs
)));
}
}
}

Ok(result)
}

/// Returns an empty [`Rows`] with capacity for `row_capacity` rows with
Expand Down Expand Up @@ -2549,6 +2571,182 @@ mod tests {
assert_eq!(&back[0], &list);
}

#[test]
fn test_two_fixed_size_lists() {
    /// Builds a five-row `FixedSizeList` column with one `UInt8` element per row:
    /// `[base]`, `[base + 1]`, `[base + 2]`, `[null]`, and finally a null list.
    fn single_element_lists(base: u8) -> ArrayRef {
        let mut builder = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
        // Rows 0..=2: valid lists holding consecutive values.
        for value in base..base + 3 {
            builder.values().append_value(value);
            builder.append(true);
        }
        // Row 3: a valid list whose only element is null.
        builder.values().append_null();
        builder.append(true);
        // Row 4: a null list; a child slot is still appended for it.
        builder.values().append_null(); // MASKED
        builder.append(false);
        Arc::new(builder.finish()) as ArrayRef
    }

    let columns = [single_element_lists(100), single_element_lists(200)];

    // One sort field per column, using each column's own data type.
    let sort_fields: Vec<SortField> = columns
        .iter()
        .map(|column| SortField::new(column.data_type().clone()))
        .collect();
    let converter = RowConverter::new(sort_fields).unwrap();

    // Round-trip: columns -> rows -> columns.
    let rows = converter.convert_columns(&columns).unwrap();
    let back = converter.convert_rows(&rows).unwrap();

    assert_eq!(back.len(), 2);
    for (decoded, original) in back.iter().zip(&columns) {
        decoded.to_data().validate_full().unwrap();
        assert_eq!(decoded, original);
    }
}

#[test]
fn test_fixed_size_list_with_variable_width_content() {
    /// Appends one single-element list slot to `list`.
    ///
    /// `entry` holds the `(timestamp, offset_minutes, time_zone)` values of the
    /// struct element; `None` appends a null to every field builder and marks
    /// the struct itself as null. `list_is_valid` is the validity of the list slot.
    fn push_slot(
        list: &mut FixedSizeListBuilder<StructBuilder>,
        entry: Option<(i64, i16, &str)>,
        list_is_valid: bool,
    ) {
        let item = list.values();

        let timestamp = item
            .field_builder::<TimestampMicrosecondBuilder>(0)
            .unwrap();
        match entry {
            Some((micros, _, _)) => timestamp.append_value(micros),
            None => timestamp.append_null(),
        }

        let offset_minutes = item.field_builder::<Int16Builder>(1).unwrap();
        match entry {
            Some((_, minutes, _)) => offset_minutes.append_value(minutes),
            None => offset_minutes.append_null(),
        }

        let time_zone = item.field_builder::<StringBuilder>(2).unwrap();
        match entry {
            Some((_, _, zone)) => time_zone.append_value(zone),
            None => time_zone.append_null(),
        }

        item.append(entry.is_some());
        list.append(list_is_valid);
    }

    let item_fields = vec![
        Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
            false,
        ),
        Field::new("offset_minutes", DataType::Int16, false),
        Field::new("time_zone", DataType::Utf8, false),
    ];
    let mut list_builder =
        FixedSizeListBuilder::new(StructBuilder::from_fields(item_fields, 1), 1);
    // 0: null
    push_slot(&mut list_builder, None, false);
    // 1: [null]
    push_slot(&mut list_builder, None, true);
    // 2: [1970-01-01 00:00:00.000000 UTC]
    push_slot(&mut list_builder, Some((0, 0, "UTC")), true);
    // 3: [2005-09-10 13:30:00.123456 Europe/Warsaw]
    push_slot(
        &mut list_builder,
        Some((1126351800123456, 120, "Europe/Warsaw")),
        true,
    );
    let first = Arc::new(list_builder.finish()) as ArrayRef;

    // Second column: a plain string column, one value per list row.
    let mut text_builder = StringBuilder::new();
    for text in [Some("somewhere near"), None, Some("Greenwich"), Some("Warsaw")] {
        match text {
            Some(value) => text_builder.append_value(value),
            None => text_builder.append_null(),
        }
    }
    let second = Arc::new(text_builder.finish()) as ArrayRef;

    let columns = [first, second];

    // One sort field per column, using each column's own data type.
    let sort_fields: Vec<SortField> = columns
        .iter()
        .map(|column| SortField::new(column.data_type().clone()))
        .collect();
    let converter = RowConverter::new(sort_fields).unwrap();

    // Round-trip: columns -> rows -> columns.
    let rows = converter.convert_columns(&columns).unwrap();
    let back = converter.convert_rows(&rows).unwrap();

    assert_eq!(back.len(), 2);
    for (decoded, original) in back.iter().zip(&columns) {
        decoded.to_data().validate_full().unwrap();
        assert_eq!(decoded, original);
    }
}

fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
where
K: ArrowPrimitiveType,
Expand Down
10 changes: 3 additions & 7 deletions arrow-row/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,20 +225,15 @@ pub fn encode_fixed_size_list(
data[*offset] = 0x01;
*offset += 1;
for child_idx in (idx * value_length)..(idx + 1) * value_length {
//dbg!(child_idx);
let row = rows.row(child_idx);
let end_offset = *offset + row.as_ref().len();
data[*offset..end_offset].copy_from_slice(row.as_ref());
*offset = end_offset;
}
}
false => {
let null_sentinels = 1;
//+ value_length; // 1 for self + for values too
for i in 0..null_sentinels {
data[*offset + i] = null_sentinel;
}
*offset += null_sentinels;
data[*offset] = null_sentinel;
*offset += 1;
}
};
})
Expand Down Expand Up @@ -292,6 +287,7 @@ pub unsafe fn decode_fixed_size_list(
row_offset = next_offset;
}
}
*row = &row[row_offset..]; // Update row for the next decoder
}

let children = converter.convert_raw(&mut child_rows, validate_utf8)?;
Expand Down
Loading