From 7d1c285bb957601ffa4bb94439eac1377173b7b2 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 14 Sep 2025 05:09:31 -0700 Subject: [PATCH 1/3] chore: validate parquet before and after upload to object store after parquet file creation - 1. validate if file size > FOOTER_SIZE 2. read the file and validate if num_rows > 0 after parquet file upload to object store - 1. perform a head() to get the metadata of the file validate if file size = file size of the parquet from staging directory Fixes: #1430 --- src/parseable/streams.rs | 62 +++++++++++++++++++++++----- src/storage/object_storage.rs | 76 ++++++++++++++++++++++++++++++++--- 2 files changed, 123 insertions(+), 15 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 9e99dc4ca..b9a62e077 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -34,7 +34,10 @@ use itertools::Itertools; use parquet::{ arrow::ArrowWriter, basic::Encoding, - file::{FOOTER_SIZE, properties::WriterProperties}, + file::{ + FOOTER_SIZE, properties::WriterProperties, reader::FileReader, + serialized_reader::SerializedFileReader, + }, format::SortingColumn, schema::types::ColumnPath, }; @@ -409,7 +412,7 @@ impl Stream { .map(|file| file.path()) .filter(|file| { file.extension().is_some_and(|ext| ext.eq("parquet")) - && std::fs::metadata(file).is_ok_and(|meta| meta.len() > FOOTER_SIZE as u64) + && Self::is_valid_parquet_file(file, &self.stream_name) }) .collect() } @@ -649,7 +652,7 @@ impl Stream { continue; } - if let Err(e) = self.finalize_parquet_file(&part_path, &parquet_path) { + if let Err(e) = std::fs::rename(&part_path, &parquet_path) { error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"); } else { self.cleanup_arrow_files_and_dir(&arrow_files); @@ -682,12 +685,10 @@ impl Stream { } writer.close()?; - if part_file.metadata().expect("File was just created").len() - < parquet::file::FOOTER_SIZE as u64 - { + if !Self::is_valid_parquet_file(part_path, &self.stream_name) { error!( - "Invalid parquet file {part_path:?} detected for stream {}, removing it", - &self.stream_name + "Invalid parquet file {part_path:?} detected for stream {stream_name}, removing it", + stream_name = &self.stream_name ); remove_file(part_path).expect("File should be removable if it is invalid"); return Ok(false); @@ -696,8 +697,49 @@ impl Stream { Ok(true) } - fn finalize_parquet_file(&self, part_path: &Path, parquet_path: &Path) -> std::io::Result<()> { - std::fs::rename(part_path, parquet_path) + /// function to validate parquet files + fn is_valid_parquet_file(path: &Path, stream_name: &str) -> bool { + // First check file size as a quick validation + match path.metadata() { + Ok(meta) if meta.len() < FOOTER_SIZE as u64 => { + error!( + "Invalid parquet file {path:?} detected for stream {stream_name}, size: {} bytes", + meta.len() + ); + return false; + } + Err(e) => { + error!( + "Cannot read metadata for parquet file {path:?} for stream {stream_name}: {e}" + ); + return false; + } + _ => {} // File size is adequate, continue validation + } + + // Try to open and read the parquet file metadata to verify it's valid + match std::fs::File::open(path) { + Ok(file) => match SerializedFileReader::new(file) { + Ok(reader) => { + if reader.metadata().file_metadata().num_rows() < 0 { + error!( + "Invalid row count in parquet file {path:?} for stream {stream_name}" + ); + false + } else { + true + } + } + Err(e) => { + error!("Failed to read parquet file {path:?} for stream {stream_name}: {e}"); + false + } + }, + Err(e) => { + error!("Failed to open parquet file {path:?} for stream {stream_name}: {e}"); + false + } + } } fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf]) { diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 6b64c7e1d..df8c2a569 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -106,6 +106,12 @@ async fn upload_single_parquet_file( .to_str() .expect("filename is valid string"); + // Get the local file size for validation + let local_file_size = path + .metadata() + .map_err(|e| ObjectStorageError::Custom(format!("Failed to get local file metadata: {e}")))? + .len(); + // Upload the file store .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) @@ -115,6 +121,27 @@ async fn upload_single_parquet_file( ObjectStorageError::Custom(format!("Failed to upload {filename}: {e}")) })?; + // Validate the uploaded file size matches local file + let upload_is_valid = validate_uploaded_parquet_file( + &store, + &stream_relative_path, + local_file_size, + &stream_name, + ) + .await?; + + if !upload_is_valid { + // Upload validation failed, clean up the uploaded file and return error + let _ = store + .delete_object(&RelativePathBuf::from(&stream_relative_path)) + .await; + error!("Upload size validation failed for file {filename:?}, deleted from object storage"); + return Ok(UploadResult { + file_path: path, + manifest_file: None, // This will trigger local file cleanup + }); + } + // Update storage metrics update_storage_metrics(&path, &stream_name, filename)?; @@ -177,6 +204,44 @@ async fn calculate_stats_if_enabled( } } +/// Validates that a parquet file uploaded to object storage matches the staging file size +async fn validate_uploaded_parquet_file( + store: &Arc, + stream_relative_path: &str, + expected_size: u64, + stream_name: &str, +) -> Result { + // Verify the file exists and has the expected size + match store + .head(&RelativePathBuf::from(stream_relative_path)) + .await + { + Ok(metadata) => { + if metadata.size as u64 != expected_size { + warn!( + "Uploaded file size mismatch for stream {}: expected {}, got {}", + stream_name, expected_size, metadata.size + ); + Ok(false) + } else { + tracing::trace!( + "Uploaded parquet file size validated successfully for stream {}, size: {}", + stream_name, + expected_size + ); + Ok(true) + } + } + Err(e) => { + error!( + "Failed to get metadata for uploaded file in stream {}: {e}", + stream_name + ); + Ok(false) + } + } +} + pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync { fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder; fn construct_client(&self) -> Arc; @@ -880,14 +945,15 @@ async fn collect_upload_results( if let Some(manifest_file) = upload_result.manifest_file { uploaded_files.push((upload_result.file_path, manifest_file)); } else { - // File failed to upload, clean up - if let Err(e) = remove_file(upload_result.file_path) { - warn!("Failed to remove staged file: {e}"); - } + // File failed in upload size validation, preserve staging file for retry + error!( + "Parquet file upload size validation failed for {:?}, preserving in staging for retry", + upload_result.file_path + ); } } Ok(Err(e)) => { - error!("Error processing parquet file: {e}"); + error!("Error uploading parquet file: {e}"); return Err(e); } Err(e) => { From dd67ac25833fca69166adbee9ba353e2108c1e67 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 14 Sep 2025 08:20:47 -0700 Subject: [PATCH 2/3] update row count check, update comments for no manifest --- src/parseable/streams.rs | 6 ++---- src/storage/object_storage.rs | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index b9a62e077..79ca1868f 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -721,10 +721,8 @@ impl Stream { match std::fs::File::open(path) { Ok(file) => match SerializedFileReader::new(file) { Ok(reader) => { - if reader.metadata().file_metadata().num_rows() < 0 { - error!( - "Invalid row count in parquet file {path:?} for stream {stream_name}" - ); + if reader.metadata().file_metadata().num_rows() == 0 { + error!("Invalid parquet file {path:?} for stream {stream_name}"); false } else { true diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index df8c2a569..41639d025 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -138,7 +138,7 @@ async fn upload_single_parquet_file( error!("Upload size validation failed for file {filename:?}, deleted from object storage"); return Ok(UploadResult { file_path: path, - manifest_file: None, // This will trigger local file cleanup + manifest_file: None, // Preserve staging file for retry; no manifest created }); } From 55eb8d8db9602364a9b0160e6a816deb8cac65d0 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 14 Sep 2025 08:43:30 -0700 Subject: [PATCH 3/3] fix server start sync for flush --- src/parseable/streams.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 79ca1868f..b84eebc00 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -991,7 +991,10 @@ impl Stream { shutdown_signal: bool, ) -> Result<(), StagingError> { let start_flush = Instant::now(); - self.flush(shutdown_signal); + // Force flush for init or shutdown signals to convert all .part files to .arrows + // For regular cycles, use false to only flush non-current writers + let forced = init_signal || shutdown_signal; + self.flush(forced); trace!( "Flushing stream ({}) took: {}s", self.stream_name,