Skip to content

Commit ef08564

Browse files
moved stats calculation to object store sync
1 parent 938d3e2 commit ef08564

File tree

7 files changed

+347
-273
lines changed

7 files changed

+347
-273
lines changed

src/handlers/http/ingest.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub async fn ingest(
100100
.create_stream_if_not_exists(
101101
&stream_name,
102102
StreamType::UserDefined,
103+
None,
103104
vec![log_source_entry.clone()],
104105
)
105106
.await?;
@@ -183,6 +184,7 @@ pub async fn handle_otel_logs_ingestion(
183184
.create_stream_if_not_exists(
184185
&stream_name,
185186
StreamType::UserDefined,
187+
None,
186188
vec![log_source_entry.clone()],
187189
)
188190
.await?;
@@ -248,6 +250,7 @@ pub async fn handle_otel_metrics_ingestion(
248250
.create_stream_if_not_exists(
249251
&stream_name,
250252
StreamType::UserDefined,
253+
None,
251254
vec![log_source_entry.clone()],
252255
)
253256
.await?;
@@ -313,6 +316,7 @@ pub async fn handle_otel_traces_ingestion(
313316
.create_stream_if_not_exists(
314317
&stream_name,
315318
StreamType::UserDefined,
319+
None,
316320
vec![log_source_entry.clone()],
317321
)
318322
.await?;

src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as
7272
/// Describes the duration at the end of which parquets are pushed into objectstore.
7373
pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
7474

75+
/// Describes the duration during which local sync should be completed
76+
pub const LOCAL_SYNC_THRESHOLD: Duration = Duration::from_secs(30); // 30 secs
77+
78+
/// Describes the duration during which object store sync should be completed
79+
pub const OBJECT_STORE_SYNC_THRESHOLD: Duration = Duration::from_secs(15); // 15 secs
80+
7581
// A single HTTP client for all outgoing HTTP requests from the parseable server
7682
pub static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
7783
ClientBuilder::new()

src/parseable/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ impl Parseable {
330330
.create_stream_if_not_exists(
331331
INTERNAL_STREAM_NAME,
332332
StreamType::Internal,
333+
None,
333334
vec![log_source_entry],
334335
)
335336
.await
@@ -354,6 +355,7 @@ impl Parseable {
354355
&self,
355356
stream_name: &str,
356357
stream_type: StreamType,
358+
custom_partition: Option<&String>,
357359
log_source: Vec<LogSourceEntry>,
358360
) -> Result<bool, PostError> {
359361
if self.streams.contains(stream_name) {
@@ -375,7 +377,7 @@ impl Parseable {
375377
stream_name.to_string(),
376378
"",
377379
None,
378-
None,
380+
custom_partition,
379381
false,
380382
Arc::new(Schema::empty()),
381383
stream_type,

0 commit comments

Comments
 (0)