Skip to content

Commit 6da89a3

Browse files
committed
When falling back to Lambda default, handle conversion between InputStream and String.
1 parent ef04849 commit 6da89a3

File tree

2 files changed

+73
-0
lines changed

2 files changed

+73
-0
lines changed

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/LambdaDefaultDeserializer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package software.amazon.lambda.powertools.kafka.serializers;
1414

15+
import java.io.IOException;
1516
import java.io.InputStream;
1617
import java.lang.reflect.Type;
1718

@@ -33,6 +34,15 @@ public <T> T fromJson(InputStream input, Type type) {
3334
return (T) input;
3435
}
3536

37+
// If the target type is String, read the input stream as a String
38+
if (type.equals(String.class)) {
39+
try {
40+
return (T) new String(input.readAllBytes());
41+
} catch (IOException e) {
42+
throw new RuntimeException("Failed to read input stream as String", e);
43+
}
44+
}
45+
3646
return (T) JacksonFactory.getInstance().getSerializer(type).fromJson(input);
3747
}
3848

@@ -44,6 +54,11 @@ public <T> T fromJson(String input, Type type) {
4454
return (T) input;
4555
}
4656

57+
// If the target type is InputStream, read the input stream as a String
58+
if (type.equals(InputStream.class)) {
59+
return (T) new String(input).getBytes();
60+
}
61+
4762
return (T) JacksonFactory.getInstance().getSerializer(type).fromJson(input);
4863
}
4964
}

powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/PowertoolsSerializerTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package software.amazon.lambda.powertools.kafka;
1414

1515
import static org.assertj.core.api.Assertions.assertThat;
16+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1617
import static software.amazon.lambda.powertools.kafka.testutils.TestUtils.createConsumerRecordsType;
1718
import static software.amazon.lambda.powertools.kafka.testutils.TestUtils.serializeAvro;
1819

@@ -40,6 +41,7 @@
4041
import com.fasterxml.jackson.core.JsonProcessingException;
4142
import com.fasterxml.jackson.databind.ObjectMapper;
4243

44+
import software.amazon.lambda.powertools.kafka.serializers.LambdaDefaultDeserializer;
4345
import software.amazon.lambda.powertools.kafka.serializers.PowertoolsDeserializer;
4446
import software.amazon.lambda.powertools.kafka.testutils.TestProductPojo;
4547

@@ -142,6 +144,62 @@ void shouldHandleInputStreamType() throws IOException {
142144
assertThat(resultString).isEqualTo(testInput);
143145
}
144146

147+
@Test
148+
void shouldConvertInputStreamToString() {
149+
// When
150+
LambdaDefaultDeserializer deserializer = new LambdaDefaultDeserializer();
151+
152+
// Then
153+
String expected = "This is a test string";
154+
ByteArrayInputStream inputStream = new ByteArrayInputStream(expected.getBytes());
155+
156+
// Convert InputStream to String
157+
String result = deserializer.fromJson(inputStream, String.class);
158+
159+
// Verify the result
160+
assertThat(result).isEqualTo(expected);
161+
}
162+
163+
@Test
164+
void shouldThrowRuntimeExceptionWhenInputStreamIsInvalid() {
165+
// When
166+
LambdaDefaultDeserializer deserializer = new LambdaDefaultDeserializer();
167+
168+
// Create a problematic InputStream that throws IOException when read
169+
InputStream problematicStream = new InputStream() {
170+
@Override
171+
public int read() throws IOException {
172+
throw new IOException("Simulated IO error");
173+
}
174+
175+
@Override
176+
public byte[] readAllBytes() throws IOException {
177+
throw new IOException("Simulated IO error");
178+
}
179+
};
180+
181+
// Then
182+
assertThatThrownBy(() -> deserializer.fromJson(problematicStream, String.class))
183+
.isInstanceOf(RuntimeException.class)
184+
.hasMessageContaining("Failed to read input stream as String");
185+
}
186+
187+
@Test
188+
void shouldConvertStringToByteArray() {
189+
// When
190+
LambdaDefaultDeserializer deserializer = new LambdaDefaultDeserializer();
191+
192+
// Then
193+
String input = "This is a test string";
194+
195+
// Convert String to InputStream
196+
byte[] result = deserializer.fromJson(input, InputStream.class);
197+
198+
// Verify the result
199+
String resultString = new String(result);
200+
assertThat(resultString).isEqualTo(input);
201+
}
202+
145203
@ParameterizedTest
146204
@MethodSource("inputTypes")
147205
@SetEnvironmentVariable(key = "_HANDLER", value = "software.amazon.lambda.powertools.kafka.testutils.JsonHandler::handleRequest")

0 commit comments

Comments
 (0)