diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 9e99dc4ca..b84eebc00 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -34,7 +34,10 @@ use itertools::Itertools; use parquet::{ arrow::ArrowWriter, basic::Encoding, - file::{FOOTER_SIZE, properties::WriterProperties}, + file::{ + FOOTER_SIZE, properties::WriterProperties, reader::FileReader, + serialized_reader::SerializedFileReader, + }, format::SortingColumn, schema::types::ColumnPath, }; @@ -409,7 +412,7 @@ impl Stream { .map(|file| file.path()) .filter(|file| { file.extension().is_some_and(|ext| ext.eq("parquet")) - && std::fs::metadata(file).is_ok_and(|meta| meta.len() > FOOTER_SIZE as u64) + && Self::is_valid_parquet_file(file, &self.stream_name) }) .collect() } @@ -649,7 +652,7 @@ impl Stream { continue; } - if let Err(e) = self.finalize_parquet_file(&part_path, &parquet_path) { + if let Err(e) = std::fs::rename(&part_path, &parquet_path) { error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"); } else { self.cleanup_arrow_files_and_dir(&arrow_files); @@ -682,12 +685,10 @@ impl Stream { } writer.close()?; - if part_file.metadata().expect("File was just created").len() - < parquet::file::FOOTER_SIZE as u64 - { + if !Self::is_valid_parquet_file(part_path, &self.stream_name) { error!( - "Invalid parquet file {part_path:?} detected for stream {}, removing it", - &self.stream_name + "Invalid parquet file {part_path:?} detected for stream {stream_name}, removing it", + stream_name = &self.stream_name ); remove_file(part_path).expect("File should be removable if it is invalid"); return Ok(false); @@ -696,8 +697,47 @@ impl Stream { Ok(true) } - fn finalize_parquet_file(&self, part_path: &Path, parquet_path: &Path) -> std::io::Result<()> { - std::fs::rename(part_path, parquet_path) + /// function to validate parquet files + fn is_valid_parquet_file(path: &Path, stream_name: &str) -> bool { + // First check file size as a quick validation + match 
path.metadata() { + Ok(meta) if meta.len() < FOOTER_SIZE as u64 => { + error!( + "Invalid parquet file {path:?} detected for stream {stream_name}, size: {} bytes", + meta.len() + ); + return false; + } + Err(e) => { + error!( + "Cannot read metadata for parquet file {path:?} for stream {stream_name}: {e}" + ); + return false; + } + _ => {} // File size is adequate, continue validation + } + + // Try to open and read the parquet file metadata to verify it's valid + match std::fs::File::open(path) { + Ok(file) => match SerializedFileReader::new(file) { + Ok(reader) => { + if reader.metadata().file_metadata().num_rows() == 0 { + error!("Invalid parquet file {path:?} for stream {stream_name}"); + false + } else { + true + } + } + Err(e) => { + error!("Failed to read parquet file {path:?} for stream {stream_name}: {e}"); + false + } + }, + Err(e) => { + error!("Failed to open parquet file {path:?} for stream {stream_name}: {e}"); + false + } + } } fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf]) { @@ -951,7 +991,10 @@ impl Stream { shutdown_signal: bool, ) -> Result<(), StagingError> { let start_flush = Instant::now(); - self.flush(shutdown_signal); + // Force flush for init or shutdown signals to convert all .part files to .arrows + // For regular cycles, use false to only flush non-current writers + let forced = init_signal || shutdown_signal; + self.flush(forced); trace!( "Flushing stream ({}) took: {}s", self.stream_name, diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 6b64c7e1d..41639d025 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -106,6 +106,12 @@ async fn upload_single_parquet_file( .to_str() .expect("filename is valid string"); + // Get the local file size for validation + let local_file_size = path + .metadata() + .map_err(|e| ObjectStorageError::Custom(format!("Failed to get local file metadata: {e}")))? 
+ .len(); + // Upload the file store .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) @@ -115,6 +121,27 @@ ObjectStorageError::Custom(format!("Failed to upload (unknown): {e}")) })?; + // Validate the uploaded file size matches local file + let upload_is_valid = validate_uploaded_parquet_file( + &store, + &stream_relative_path, + local_file_size, + &stream_name, + ) + .await?; + + if !upload_is_valid { + // Upload validation failed, clean up the uploaded file and return error + let _ = store + .delete_object(&RelativePathBuf::from(&stream_relative_path)) + .await; + error!("Upload size validation failed for file {filename:?}, deleted from object storage"); + return Ok(UploadResult { + file_path: path, + manifest_file: None, // Preserve staging file for retry; no manifest created + }); + } + // Update storage metrics update_storage_metrics(&path, &stream_name, filename)?; @@ -177,6 +204,44 @@ } } +/// Validates that a parquet file uploaded to object storage matches the staging file size +async fn validate_uploaded_parquet_file( + store: &Arc<dyn ObjectStorage>, + stream_relative_path: &str, + expected_size: u64, + stream_name: &str, +) -> Result<bool, ObjectStorageError> { + // Verify the file exists and has the expected size + match store + .head(&RelativePathBuf::from(stream_relative_path)) + .await + { + Ok(metadata) => { + if metadata.size as u64 != expected_size { + warn!( + "Uploaded file size mismatch for stream {}: expected {}, got {}", + stream_name, expected_size, metadata.size + ); + Ok(false) + } else { + tracing::trace!( + "Uploaded parquet file size validated successfully for stream {}, size: {}", + stream_name, + expected_size + ); + Ok(true) + } + } + Err(e) => { + error!( + "Failed to get metadata for uploaded file in stream {}: {e}", + stream_name + ); + Ok(false) + } + } +} + pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync { fn get_datafusion_runtime(&self) -> 
RuntimeEnvBuilder; fn construct_client(&self) -> Arc<dyn ObjectStorage>; @@ -880,14 +945,15 @@ async fn collect_upload_results( if let Some(manifest_file) = upload_result.manifest_file { uploaded_files.push((upload_result.file_path, manifest_file)); } else { - // File failed to upload, clean up - if let Err(e) = remove_file(upload_result.file_path) { - warn!("Failed to remove staged file: {e}"); - } + // File failed in upload size validation, preserve staging file for retry + error!( + "Parquet file upload size validation failed for {:?}, preserving in staging for retry", + upload_result.file_path + ); } } Ok(Err(e)) => { - error!("Error processing parquet file: {e}"); + error!("Error uploading parquet file: {e}"); return Err(e); } Err(e) => {