diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 4baa63bff4c6a..44026be4b4a46 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -247,7 +247,6 @@ public static void waitForCondition(TestCondition testCondition, String conditio public static void waitForCondition(TestCondition testCondition, long maxWaitMs, String conditionDetails) throws InterruptedException { long startTime = System.currentTimeMillis(); - while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxWaitMs)) { Thread.sleep(Math.min(maxWaitMs, 100L)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 08928937dbc63..51fa06ae4ad02 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -52,9 +52,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.regex.Pattern; @@ -81,9 +79,6 @@ public class RegexSourceIntegrationTest { private static final String FA_TOPIC = "fa"; private static final String FOO_TOPIC = "foo"; - private static final int FIRST_UPDATE = 0; - private static final int SECOND_UPDATE = 1; - private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic"; private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName(); private Properties streamsConfiguration; @@ -121,6 +116,8 @@ public void tearDown() throws Exception { public void testRegexMatchesTopicsAWhenCreated() throws Exception { final Serde stringSerde = Serdes.String(); + final List expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1"); + final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration); @@ -146,41 +143,35 @@ public void testRegexMatchesTopicsAWhenCreated() throws Exception { TestCondition oneTopicAdded = new TestCondition() { @Override public boolean conditionMet() { - List assignedTopics = testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE); - return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-1") && !assignedTopics.contains("TEST-TOPIC-2"); + return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment); } }; streamThreads[0] = testStreamThread; streams.start(); - TestUtils.waitForCondition(oneTopicAdded, STREAM_TASKS_NOT_UPDATED); + TestUtils.waitForCondition(oneTopicAdded, STREAM_TASKS_NOT_UPDATED); CLUSTER.createTopic("TEST-TOPIC-2"); TestCondition secondTopicAdded = new TestCondition() { @Override public boolean conditionMet() { - List assignedTopics = testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE); - return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-1") && assignedTopics.contains("TEST-TOPIC-2"); + return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment); } }; - TestUtils.waitForCondition(secondTopicAdded, STREAM_TASKS_NOT_UPDATED); + TestUtils.waitForCondition(secondTopicAdded, STREAM_TASKS_NOT_UPDATED); streams.close(); - - List expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1"); - List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); - - assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment)); - assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment)); } @Test public void testRegexMatchesTopicsAWhenDeleted() throws Exception { final Serde stringSerde = Serdes.String(); + final List expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B"); + final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B"); StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration); @@ -209,34 +200,25 @@ public void testRegexMatchesTopicsAWhenDeleted() throws Exception { TestCondition bothTopicsAdded = new TestCondition() { @Override public boolean conditionMet() { - List assignedTopics = testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE); - return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-A") && assignedTopics.contains("TEST-TOPIC-B"); + return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment); } }; streams.start(); - TestUtils.waitForCondition(bothTopicsAdded, STREAM_TASKS_NOT_UPDATED); + TestUtils.waitForCondition(bothTopicsAdded, STREAM_TASKS_NOT_UPDATED); CLUSTER.deleteTopic("TEST-TOPIC-A"); - TestCondition oneTopicRemoved = new TestCondition() { @Override public boolean conditionMet() { - List assignedTopics = testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE); - return assignedTopics != null && !assignedTopics.contains("TEST-TOPIC-A") && assignedTopics.contains("TEST-TOPIC-B"); + return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment); } }; - TestUtils.waitForCondition(oneTopicRemoved, STREAM_TASKS_NOT_UPDATED); + TestUtils.waitForCondition(oneTopicRemoved, STREAM_TASKS_NOT_UPDATED); streams.close(); - - List expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B"); - List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B"); - - assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment)); - assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment)); } @@ -291,7 +273,7 @@ public void testShouldReadFromRegexAndNamedTopics() throws Exception { assertThat(actualValues, equalTo(expectedReceivedValues)); } - //TODO should be updated to expected = TopologyBuilderException after KAFKA-3708 + // TODO should be updated to expected = TopologyBuilderException after KAFKA-3708 @Test(expected = AssertionError.class) public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception { @@ -304,8 +286,8 @@ public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exceptio KStreamBuilder builder = new KStreamBuilder(); - // overlapping patterns here, no messages should be sent as TopologyBuilderException - // will be thrown when the processor topology is built. + // overlapping patterns here, no messages should be sent as TopologyBuilderException + // will be thrown when the processor topology is built. KStream pattern1Stream = builder.stream(Pattern.compile("foo.*")); KStream pattern2Stream = builder.stream(Pattern.compile("f.*")); @@ -334,9 +316,7 @@ public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exceptio } private class TestStreamThread extends StreamThread { - - public Map> assignedTopicPartitions = new HashMap<>(); - private int index = 0; + public volatile List assignedTopicPartitions = new ArrayList<>(); public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) { super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder)); @@ -344,15 +324,15 @@ public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClie @Override public StreamTask createStreamTask(TaskId id, Collection partitions) { - List assignedTopics = new ArrayList<>(); + List topicPartitions = new ArrayList<>(); for (TopicPartition partition : partitions) { - assignedTopics.add(partition.topic()); + topicPartitions.add(partition.topic()); } - Collections.sort(assignedTopics); - assignedTopicPartitions.put(index++, assignedTopics); + Collections.sort(topicPartitions); + + assignedTopicPartitions = topicPartitions; return super.createStreamTask(id, partitions); } } - }