Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ public static void waitForCondition(TestCondition testCondition, String conditio
public static void waitForCondition(TestCondition testCondition, long maxWaitMs, String conditionDetails) throws InterruptedException {
long startTime = System.currentTimeMillis();


while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxWaitMs)) {
Thread.sleep(Math.min(maxWaitMs, 100L));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;
Expand All @@ -81,9 +79,6 @@ public class RegexSourceIntegrationTest {
private static final String FA_TOPIC = "fa";
private static final String FOO_TOPIC = "foo";

private static final int FIRST_UPDATE = 0;
private static final int SECOND_UPDATE = 1;

private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
private Properties streamsConfiguration;
Expand Down Expand Up @@ -121,6 +116,8 @@ public void tearDown() throws Exception {
public void testRegexMatchesTopicsAWhenCreated() throws Exception {

final Serde<String> stringSerde = Serdes.String();
final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");

StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);

Expand All @@ -146,41 +143,35 @@ public void testRegexMatchesTopicsAWhenCreated() throws Exception {
TestCondition oneTopicAdded = new TestCondition() {
@Override
public boolean conditionMet() {
List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE);
return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-1") && !assignedTopics.contains("TEST-TOPIC-2");
return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
}
};

streamThreads[0] = testStreamThread;
streams.start();

TestUtils.waitForCondition(oneTopicAdded, STREAM_TASKS_NOT_UPDATED);
TestUtils.waitForCondition(oneTopicAdded, STREAM_TASKS_NOT_UPDATED);

CLUSTER.createTopic("TEST-TOPIC-2");

TestCondition secondTopicAdded = new TestCondition() {
@Override
public boolean conditionMet() {
List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE);
return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-1") && assignedTopics.contains("TEST-TOPIC-2");
return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
}
};

TestUtils.waitForCondition(secondTopicAdded, STREAM_TASKS_NOT_UPDATED);
TestUtils.waitForCondition(secondTopicAdded, STREAM_TASKS_NOT_UPDATED);

streams.close();

List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");

assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment));
assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment));
}

@Test
public void testRegexMatchesTopicsAWhenDeleted() throws Exception {

final Serde<String> stringSerde = Serdes.String();
final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");

StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);

Expand Down Expand Up @@ -209,34 +200,25 @@ public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
TestCondition bothTopicsAdded = new TestCondition() {
@Override
public boolean conditionMet() {
List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE);
return assignedTopics != null && assignedTopics.contains("TEST-TOPIC-A") && assignedTopics.contains("TEST-TOPIC-B");
return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
}
};
streams.start();

TestUtils.waitForCondition(bothTopicsAdded, STREAM_TASKS_NOT_UPDATED);
TestUtils.waitForCondition(bothTopicsAdded, STREAM_TASKS_NOT_UPDATED);

CLUSTER.deleteTopic("TEST-TOPIC-A");


TestCondition oneTopicRemoved = new TestCondition() {
@Override
public boolean conditionMet() {
List<String> assignedTopics = testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE);
return assignedTopics != null && !assignedTopics.contains("TEST-TOPIC-A") && assignedTopics.contains("TEST-TOPIC-B");
return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
}
};

TestUtils.waitForCondition(oneTopicRemoved, STREAM_TASKS_NOT_UPDATED);
TestUtils.waitForCondition(oneTopicRemoved, STREAM_TASKS_NOT_UPDATED);

streams.close();

List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");

assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment));
assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment));
}


Expand Down Expand Up @@ -291,7 +273,7 @@ public void testShouldReadFromRegexAndNamedTopics() throws Exception {
assertThat(actualValues, equalTo(expectedReceivedValues));
}

//TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
// TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
@Test(expected = AssertionError.class)
public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {

Expand All @@ -304,8 +286,8 @@ public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exceptio
KStreamBuilder builder = new KStreamBuilder();


// overlapping patterns here, no messages should be sent as TopologyBuilderException
// will be thrown when the processor topology is built.
// overlapping patterns here, no messages should be sent as TopologyBuilderException
// will be thrown when the processor topology is built.

KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
Expand Down Expand Up @@ -334,25 +316,23 @@ public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exceptio
}

private class TestStreamThread extends StreamThread {

public Map<Integer, List<String>> assignedTopicPartitions = new HashMap<>();
private int index = 0;
public volatile List<String> assignedTopicPartitions = new ArrayList<>();

public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder));
}

@Override
public StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
List<String> assignedTopics = new ArrayList<>();
List<String> topicPartitions = new ArrayList<>();
for (TopicPartition partition : partitions) {
assignedTopics.add(partition.topic());
topicPartitions.add(partition.topic());
}
Collections.sort(assignedTopics);
assignedTopicPartitions.put(index++, assignedTopics);
Collections.sort(topicPartitions);

assignedTopicPartitions = topicPartitions;
return super.createStreamTask(id, partitions);
}

}

}