Skip to content

Commit da97dd1

Browse files
committed
[parquet] Add an option to disable internal buffer for the sink
1 parent 2d900a4 commit da97dd1

File tree

2 files changed

+80
-15
lines changed

2 files changed

+80
-15
lines changed

parquet/src/file/properties.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = Some(64);
6262
pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false;
6363
/// Default values for [`WriterProperties::coerce_types`]
6464
pub const DEFAULT_COERCE_TYPES: bool = false;
65+
/// Default values for [`WriterProperties::internal_buffer_enabled`]
66+
pub const DEFAULT_INTERNAL_BUFFER_ENABLE: bool = true;
6567

6668
/// Parquet writer version.
6769
///
@@ -169,6 +171,7 @@ pub struct WriterProperties {
169171
column_index_truncate_length: Option<usize>,
170172
statistics_truncate_length: Option<usize>,
171173
coerce_types: bool,
174+
internal_buffer_enabled: bool,
172175
#[cfg(feature = "encryption")]
173176
pub(crate) file_encryption_properties: Option<FileEncryptionProperties>,
174177
}
@@ -335,6 +338,13 @@ impl WriterProperties {
335338
self.coerce_types
336339
}
337340

341+
/// Returns `true` if the internal buffer is enabled.
342+
///
343+
/// For more details see [`WriterPropertiesBuilder::set_internal_buffer_enabled`]
344+
pub fn internal_buffer_enabled(&self) -> bool {
345+
self.internal_buffer_enabled
346+
}
347+
338348
/// Returns encoding for a data page, when dictionary encoding is enabled.
339349
///
340350
/// This is not configurable.
@@ -458,6 +468,7 @@ pub struct WriterPropertiesBuilder {
458468
column_index_truncate_length: Option<usize>,
459469
statistics_truncate_length: Option<usize>,
460470
coerce_types: bool,
471+
internal_buffer_enabled: bool,
461472
#[cfg(feature = "encryption")]
462473
file_encryption_properties: Option<FileEncryptionProperties>,
463474
}
@@ -481,6 +492,7 @@ impl Default for WriterPropertiesBuilder {
481492
column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
482493
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
483494
coerce_types: DEFAULT_COERCE_TYPES,
495+
internal_buffer_enabled: DEFAULT_INTERNAL_BUFFER_ENABLE,
484496
#[cfg(feature = "encryption")]
485497
file_encryption_properties: None,
486498
}
@@ -506,6 +518,7 @@ impl WriterPropertiesBuilder {
506518
column_index_truncate_length: self.column_index_truncate_length,
507519
statistics_truncate_length: self.statistics_truncate_length,
508520
coerce_types: self.coerce_types,
521+
internal_buffer_enabled: self.internal_buffer_enabled,
509522
#[cfg(feature = "encryption")]
510523
file_encryption_properties: self.file_encryption_properties,
511524
}
@@ -700,6 +713,21 @@ impl WriterPropertiesBuilder {
700713
self
701714
}
702715

716+
/// Enables or disables the internal buffer used to batch small write operations.
717+
///
718+
/// If `true` (default), small writes will be collected into an internal
719+
/// buffer before calling the underlying `Write::write` method. This is
720+
/// essential for maintaining good performance when the underlying writer
721+
/// is an I/O-sensitive sink (like a network socket or raw file handle).
722+
///
723+
/// Buffering is often redundant and can be slightly less performant when
724+
/// the underlying writer is in-memory or already buffered (e.g., a `Vec<u8>`
725+
/// or a `std::io::BufWriter`). In such cases, this option can be set to `false`.
726+
pub fn set_internal_buffer_enabled(mut self, internal_buffer_enabled: bool) -> Self {
727+
self.internal_buffer_enabled = internal_buffer_enabled;
728+
self
729+
}
730+
703731
/// Sets FileEncryptionProperties (defaults to `None`)
704732
#[cfg(feature = "encryption")]
705733
pub fn with_file_encryption_properties(
@@ -958,6 +986,7 @@ impl From<WriterProperties> for WriterPropertiesBuilder {
958986
sorting_columns: props.sorting_columns,
959987
column_index_truncate_length: props.column_index_truncate_length,
960988
statistics_truncate_length: props.statistics_truncate_length,
989+
internal_buffer_enabled: props.internal_buffer_enabled,
961990
coerce_types: props.coerce_types,
962991
#[cfg(feature = "encryption")]
963992
file_encryption_properties: props.file_encryption_properties,

parquet/src/file/writer.rs

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,35 @@ use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
4545
use crate::file::{metadata::*, PARQUET_MAGIC};
4646
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
4747

48+
/// The sink behind a [`TrackedWrite`]: either wrapped in a [`BufWriter`] or written to directly.
49+
enum WriterMode<W: Write> {
50+
Buffered(BufWriter<W>),
51+
Unbuffered(W),
52+
}
53+
4854
/// A wrapper around a [`Write`] that keeps track of the number
49-
/// of bytes that have been written. The given [`Write`] is wrapped
50-
/// with a [`BufWriter`] to optimize writing performance.
55+
/// of bytes that have been written. In buffered mode the given
56+
/// [`Write`] is wrapped with a [`BufWriter`] to optimize writing
57+
/// performance for I/O-sensitive sinks.
5158
pub struct TrackedWrite<W: Write> {
52-
inner: BufWriter<W>,
59+
inner: WriterMode<W>,
5360
bytes_written: usize,
5461
}
5562

5663
impl<W: Write> TrackedWrite<W> {
57-
/// Create a new [`TrackedWrite`] from a [`Write`]
64+
/// Create a new buffered [`TrackedWrite`] from a [`Write`]
5865
pub fn new(inner: W) -> Self {
5966
let buf_write = BufWriter::new(inner);
6067
Self {
61-
inner: buf_write,
68+
inner: WriterMode::Buffered(buf_write),
69+
bytes_written: 0,
70+
}
71+
}
72+
73+
/// Create a new unbuffered [`TrackedWrite`] from a [`Write`]
74+
pub fn new_unbuffered(inner: W) -> Self {
75+
Self {
76+
inner: WriterMode::Unbuffered(inner),
6277
bytes_written: 0,
6378
}
6479
}
@@ -70,47 +85,64 @@ impl<W: Write> TrackedWrite<W> {
7085

7186
/// Returns a reference to the underlying writer.
7287
pub fn inner(&self) -> &W {
73-
self.inner.get_ref()
88+
match &self.inner {
89+
WriterMode::Buffered(w) => w.get_ref(),
90+
WriterMode::Unbuffered(w) => w,
91+
}
7492
}
7593

7694
/// Returns a mutable reference to the underlying writer.
7795
///
7896
/// It is inadvisable to directly write to the underlying writer, doing so
7997
/// will likely result in data corruption
8098
pub fn inner_mut(&mut self) -> &mut W {
81-
self.inner.get_mut()
99+
match &mut self.inner {
100+
WriterMode::Buffered(w) => w.get_mut(),
101+
WriterMode::Unbuffered(w) => w,
102+
}
82103
}
83104

84105
/// Returns the underlying writer.
85106
pub fn into_inner(self) -> Result<W> {
86-
self.inner.into_inner().map_err(|err| {
87-
ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
88-
})
107+
match self.inner {
108+
WriterMode::Buffered(w) => w.into_inner().map_err(|err| {
109+
ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
110+
}),
111+
WriterMode::Unbuffered(w) => Ok(w),
112+
}
113+
}
114+
115+
/// Returns the selected writer for write operations
116+
fn writer(&mut self) -> &mut dyn Write {
117+
match &mut self.inner {
118+
WriterMode::Buffered(w) => w,
119+
WriterMode::Unbuffered(w) => w,
120+
}
89121
}
90122
}
91123

92124
impl<W: Write> Write for TrackedWrite<W> {
93125
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
94-
let bytes = self.inner.write(buf)?;
126+
let bytes = self.writer().write(buf)?;
95127
self.bytes_written += bytes;
96128
Ok(bytes)
97129
}
98130

99131
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
100-
let bytes = self.inner.write_vectored(bufs)?;
132+
let bytes = self.writer().write_vectored(bufs)?;
101133
self.bytes_written += bytes;
102134
Ok(bytes)
103135
}
104136

105137
fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
106-
self.inner.write_all(buf)?;
138+
self.writer().write_all(buf)?;
107139
self.bytes_written += buf.len();
108140

109141
Ok(())
110142
}
111143

112144
fn flush(&mut self) -> std::io::Result<()> {
113-
self.inner.flush()
145+
self.writer().flush()
114146
}
115147
}
116148

@@ -185,7 +217,11 @@ impl<W: Write> Debug for SerializedFileWriter<W> {
185217
impl<W: Write + Send> SerializedFileWriter<W> {
186218
/// Creates new file writer.
187219
pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
188-
let mut buf = TrackedWrite::new(buf);
220+
let mut buf = if properties.internal_buffer_enabled() {
221+
TrackedWrite::new(buf)
222+
} else {
223+
TrackedWrite::new_unbuffered(buf)
224+
};
189225

190226
let schema_descriptor = SchemaDescriptor::new(schema.clone());
191227

0 commit comments

Comments
 (0)