src/parseable/streams.rs: 65 changes (54 additions, 11 deletions)
@@ -34,7 +34,10 @@ use itertools::Itertools;
use parquet::{
arrow::ArrowWriter,
basic::Encoding,
-    file::{FOOTER_SIZE, properties::WriterProperties},
+    file::{
+        FOOTER_SIZE, properties::WriterProperties, reader::FileReader,
+        serialized_reader::SerializedFileReader,
+    },
format::SortingColumn,
schema::types::ColumnPath,
};
@@ -409,7 +412,7 @@ impl Stream
.map(|file| file.path())
.filter(|file| {
file.extension().is_some_and(|ext| ext.eq("parquet"))
-                    && std::fs::metadata(file).is_ok_and(|meta| meta.len() > FOOTER_SIZE as u64)
+                    && Self::is_valid_parquet_file(file, &self.stream_name)
})
.collect()
}
@@ -649,7 +652,7 @@ impl Stream
continue;
}

-        if let Err(e) = self.finalize_parquet_file(&part_path, &parquet_path) {
+        if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}");
} else {
self.cleanup_arrow_files_and_dir(&arrow_files);
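The helper indirection is gone: finalization is now a bare std::fs::rename, which is atomic when the .part and .parquet paths sit on the same filesystem, so a crash mid-cycle never leaves a half-written file under the final name. A minimal sketch of the pattern, with hypothetical paths outside this codebase:

```rust
use std::{fs, io, path::Path};

/// Promote a finished `.part` staging file to its final `.parquet` name.
/// `rename` is atomic within one filesystem: readers see either the old
/// name or the complete file, never a partially written `.parquet`.
fn promote_part_file(part: &Path, parquet: &Path) -> io::Result<()> {
    fs::rename(part, parquet)
}

fn main() -> io::Result<()> {
    // Hypothetical staging layout, for illustration only.
    let dir = std::env::temp_dir();
    let part = dir.join("events.parquet.part");
    let fin = dir.join("events.parquet");
    fs::write(&part, b"placeholder bytes")?;
    promote_part_file(&part, &fin)
}
```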
@@ -682,12 +685,10 @@
}
writer.close()?;

-        if part_file.metadata().expect("File was just created").len()
-            < parquet::file::FOOTER_SIZE as u64
-        {
+        if !Self::is_valid_parquet_file(part_path, &self.stream_name) {
            error!(
-                "Invalid parquet file {part_path:?} detected for stream {}, removing it",
-                &self.stream_name
+                "Invalid parquet file {part_path:?} detected for stream {stream_name}, removing it",
+                stream_name = &self.stream_name
);
remove_file(part_path).expect("File should be removable if it is invalid");
return Ok(false);
@@ -696,8 +697,47 @@
Ok(true)
}

-    fn finalize_parquet_file(&self, part_path: &Path, parquet_path: &Path) -> std::io::Result<()> {
-        std::fs::rename(part_path, parquet_path)
-    }
+    /// Validates that a staged parquet file is large enough to hold a footer, parses as parquet, and contains at least one row
+    fn is_valid_parquet_file(path: &Path, stream_name: &str) -> bool {
+        // First check file size as a quick validation
+        match path.metadata() {
+            Ok(meta) if meta.len() < FOOTER_SIZE as u64 => {
+                error!(
+                    "Invalid parquet file {path:?} detected for stream {stream_name}, size: {} bytes",
+                    meta.len()
+                );
+                return false;
+            }
+            Err(e) => {
+                error!(
+                    "Cannot read metadata for parquet file {path:?} for stream {stream_name}: {e}"
+                );
+                return false;
+            }
+            _ => {} // File size is adequate, continue validation
+        }
+
+        // Try to open and read the parquet file metadata to verify it's valid
+        match std::fs::File::open(path) {
+            Ok(file) => match SerializedFileReader::new(file) {
+                Ok(reader) => {
+                    if reader.metadata().file_metadata().num_rows() == 0 {
+                        error!("Invalid parquet file {path:?} for stream {stream_name}: contains no rows");
+                        false
+                    } else {
+                        true
+                    }
+                }
+                Err(e) => {
+                    error!("Failed to read parquet file {path:?} for stream {stream_name}: {e}");
+                    false
+                }
+            },
+            Err(e) => {
+                error!("Failed to open parquet file {path:?} for stream {stream_name}: {e}");
+                false
+            }
+        }
+    }
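One way to exercise this validator is to write a tiny parquet file with the same crates the module already imports and run the identical checks against it. A sketch, assuming the arrow-array and arrow-schema crates are available; `looks_valid` is a hypothetical free-function restatement of the logic above, not the project's API:

```rust
use std::{fs::File, path::Path, sync::Arc};

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::{
    arrow::ArrowWriter,
    file::{FOOTER_SIZE, reader::FileReader, serialized_reader::SerializedFileReader},
};

/// Hypothetical free-function restatement of the check: at least
/// FOOTER_SIZE bytes on disk, parses as parquet, and holds >= 1 row.
fn looks_valid(path: &Path) -> bool {
    let big_enough = path
        .metadata()
        .map(|m| m.len() >= FOOTER_SIZE as u64)
        .unwrap_or(false);
    big_enough
        && File::open(path)
            .ok()
            .and_then(|f| SerializedFileReader::new(f).ok())
            .is_some_and(|r| r.metadata().file_metadata().num_rows() > 0)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Write a one-column, three-row parquet file.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;
    let path = std::env::temp_dir().join("probe.parquet");
    let mut writer = ArrowWriter::try_new(File::create(&path)?, schema, None)?;
    writer.write(&batch)?;
    // The footer is only written on close; an unclosed file fails the check.
    writer.close()?;
    assert!(looks_valid(&path));
    Ok(())
}
```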

fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf]) {
@@ -951,7 +991,10 @@ impl Stream
shutdown_signal: bool,
) -> Result<(), StagingError> {
let start_flush = Instant::now();
-        self.flush(shutdown_signal);
+        // Force flush for init or shutdown signals to convert all .part files to .arrows
+        // For regular cycles, use false to only flush non-current writers
+        let forced = init_signal || shutdown_signal;
+        self.flush(forced);
trace!(
"Flushing stream ({}) took: {}s",
self.stream_name,
src/storage/object_storage.rs: 76 changes (71 additions, 5 deletions)
@@ -106,6 +112,12 @@ async fn upload_single_parquet_file(
.to_str()
.expect("filename is valid string");

// Get the local file size for validation
let local_file_size = path
.metadata()
.map_err(|e| ObjectStorageError::Custom(format!("Failed to get local file metadata: {e}")))?
.len();

// Upload the file
store
.upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path)
@@ -115,6 +121,27 @@
ObjectStorageError::Custom(format!("Failed to upload {filename}: {e}"))
})?;

// Validate the uploaded file size matches local file
let upload_is_valid = validate_uploaded_parquet_file(
&store,
&stream_relative_path,
local_file_size,
&stream_name,
)
.await?;

if !upload_is_valid {
        // Upload validation failed: delete the remote object and return with no
        // manifest, so the staging file is preserved and retried on the next cycle
let _ = store
.delete_object(&RelativePathBuf::from(&stream_relative_path))
.await;
error!("Upload size validation failed for file {filename:?}, deleted from object storage");
return Ok(UploadResult {
file_path: path,
manifest_file: None, // Preserve staging file for retry; no manifest created
});
}

// Update storage metrics
update_storage_metrics(&path, &stream_name, filename)?;
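Condensed, the sequence above is: measure the staging file, upload, compare the remote size, and on mismatch delete the remote object while leaving the staging file on disk for the next sync cycle. A synchronous sketch with a stand-in trait; every name here is hypothetical, the real code uses the async ObjectStorage trait shown in this diff:

```rust
use std::{io, path::Path};

/// Stand-in for the project's object-storage interface (hypothetical).
trait Store {
    fn upload(&self, key: &str, local: &Path) -> io::Result<()>;
    fn remote_size(&self, key: &str) -> io::Result<u64>;
    fn delete(&self, key: &str) -> io::Result<()>;
}

/// Upload `local` under `key`, then verify the remote copy has the same
/// byte length. On mismatch, best-effort delete the remote object and
/// return Ok(false) so the caller keeps the staging file for a retry.
fn upload_and_validate(store: &impl Store, key: &str, local: &Path) -> io::Result<bool> {
    let expected = local.metadata()?.len();
    store.upload(key, local)?;
    if store.remote_size(key)? != expected {
        let _ = store.delete(key); // ignore cleanup errors; a retry re-uploads anyway
        return Ok(false);
    }
    Ok(true)
}
```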

@@ -177,6 +204,44 @@ async fn calculate_stats_if_enabled(
}
}

/// Validates that a parquet file uploaded to object storage matches the staging file size
async fn validate_uploaded_parquet_file(
store: &Arc<dyn ObjectStorage>,
stream_relative_path: &str,
expected_size: u64,
stream_name: &str,
) -> Result<bool, ObjectStorageError> {
// Verify the file exists and has the expected size
match store
.head(&RelativePathBuf::from(stream_relative_path))
.await
{
Ok(metadata) => {
if metadata.size as u64 != expected_size {
warn!(
"Uploaded file size mismatch for stream {}: expected {}, got {}",
stream_name, expected_size, metadata.size
);
Ok(false)
} else {
tracing::trace!(
"Uploaded parquet file size validated successfully for stream {}, size: {}",
stream_name,
expected_size
);
Ok(true)
}
}
Err(e) => {
error!(
"Failed to get metadata for uploaded file in stream {}: {e}",
stream_name
);
Ok(false)
}
}
}
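The head-plus-size comparison is a metadata-only probe, so no payload bytes are read back. The same check can be written directly against the object_store crate; a sketch under that assumption (the project's own ObjectStorage trait is what the diff actually calls, and the size field's integer type varies across object_store releases, hence the cast):

```rust
use object_store::{ObjectStore, path::Path};

/// Returns Ok(true) when the object at `key` reports exactly `expected`
/// bytes. `head` fetches metadata only; the object body is never downloaded.
async fn remote_size_matches(
    store: &dyn ObjectStore,
    key: &Path,
    expected: u64,
) -> object_store::Result<bool> {
    let meta = store.head(key).await?;
    // `size` is usize in older object_store releases and u64 in newer ones;
    // the cast keeps the comparison uniform either way.
    Ok(meta.size as u64 == expected)
}
```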

pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder;
fn construct_client(&self) -> Arc<dyn ObjectStorage>;
@@ -880,14 +945,15 @@ async fn collect_upload_results(
if let Some(manifest_file) = upload_result.manifest_file {
uploaded_files.push((upload_result.file_path, manifest_file));
} else {
-            // File failed to upload, clean up
-            if let Err(e) = remove_file(upload_result.file_path) {
-                warn!("Failed to remove staged file: {e}");
-            }
+            // Upload size validation failed; preserve the staging file for retry
+            error!(
+                "Parquet file upload size validation failed for {:?}, preserving in staging for retry",
+                upload_result.file_path
+            );
}
}
Ok(Err(e)) => {
error!("Error processing parquet file: {e}");
error!("Error uploading parquet file: {e}");
return Err(e);
}
Err(e) => {