diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 218af7b25..191396660 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -109,10 +109,7 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
         let len = records.len();
         debug!("Processing {} records", len);
 
-        self.build_event_from_chunk(&records)
-            .await?
-            .process()
-            .await?;
+        self.build_event_from_chunk(&records).await?.process()?;
 
         debug!("Processed {} records", len);
         Ok(())
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 8b599d4b8..42b6514f9 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -46,7 +46,7 @@ pub struct Event {
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
-    pub async fn process(self) -> Result<(), EventError> {
+    pub fn process(self) -> Result<(), EventError> {
         let mut key = get_schema_key(&self.rb.schema().fields);
         if self.time_partition.is_some() {
             let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 42de1cb4f..6706470a2 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -106,8 +106,8 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
         custom_partition_values: HashMap::new(),
         stream_type: StreamType::Internal,
     }
-    .process()
-    .await?;
+    .process()?;
+
     Ok(())
 }
 
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 37f5ee368..005b38a91 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -160,8 +160,7 @@ async fn push_logs(
             custom_partition_values,
             stream_type: StreamType::UserDefined,
         }
-        .process()
-        .await?;
+        .process()?;
     }
     Ok(())
 }