Skip to content

Commit 43926be

Browse files
sync thresholds in env, stats sync on shutdown, streaming response
1 parent 5ae6366 commit 43926be

File tree

6 files changed: +195 additions, -107 deletions

src/cli.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,23 @@ pub struct Options {
406406
help = "Enable/Disable collecting dataset stats"
407407
)]
408408
pub collect_dataset_stats: bool,
409+
410+
// the duration during which local sync should be completed
411+
#[arg(
412+
long,
413+
env = "P_LOCAL_SYNC_THRESHOLD",
414+
default_value = "30",
415+
help = "Local sync threshold in seconds"
416+
)]
417+
pub local_sync_threshold: u64,
418+
// the duration during which object store sync should be completed
419+
#[arg(
420+
long,
421+
env = "P_OBJECT_STORE_SYNC_THRESHOLD",
422+
default_value = "60",
423+
help = "Object store sync threshold in seconds"
424+
)]
425+
pub object_store_sync_threshold: u64,
409426
}
410427

411428
#[derive(Parser, Debug)]

src/handlers/http/health_check.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,35 @@ pub async fn check_shutdown_middleware(
5656

5757
// This function is called when the server is shutting down
5858
pub async fn shutdown() {
59-
// Set the shutdown flag to true
60-
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
61-
*shutdown_flag = true;
59+
// Set shutdown flag to true
60+
set_shutdown_flag().await;
6261

6362
//sleep for 5 secs to allow any ongoing requests to finish
6463
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
64+
65+
// Perform sync operations
66+
perform_sync_operations().await;
67+
68+
// If collect_dataset_stats is enabled, perform sync operations
69+
// This is to ensure that all stats data is synced before the server shuts down
70+
if PARSEABLE.options.collect_dataset_stats {
71+
perform_sync_operations().await;
72+
}
73+
}
74+
75+
async fn set_shutdown_flag() {
76+
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
77+
*shutdown_flag = true;
78+
}
79+
80+
async fn perform_sync_operations() {
81+
// Perform local sync
82+
perform_local_sync().await;
83+
// Perform object store sync
84+
perform_object_store_sync().await;
85+
}
86+
87+
async fn perform_local_sync() {
6588
let mut local_sync_joinset = JoinSet::new();
6689

6790
// Sync staging
@@ -76,7 +99,9 @@ pub async fn shutdown() {
7699
Err(err) => error!("Failed to join async task: {err}"),
77100
}
78101
}
102+
}
79103

104+
async fn perform_object_store_sync() {
80105
// Sync object store
81106
let mut object_store_joinset = JoinSet::new();
82107
sync_all_streams(&mut object_store_joinset);

src/lib.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,6 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as
7272
/// Describes the duration at the end of which parquets are pushed into objectstore.
7373
pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
7474

75-
/// Describes the duration during which local sync should be completed
76-
pub const LOCAL_SYNC_THRESHOLD: Duration = Duration::from_secs(30); // 30 secs
77-
78-
/// Describes the duration during which object store sync should be completed
79-
pub const OBJECT_STORE_SYNC_THRESHOLD: Duration = Duration::from_secs(15); // 15 secs
80-
8175
// A single HTTP client for all outgoing HTTP requests from the parseable server
8276
pub static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
8377
ClientBuilder::new()

src/parseable/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,11 @@ impl Parseable {
362362
return Ok(true);
363363
}
364364

365+
// validate custom partition if provided
366+
if let Some(partition) = custom_partition {
367+
validate_custom_partition(partition)?;
368+
}
369+
365370
// For distributed deployments, if the stream not found in memory map,
366371
//check if it exists in the storage
367372
//create stream and schema from storage

src/storage/object_storage.rs

Lines changed: 128 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -830,16 +830,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
830830
let custom_partition = stream.get_custom_partition();
831831
let schema = stream.get_schema();
832832
for path in stream.parquet_files() {
833-
if stream.get_stream_type() != StreamType::Internal
834-
&& PARSEABLE.options.collect_dataset_stats
835-
{
836-
if let Err(err) = calculate_field_stats(stream_name, &path, &schema).await {
837-
warn!(
838-
"Error calculating field stats for stream {}: {}",
839-
stream_name, err
840-
);
841-
}
842-
}
843833
let filename = path
844834
.file_name()
845835
.expect("only parquet files are returned by iterator")
@@ -889,6 +879,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
889879
let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &path)?;
890880
catalog::update_snapshot(store, stream_name, manifest).await?;
891881

882+
// If the stream is not internal and stats collection is enabled, calculate field stats
883+
// before removing the parquet file
884+
if stream.get_stream_type() != StreamType::Internal
885+
&& PARSEABLE.options.collect_dataset_stats
886+
{
887+
if let Err(err) = calculate_field_stats(stream_name, &path, &schema).await {
888+
warn!(
889+
"Error calculating field stats for stream {}: {}",
890+
stream_name, err
891+
);
892+
}
893+
}
892894
if let Err(e) = remove_file(path) {
893895
warn!("Failed to remove staged file: {e}");
894896
}
@@ -968,7 +970,13 @@ async fn calculate_field_stats(
968970
)
969971
.await?;
970972
let ctx = SessionContext::new_with_state(QUERY_SESSION_STATE.clone());
971-
let ctx_table_name = format!("{}_{}", stream_name, parquet_path.display());
973+
let parquet_file_name = parquet_path
974+
.file_name()
975+
.expect("only parquet files are returned by iterator")
976+
.to_str()
977+
.expect("filename is valid string");
978+
let parquet_file_name = str::replace(parquet_file_name, ".", "_");
979+
let ctx_table_name = format!("{}_{}", stream_name, parquet_file_name);
972980
ctx.register_parquet(
973981
&ctx_table_name,
974982
parquet_path.to_str().expect("valid path"),
@@ -1062,81 +1070,118 @@ async fn calculate_single_field_stats(
10621070
/// This is used for fetching record count for a field and distinct count.
10631071
async fn query_single_i64(ctx: &SessionContext, sql: &str) -> Option<i64> {
10641072
let df = ctx.sql(sql).await.ok()?;
1065-
let mut stream = df.execute_stream().await.ok()?;
1066-
let mut count = 0;
1067-
while let Some(batch_result) = stream.next().await {
1068-
let batch = batch_result.ok()?;
1069-
if batch.num_rows() == 0 {
1070-
return None;
1071-
}
1072-
let array = batch.column(0).as_any().downcast_ref::<Int64Array>()?;
1073-
count += array.value(0);
1073+
let batches = df.collect().await.ok()?;
1074+
let batch = batches.first()?;
1075+
if batch.num_rows() == 0 {
1076+
return None;
10741077
}
1075-
Some(count)
1078+
let array = batch.column(0).as_any().downcast_ref::<Int64Array>()?;
1079+
1080+
Some(array.value(0))
1081+
}
1082+
1083+
macro_rules! try_downcast {
1084+
($ty:ty, $arr:expr, $body:expr) => {
1085+
if let Some(arr) = $arr.as_any().downcast_ref::<$ty>() {
1086+
$body(arr)
1087+
} else {
1088+
warn!(
1089+
"Expected {} for {:?}, but found {:?}",
1090+
stringify!($ty),
1091+
$arr.data_type(),
1092+
$arr.data_type()
1093+
);
1094+
"UNSUPPORTED".to_string()
1095+
}
1096+
};
10761097
}
10771098

1078-
/// Helper function to format an Arrow value at a given index into a string.
1079-
/// Handles null values and different data types like String, Int64, Float64, Timestamp, Date32, and Boolean.
1099+
/// Function to format an Arrow value at a given index into a string.
1100+
/// Handles null values and different data types by downcasting the array to the appropriate type.
10801101
fn format_arrow_value(array: &dyn Array, idx: usize) -> String {
10811102
if array.is_null(idx) {
10821103
return "NULL".to_string();
10831104
}
10841105

10851106
match array.data_type() {
1086-
DataType::Utf8 => array
1087-
.as_any()
1088-
.downcast_ref::<StringArray>()
1089-
.unwrap()
1107+
DataType::Utf8 => try_downcast!(StringArray, array, |arr: &StringArray| arr
10901108
.value(idx)
1091-
.to_string(),
1092-
DataType::Utf8View => array
1093-
.as_any()
1094-
.downcast_ref::<StringViewArray>()
1095-
.unwrap()
1109+
.to_string()),
1110+
DataType::Utf8View => try_downcast!(StringViewArray, array, |arr: &StringViewArray| arr
10961111
.value(idx)
1097-
.to_string(),
1098-
DataType::Binary => {
1099-
let arr = array.as_any().downcast_ref::<BinaryArray>().unwrap();
1112+
.to_string()),
1113+
DataType::Binary => try_downcast!(BinaryArray, array, |arr: &BinaryArray| {
11001114
String::from_utf8_lossy(arr.value(idx)).to_string()
1101-
}
1102-
DataType::BinaryView => {
1103-
let arr = array.as_any().downcast_ref::<BinaryViewArray>().unwrap();
1115+
}),
1116+
DataType::BinaryView => try_downcast!(BinaryViewArray, array, |arr: &BinaryViewArray| {
11041117
String::from_utf8_lossy(arr.value(idx)).to_string()
1105-
}
1106-
DataType::Int64 => array
1107-
.as_any()
1108-
.downcast_ref::<Int64Array>()
1109-
.unwrap()
1118+
}),
1119+
DataType::Int64 => try_downcast!(Int64Array, array, |arr: &Int64Array| arr
11101120
.value(idx)
1111-
.to_string(),
1112-
DataType::Float64 => array
1113-
.as_any()
1114-
.downcast_ref::<Float64Array>()
1115-
.unwrap()
1121+
.to_string()),
1122+
DataType::Int32 => try_downcast!(
1123+
arrow_array::Int32Array,
1124+
array,
1125+
|arr: &arrow_array::Int32Array| arr.value(idx).to_string()
1126+
),
1127+
DataType::Int16 => try_downcast!(
1128+
arrow_array::Int16Array,
1129+
array,
1130+
|arr: &arrow_array::Int16Array| arr.value(idx).to_string()
1131+
),
1132+
DataType::Int8 => try_downcast!(
1133+
arrow_array::Int8Array,
1134+
array,
1135+
|arr: &arrow_array::Int8Array| arr.value(idx).to_string()
1136+
),
1137+
DataType::UInt64 => try_downcast!(
1138+
arrow_array::UInt64Array,
1139+
array,
1140+
|arr: &arrow_array::UInt64Array| arr.value(idx).to_string()
1141+
),
1142+
DataType::UInt32 => try_downcast!(
1143+
arrow_array::UInt32Array,
1144+
array,
1145+
|arr: &arrow_array::UInt32Array| arr.value(idx).to_string()
1146+
),
1147+
DataType::UInt16 => try_downcast!(
1148+
arrow_array::UInt16Array,
1149+
array,
1150+
|arr: &arrow_array::UInt16Array| arr.value(idx).to_string()
1151+
),
1152+
DataType::UInt8 => try_downcast!(
1153+
arrow_array::UInt8Array,
1154+
array,
1155+
|arr: &arrow_array::UInt8Array| arr.value(idx).to_string()
1156+
),
1157+
DataType::Float64 => try_downcast!(Float64Array, array, |arr: &Float64Array| arr
11161158
.value(idx)
1117-
.to_string(),
1118-
DataType::Timestamp(TimeUnit::Millisecond, _) => {
1119-
let arr = array
1120-
.as_any()
1121-
.downcast_ref::<TimestampMillisecondArray>()
1122-
.unwrap();
1123-
let timestamp = arr.value(idx);
1124-
DateTime::from_timestamp_millis(timestamp)
1125-
.map(|dt| dt.to_string())
1126-
.unwrap_or_else(|| "INVALID_TIMESTAMP".to_string())
1127-
}
1128-
DataType::Date32 => array
1129-
.as_any()
1130-
.downcast_ref::<Date32Array>()
1131-
.unwrap()
1159+
.to_string()),
1160+
DataType::Float32 => try_downcast!(
1161+
arrow_array::Float32Array,
1162+
array,
1163+
|arr: &arrow_array::Float32Array| arr.value(idx).to_string()
1164+
),
1165+
DataType::Timestamp(TimeUnit::Millisecond, _) => try_downcast!(
1166+
TimestampMillisecondArray,
1167+
array,
1168+
|arr: &TimestampMillisecondArray| {
1169+
let timestamp = arr.value(idx);
1170+
chrono::DateTime::from_timestamp_millis(timestamp)
1171+
.map(|dt| dt.to_string())
1172+
.unwrap_or_else(|| "INVALID_TIMESTAMP".to_string())
1173+
}
1174+
),
1175+
DataType::Date32 => try_downcast!(Date32Array, array, |arr: &Date32Array| arr
11321176
.value(idx)
1133-
.to_string(),
1134-
DataType::Boolean => array
1135-
.as_any()
1136-
.downcast_ref::<BooleanArray>()
1137-
.unwrap()
1177+
.to_string()),
1178+
DataType::Boolean => try_downcast!(BooleanArray, array, |arr: &BooleanArray| if arr
11381179
.value(idx)
1139-
.to_string(),
1180+
{
1181+
"true".to_string()
1182+
} else {
1183+
"false".to_string()
1184+
}),
11401185
DataType::Null => "NULL".to_string(),
11411186
_ => {
11421187
warn!(
@@ -1162,20 +1207,20 @@ async fn query_distinct_stats(
11621207
);
11631208
let mut distinct_stats = Vec::new();
11641209
if let Ok(df) = ctx.sql(&sql).await {
1165-
if let Ok(batches) = df.collect().await {
1166-
for rb in batches {
1167-
let Some(counts) = rb.column(0).as_any().downcast_ref::<Int64Array>() else {
1168-
warn!("Unexpected type for count column in stats query");
1169-
continue;
1170-
};
1171-
let values = rb.column(1).as_ref();
1172-
for i in 0..rb.num_rows() {
1173-
let value = format_arrow_value(values, i);
1174-
distinct_stats.push(DistinctStat {
1175-
distinct_value: value,
1176-
count: counts.value(i),
1177-
});
1178-
}
1210+
let mut stream = df.execute_stream().await.expect("Failed to execute stream");
1211+
while let Some(batch_result) = stream.next().await {
1212+
let rb = batch_result.expect("Failed to execute stream");
1213+
let Some(counts) = rb.column(0).as_any().downcast_ref::<Int64Array>() else {
1214+
warn!("Unexpected type for count column in stats query");
1215+
continue;
1216+
};
1217+
let values = rb.column(1).as_ref();
1218+
for i in 0..rb.num_rows() {
1219+
let value = format_arrow_value(values, i);
1220+
distinct_stats.push(DistinctStat {
1221+
distinct_value: value,
1222+
count: counts.value(i),
1223+
});
11791224
}
11801225
}
11811226
}

0 commit comments

Comments (0)