Skip to content

Commit 2be14dd

Browse files
committed
Raise a runtime exception when the KafkaEvent is invalid.
1 parent 6da89a3 commit 2be14dd

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializer.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,13 @@ private boolean isConsumerRecordsType(Type type) {
118118

119119
private <K, V> ConsumerRecords<K, V> convertToConsumerRecords(KafkaEvent kafkaEvent, Class<K> keyType,
120120
Class<V> valueType) {
121+
// Validate that this is actually a Kafka event by checking for required properties
122+
if (kafkaEvent == null || kafkaEvent.getEventSource() == null) {
123+
throw new RuntimeException(
124+
"Failed to deserialize Lambda handler input to ConsumerRecords: Input is not a valid Kafka event.");
125+
}
121126

122-
if (kafkaEvent == null || kafkaEvent.getRecords() == null) {
127+
if (kafkaEvent.getRecords() == null) {
123128
return ConsumerRecords.empty();
124129
}
125130

@@ -138,7 +143,7 @@ private <K, V> ConsumerRecords<K, V> convertToConsumerRecords(KafkaEvent kafkaEv
138143

139144
return createConsumerRecords(recordsMap);
140145
}
141-
146+
142147
/**
143148
* Creates ConsumerRecords with compatibility for both Kafka 3.x.x and 4.x.x.
144149
*
@@ -147,7 +152,8 @@ private <K, V> ConsumerRecords<K, V> convertToConsumerRecords(KafkaEvent kafkaEv
147152
* @param records Map of records by topic partition
148153
* @return ConsumerRecords instance
149154
*/
150-
protected <K, V> ConsumerRecords<K, V> createConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
155+
protected <K, V> ConsumerRecords<K, V> createConsumerRecords(
156+
Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
151157
try {
152158
// Try to use the Kafka 4.x.x constructor with nextOffsets parameter
153159
return new ConsumerRecords<>(records, Map.of());

powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializerTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,42 @@ void shouldHandleNullRecords(InputType inputType) {
307307
assertThat(records.count()).isZero();
308308
}
309309

310+
@ParameterizedTest
311+
@MethodSource("inputTypes")
312+
void shouldThrowExceptionWhenEventSourceIsNull(InputType inputType) {
313+
// Given
314+
// Create a JSON without eventSource property
315+
String kafkaJson = "{\n" +
316+
" \"records\": {\n" +
317+
" \"test-topic-1\": [\n" +
318+
" {\n" +
319+
" \"topic\": \"test-topic-1\",\n" +
320+
" \"partition\": 0,\n" +
321+
" \"offset\": 15,\n" +
322+
" \"timestamp\": 1545084650987,\n" +
323+
" \"timestampType\": \"CREATE_TIME\",\n" +
324+
" \"key\": null,\n" +
325+
" \"value\": null,\n" +
326+
" \"headers\": []\n" +
327+
" }\n" +
328+
" ]\n" +
329+
" }\n" +
330+
"}";
331+
Type type = TestUtils.createConsumerRecordsType(String.class, TestProductPojo.class);
332+
333+
// When/Then
334+
if (inputType == InputType.INPUT_STREAM) {
335+
ByteArrayInputStream inputStream = new ByteArrayInputStream(kafkaJson.getBytes());
336+
assertThatThrownBy(() -> deserializer.fromJson(inputStream, type))
337+
.isInstanceOf(RuntimeException.class)
338+
.hasMessageContaining("Input is not a valid Kafka event");
339+
} else {
340+
assertThatThrownBy(() -> deserializer.fromJson(kafkaJson, type))
341+
.isInstanceOf(RuntimeException.class)
342+
.hasMessageContaining("Input is not a valid Kafka event");
343+
}
344+
}
345+
310346
static Stream<Arguments> primitiveTypesProvider() {
311347
return Stream.of(
312348
// For each primitive type, test with both INPUT_STREAM and STRING

0 commit comments

Comments
 (0)