Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,46 +1,44 @@
package no.nav.foreldrepenger.vtp.kafkaembedded;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import java.util.Properties;

import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;


class KafkaLocal {
private Logger LOG = LoggerFactory.getLogger(KafkaLocal.class);

private KafkaServer kafka;
private ZooKeeperLocal zookeeper;
private KafkaRaftServer kafka;

KafkaLocal(Properties kafkaProperties, Properties zkProperties) {
KafkaLocal(Properties kafkaProperties) {
var kafkaConfig = new KafkaConfig(kafkaProperties);
startZookeeper(zkProperties);
startKafka(kafkaConfig);
}

private void startZookeeper(Properties zkProperties) {
LOG.info("starting local zookeeper...");
zookeeper = new ZooKeeperLocal(zkProperties);
}

private void startKafka(KafkaConfig kafkaConfig) {
kafka = new KafkaServer(kafkaConfig,
KafkaServer.$lessinit$greater$default$2(),
KafkaServer.$lessinit$greater$default$3(),
true);
LOG.info("starting local kafka broker...");
LOG.info("Starting Kafka in KRaft mode...");
kafka = new KafkaRaftServer(kafkaConfig, Time.SYSTEM);
kafka.startup();
LOG.info("Kafka started successfully in KRaft mode");
}

void stop() {
LOG.info("stopping kafka...");
kafka.shutdown();
zookeeper.stop();
LOG.info("Stopping Kafka...");
if (kafka != null) {
kafka.shutdown();
kafka.awaitShutdown();
}
// Delete temp directories
LOG.info("Kafka stopped");
}

public KafkaServer getKafka() {

public KafkaRaftServer getKafka() {
return kafka;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package no.nav.foreldrepenger.vtp.kafkaembedded;

import no.nav.foreldrepenger.util.KeystoreUtils;
import java.util.Collection;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -12,9 +15,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Properties;
import java.util.stream.Collectors;
import no.nav.foreldrepenger.util.KeystoreUtils;


public class LocalKafkaServer {
Expand All @@ -24,11 +25,9 @@ public class LocalKafkaServer {
private KafkaLocal kafka;
private LocalKafkaProducer localProducer;
private AdminClient kafkaAdminClient;
private int zookeeperPort;
private int kafkaBrokerPort;

public LocalKafkaServer(final int zookeeperPort, final int kafkaBrokerPort, Collection<String> bootstrapTopics) {
this.zookeeperPort = zookeeperPort;
public LocalKafkaServer(final int kafkaBrokerPort, Collection<String> bootstrapTopics) {
this.kafkaBrokerPort = kafkaBrokerPort;
this.bootstrapTopics = bootstrapTopics;
}
Expand All @@ -55,39 +54,25 @@ private static Properties createAdminClientProps(String boostrapServer) {
return props;
}

private static Properties setupZookeperProperties(int zookeeperPort) {
Properties zkProperties = new Properties();
final String zookeeperTempInstanceDataDir = "" + System.currentTimeMillis(); // For å hindre NodeExists-feil på restart p.g.a. at data allerede finnes i katalogen.
zkProperties.put("dataDir", "target/zookeeper/" + zookeeperTempInstanceDataDir);
zkProperties.put("clientPort", "" + zookeeperPort);
zkProperties.put("admin.enableServer", "false");
zkProperties.put("jaasLoginRenew", "3600000");

zkProperties.put("authorizer.class.name", "kafka.security.auth.SimpleAclAuthorizer");
zkProperties.put("allow.everyone.if.no.acl.found", "true");
zkProperties.put("ssl.client.auth", "required");

zkProperties.put("ssl.keystore.location", KeystoreUtils.getKeystoreFilePath());
zkProperties.put("ssl.keystore.password", KeystoreUtils.getKeyStorePassword());
zkProperties.put("ssl.truststore.location", KeystoreUtils.getTruststoreFilePath());
zkProperties.put("ssl.truststore.password", KeystoreUtils.getTruststorePassword());

return zkProperties;
}

private static Properties setupKafkaProperties(int zookeeperPort) {
private static Properties setupKafkaProperties() {
Properties kafkaProperties = new Properties();
kafkaProperties.put("listener.security.protocol.map", "INTERNAL:SASL_SSL,EXTERNAL:SASL_SSL"); //TODO: Fjern når POC fungerer
kafkaProperties.put("zookeeper.connect", "localhost:" + zookeeperPort);
kafkaProperties.put("process.roles", "broker,controller");
kafkaProperties.put("node.id", "1001");
kafkaProperties.put("controller.quorum.voters", "1001@localhost:9095");
kafkaProperties.put("controller.listener.names", "CONTROLLER");
kafkaProperties.put("offsets.topic.replication.factor", "1");
kafkaProperties.put("log.dirs", "target/kafka-logs");
kafkaProperties.put("auto.create.topics.enable", "true");
kafkaProperties.put("listeners", "INTERNAL://:9092,EXTERNAL://:9093");
kafkaProperties.put("advertised.listeners", "INTERNAL://localhost:9092,EXTERNAL://vtp:9093");
kafkaProperties.put("listeners", "INTERNAL://:9092,EXTERNAL://:9093,PLAINTEXT://:9094,CONTROLLER://:9095");
kafkaProperties.put("advertised.listeners", "INTERNAL://localhost:9092,EXTERNAL://vtp:9093,PLAINTEXT://localhost:9094,CONTROLLER://localhost:9095");
kafkaProperties.put("socket.request.max.bytes", "369296130");
kafkaProperties.put("sasl.enabled.mechanisms", "DIGEST-MD5,PLAIN");
kafkaProperties.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
kafkaProperties.put("inter.broker.listener.name", "INTERNAL");
kafkaProperties.put("listener.security.protocol.map", "INTERNAL:SASL_SSL,EXTERNAL:SASL_SSL,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");

String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
kafkaProperties.put("SASL_SSL.".toLowerCase() + SaslConfigs.SASL_JAAS_CONFIG, String.format(jaasTemplate, "vtp", "vtp"));
Expand All @@ -101,10 +86,6 @@ private static Properties setupKafkaProperties(int zookeeperPort) {
return kafkaProperties;
}

public int getZookeperPort() {
return zookeeperPort;
}

public int getKafkaBrokerPort() {
return kafkaBrokerPort;
}
Expand All @@ -116,10 +97,9 @@ public AdminClient getKafkaAdminClient() {
public void start() {
final var bootstrapServers = String.format("%s:%s", "localhost", kafkaBrokerPort);

var kafkaProperties = setupKafkaProperties(zookeeperPort);
var zkProperties = setupZookeperProperties(zookeeperPort);
var kafkaProperties = setupKafkaProperties();
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "kafkasecurity.conf");
kafka = new KafkaLocal(kafkaProperties, zkProperties);
kafka = new KafkaLocal(kafkaProperties);
kafkaAdminClient = AdminClient.create(createAdminClientProps(bootstrapServers));
kafkaAdminClient.createTopics(
bootstrapTopics.stream().map(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public MockServer() throws Exception {

ldapServer = new LdapServer(new File(KeystoreUtils.getKeystoreFilePath()), KeystoreUtils.getKeyStorePassword().toCharArray());
var kafkaBrokerPort = Integer.parseInt(System.getProperty("kafkaBrokerPort", "9092"));
var zookeeperPort = Integer.parseInt(System.getProperty("zookeeper.port", "2181"));
kafkaServer = new LocalKafkaServer(zookeeperPort, kafkaBrokerPort, getBootstrapTopics());
kafkaServer = new LocalKafkaServer(kafkaBrokerPort, getBootstrapTopics());
}

public static void main(String[] args) throws Exception {
Expand Down