Commit b4b8073

KAFKA-19042 Move PlaintextConsumerFetchTest to client-integration-tests module (#19520)

Rewrite `PlaintextConsumerFetchTest` in Java on the new test infra and move it to the client-integration-tests module.

Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>

1 parent 5a96816 commit b4b8073
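
For orientation, a minimal sketch of how a test in this module might exercise the new shared helpers together with the new test infra. It is illustrative only: the topic name and test method are hypothetical, and cluster.consumer() and cluster.createTopic(...) are assumed ClusterInstance conveniences alongside the cluster.producer() factory that the helpers themselves use.

import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;

@ClusterTest
public void testProduceConsumeRoundTrip(ClusterInstance cluster) throws InterruptedException {
    // Hypothetical topic; createTopic(...) on ClusterInstance is assumed here.
    cluster.createTopic("test-topic", 1, (short) 1);
    var tp = new TopicPartition("test-topic", 0);
    var startingTimestamp = System.currentTimeMillis();

    // Produce 100 records through the shared helper ...
    sendRecords(cluster, tp, 100, startingTimestamp);

    // ... then consume them back and verify topic, partition, offsets,
    // keys, values, and timestamps, starting from the beginning of the log.
    try (Consumer<byte[], byte[]> consumer = cluster.consumer()) { // assumed factory
        consumer.assign(List.of(tp));
        consumer.seekToBeginning(List.of(tp));
        consumeAndVerifyRecords(consumer, tp, 100, 0, 0, startingTimestamp);
    }
}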

File tree

4 files changed: +689 −347

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java

+196 −0

@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ClientsTestUtils {
+
+    private static final String KEY_PREFIX = "key ";
+    private static final String VALUE_PREFIX = "value ";
+
+    private ClientsTestUtils() {}
+
+    public static <K, V> List<ConsumerRecord<K, V>> consumeRecords(
+        Consumer<K, V> consumer,
+        int numRecords
+    ) throws InterruptedException {
+        List<ConsumerRecord<K, V>> records = new ArrayList<>();
+        TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(100)).forEach(records::add);
+            return records.size() >= numRecords;
+        }, 60000, "Timed out before consuming expected " + numRecords + " records.");
+
+        return records;
+    }
+
+    public static void consumeAndVerifyRecords(
+        Consumer<byte[], byte[]> consumer,
+        TopicPartition tp,
+        int numRecords,
+        int startingOffset,
+        int startingKeyAndValueIndex,
+        long startingTimestamp,
+        long timestampIncrement
+    ) throws InterruptedException {
+        var records = consumeRecords(consumer, numRecords);
+        for (var i = 0; i < numRecords; i++) {
+            var record = records.get(i);
+            var offset = startingOffset + i;
+
+            assertEquals(tp.topic(), record.topic());
+            assertEquals(tp.partition(), record.partition());
+
+            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+            var timestamp = startingTimestamp + i * (timestampIncrement > 0 ? timestampIncrement : 1);
+            assertEquals(timestamp, record.timestamp());
+
+            assertEquals(offset, record.offset());
+            var keyAndValueIndex = startingKeyAndValueIndex + i;
+            assertEquals(KEY_PREFIX + keyAndValueIndex, new String(record.key()));
+            assertEquals(VALUE_PREFIX + keyAndValueIndex, new String(record.value()));
+            // this is true only because K and V are byte arrays
+            assertEquals((KEY_PREFIX + keyAndValueIndex).length(), record.serializedKeySize());
+            assertEquals((VALUE_PREFIX + keyAndValueIndex).length(), record.serializedValueSize());
+        }
+    }
+
+    public static void consumeAndVerifyRecords(
+        Consumer<byte[], byte[]> consumer,
+        TopicPartition tp,
+        int numRecords,
+        int startingOffset,
+        int startingKeyAndValueIndex,
+        long startingTimestamp
+    ) throws InterruptedException {
+        consumeAndVerifyRecords(consumer, tp, numRecords, startingOffset, startingKeyAndValueIndex, startingTimestamp, -1);
+    }
+
+    public static void consumeAndVerifyRecords(
+        Consumer<byte[], byte[]> consumer,
+        TopicPartition tp,
+        int numRecords,
+        int startingOffset
+    ) throws InterruptedException {
+        consumeAndVerifyRecords(consumer, tp, numRecords, startingOffset, 0, 0, -1);
+    }
+
+    public static void sendRecords(
+        ClusterInstance cluster,
+        TopicPartition tp,
+        int numRecords,
+        long startingTimestamp,
+        long timestampIncrement
+    ) {
+        try (Producer<byte[], byte[]> producer = cluster.producer()) {
+            for (var i = 0; i < numRecords; i++) {
+                sendRecord(producer, tp, startingTimestamp, i, timestampIncrement);
+            }
+            producer.flush();
+        }
+    }
+
+    public static void sendRecords(
+        ClusterInstance cluster,
+        TopicPartition tp,
+        int numRecords,
+        long startingTimestamp
+    ) {
+        sendRecords(cluster, tp, numRecords, startingTimestamp, -1);
+    }
+
+    public static void sendRecords(
+        ClusterInstance cluster,
+        TopicPartition tp,
+        int numRecords
+    ) {
+        sendRecords(cluster, tp, numRecords, System.currentTimeMillis());
+    }
+
+    public static List<ProducerRecord<byte[], byte[]>> sendRecords(
+        Producer<byte[], byte[]> producer,
+        TopicPartition tp,
+        int numRecords,
+        long startingTimestamp,
+        long timestampIncrement
+    ) {
+        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+        for (var i = 0; i < numRecords; i++) {
+            var record = sendRecord(producer, tp, startingTimestamp, i, timestampIncrement);
+            records.add(record);
+        }
+        producer.flush();
+        return records;
+    }
+
+    public static void sendRecords(
+        Producer<byte[], byte[]> producer,
+        TopicPartition tp,
+        int numRecords,
+        long startingTimestamp
+    ) {
+        for (var i = 0; i < numRecords; i++) {
+            sendRecord(producer, tp, startingTimestamp, i, -1);
+        }
+        producer.flush();
+    }
+
+    public static void awaitAssignment(
+        Consumer<byte[], byte[]> consumer,
+        Set<TopicPartition> expectedAssignment
+    ) throws InterruptedException {
+        TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(100));
+            return consumer.assignment().equals(expectedAssignment);
+        }, "Timed out while awaiting expected assignment " + expectedAssignment + ". " +
+            "The current assignment is " + consumer.assignment()
+        );
+    }
+
+    private static ProducerRecord<byte[], byte[]> sendRecord(
+        Producer<byte[], byte[]> producer,
+        TopicPartition tp,
+        long startingTimestamp,
+        int numRecord,
+        long timestampIncrement
+    ) {
+        var timestamp = startingTimestamp + numRecord * (timestampIncrement > 0 ? timestampIncrement : 1);
+        var record = new ProducerRecord<>(
+            tp.topic(),
+            tp.partition(),
+            timestamp,
+            (KEY_PREFIX + numRecord).getBytes(),
+            (VALUE_PREFIX + numRecord).getBytes()
+        );
+        producer.send(record);
+        return record;
+    }
+}
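
A note on the timestamp convention shared by sendRecord and consumeAndVerifyRecords above: a timestampIncrement of zero or less falls back to an increment of 1 per record, which is why the shorter overloads simply pass -1. A quick illustration with hypothetical values:

// Produces timestamps 100, 101, 102: the increment defaults to 1.
sendRecords(cluster, tp, 3, 100L);
// Produces timestamps 100, 150, 200 with an explicit increment of 50.
sendRecords(cluster, tp, 3, 100L, 50L);
// Verification must pass the matching increment (offsets and key/value indexes start at 0).
consumeAndVerifyRecords(consumer, tp, 3, 0, 0, 100L, 50L);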

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java

+4 −64
@@ -16,25 +16,23 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.TestUtils;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
 import org.apache.kafka.common.test.api.Type;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
 import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
@@ -198,7 +196,7 @@ private void testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedC
         var totalRecords = 120;
         var startingTimestamp = 0L;
 
-        sendRecords(totalRecords, startingTimestamp);
+        sendRecords(cluster, tp, totalRecords, startingTimestamp);
 
         triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> {
             executeConsumer.seek(tp, startingOffset);
@@ -209,6 +207,7 @@ private void testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedC
         consumer.resume(List.of(tp));
         consumeAndVerifyRecords(
             consumer,
+            tp,
             (int) (totalRecords - startingOffset),
             (int) startingOffset,
             (int) startingOffset,
@@ -290,63 +289,4 @@ private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol) {
             ENABLE_AUTO_COMMIT_CONFIG, "false"
         ));
     }
-
-    private void sendRecords(int numRecords, long startingTimestamp) {
-        try (Producer<byte[], byte[]> producer = cluster.producer()) {
-            for (var i = 0; i < numRecords; i++) {
-                var timestamp = startingTimestamp + i;
-                var record = new ProducerRecord<>(
-                    tp.topic(),
-                    tp.partition(),
-                    timestamp,
-                    ("key " + i).getBytes(),
-                    ("value " + i).getBytes()
-                );
-                producer.send(record);
-            }
-            producer.flush();
-        }
-    }
-
-    protected void consumeAndVerifyRecords(
-        Consumer<byte[], byte[]> consumer,
-        int numRecords,
-        int startingOffset,
-        int startingKeyAndValueIndex,
-        long startingTimestamp
-    ) throws InterruptedException {
-        var records = consumeRecords(consumer, numRecords);
-        for (var i = 0; i < numRecords; i++) {
-            var record = records.get(i);
-            var offset = startingOffset + i;
-
-            assertEquals(tp.topic(), record.topic());
-            assertEquals(tp.partition(), record.partition());
-
-            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
-            var timestamp = startingTimestamp + i;
-            assertEquals(timestamp, record.timestamp());
-
-            assertEquals(offset, record.offset());
-            var keyAndValueIndex = startingKeyAndValueIndex + i;
-            assertEquals("key " + keyAndValueIndex, new String(record.key()));
-            assertEquals("value " + keyAndValueIndex, new String(record.value()));
-            // this is true only because K and V are byte arrays
-            assertEquals(("key " + keyAndValueIndex).length(), record.serializedKeySize());
-            assertEquals(("value " + keyAndValueIndex).length(), record.serializedValueSize());
-        }
-    }
-
-    protected <K, V> List<ConsumerRecord<K, V>> consumeRecords(
-        Consumer<K, V> consumer,
-        int numRecords
-    ) throws InterruptedException {
-        List<ConsumerRecord<K, V>> records = new ArrayList<>();
-        TestUtils.waitForCondition(() -> {
-            consumer.poll(Duration.ofMillis(100)).forEach(records::add);
-            return records.size() >= numRecords;
-        }, 60000, "Timed out before consuming expected " + numRecords + " records.");
-
-        return records;
-    }
 }
