From fb3d6bd6f71b6c1b1bf5d8d464334990b1f1349a Mon Sep 17 00:00:00 2001 From: Jens-Otto Larsen <46576810+jolarsen@users.noreply.github.com> Date: Mon, 24 Mar 2025 19:06:12 +0100 Subject: [PATCH] =?UTF-8?q?Fors=C3=B8k=20med=20KRaft=20uten=20ZK=20(#1615)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vtp/kafkaembedded/KafkaLocal.java | 40 +++++++------ .../vtp/kafkaembedded/LocalKafkaServer.java | 52 ++++++----------- .../vtp/kafkaembedded/ZooKeeperLocal.java | 57 ------------------- .../foreldrepenger/vtp/server/MockServer.java | 3 +- 4 files changed, 36 insertions(+), 116 deletions(-) delete mode 100644 mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/ZooKeeperLocal.java diff --git a/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/KafkaLocal.java b/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/KafkaLocal.java index 452e1677c..e05da3a6f 100644 --- a/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/KafkaLocal.java +++ b/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/KafkaLocal.java @@ -1,46 +1,44 @@ package no.nav.foreldrepenger.vtp.kafkaembedded; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; +import java.util.Properties; + +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Properties; +import kafka.server.KafkaConfig; +import kafka.server.KafkaRaftServer; class KafkaLocal { private Logger LOG = LoggerFactory.getLogger(KafkaLocal.class); - private KafkaServer kafka; - private ZooKeeperLocal zookeeper; + private KafkaRaftServer kafka; - KafkaLocal(Properties kafkaProperties, Properties zkProperties) { + KafkaLocal(Properties kafkaProperties) { var kafkaConfig = new KafkaConfig(kafkaProperties); - startZookeeper(zkProperties); startKafka(kafkaConfig); } - private void startZookeeper(Properties zkProperties) { - LOG.info("starting local zookeeper..."); - zookeeper = new ZooKeeperLocal(zkProperties); - } - private void startKafka(KafkaConfig kafkaConfig) { - kafka = new KafkaServer(kafkaConfig, - KafkaServer.$lessinit$greater$default$2(), - KafkaServer.$lessinit$greater$default$3(), - true); - LOG.info("starting local kafka broker..."); + LOG.info("Starting Kafka in KRaft mode..."); + kafka = new KafkaRaftServer(kafkaConfig, Time.SYSTEM); kafka.startup(); + LOG.info("Kafka started successfully in KRaft mode"); } void stop() { - LOG.info("stopping kafka..."); - kafka.shutdown(); - zookeeper.stop(); + LOG.info("Stopping Kafka..."); + if (kafka != null) { + kafka.shutdown(); + kafka.awaitShutdown(); + } + // Delete temp directories + LOG.info("Kafka stopped"); } - public KafkaServer getKafka() { + + public KafkaRaftServer getKafka() { return kafka; } } diff --git a/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/LocalKafkaServer.java b/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/LocalKafkaServer.java index 3822cc511..4d5f241d1 100644 --- a/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/LocalKafkaServer.java +++ b/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/LocalKafkaServer.java @@ -1,6 +1,9 @@ package no.nav.foreldrepenger.vtp.kafkaembedded; -import no.nav.foreldrepenger.util.KeystoreUtils; +import java.util.Collection; +import java.util.Properties; +import java.util.stream.Collectors; + import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.ProducerConfig; @@ -12,9 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.Properties; -import java.util.stream.Collectors; +import no.nav.foreldrepenger.util.KeystoreUtils; public class LocalKafkaServer { @@ -24,11 +25,9 @@ public class LocalKafkaServer { private KafkaLocal kafka; private LocalKafkaProducer localProducer; private AdminClient kafkaAdminClient; - private int zookeeperPort; private int kafkaBrokerPort; - public LocalKafkaServer(final int zookeeperPort, final int kafkaBrokerPort, Collection bootstrapTopics) { - this.zookeeperPort = zookeeperPort; + public LocalKafkaServer(final int kafkaBrokerPort, Collection bootstrapTopics) { this.kafkaBrokerPort = kafkaBrokerPort; this.bootstrapTopics = bootstrapTopics; } @@ -55,39 +54,25 @@ private static Properties createAdminClientProps(String boostrapServer) { return props; } - private static Properties setupZookeperProperties(int zookeeperPort) { - Properties zkProperties = new Properties(); - final String zookeeperTempInstanceDataDir = "" + System.currentTimeMillis(); // For å hindre NodeExists-feil på restart p.g.a. at data allerede finnes i katalogen. - zkProperties.put("dataDir", "target/zookeeper/" + zookeeperTempInstanceDataDir); - zkProperties.put("clientPort", "" + zookeeperPort); - zkProperties.put("admin.enableServer", "false"); - zkProperties.put("jaasLoginRenew", "3600000"); - - zkProperties.put("authorizer.class.name", "kafka.security.auth.SimpleAclAuthorizer"); - zkProperties.put("allow.everyone.if.no.acl.found", "true"); - zkProperties.put("ssl.client.auth", "required"); - zkProperties.put("ssl.keystore.location", KeystoreUtils.getKeystoreFilePath()); - zkProperties.put("ssl.keystore.password", KeystoreUtils.getKeyStorePassword()); - zkProperties.put("ssl.truststore.location", KeystoreUtils.getTruststoreFilePath()); - zkProperties.put("ssl.truststore.password", KeystoreUtils.getTruststorePassword()); - return zkProperties; - } - private static Properties setupKafkaProperties(int zookeeperPort) { + private static Properties setupKafkaProperties() { Properties kafkaProperties = new Properties(); - kafkaProperties.put("listener.security.protocol.map", "INTERNAL:SASL_SSL,EXTERNAL:SASL_SSL"); //TODO: Fjern når POC fungerer - kafkaProperties.put("zookeeper.connect", "localhost:" + zookeeperPort); + kafkaProperties.put("process.roles", "broker,controller"); + kafkaProperties.put("node.id", "1001"); + kafkaProperties.put("controller.quorum.voters", "1001@localhost:9095"); + kafkaProperties.put("controller.listener.names", "CONTROLLER"); kafkaProperties.put("offsets.topic.replication.factor", "1"); kafkaProperties.put("log.dirs", "target/kafka-logs"); kafkaProperties.put("auto.create.topics.enable", "true"); - kafkaProperties.put("listeners", "INTERNAL://:9092,EXTERNAL://:9093"); - kafkaProperties.put("advertised.listeners", "INTERNAL://localhost:9092,EXTERNAL://vtp:9093"); + kafkaProperties.put("listeners", "INTERNAL://:9092,EXTERNAL://:9093,PLAINTEXT://:9094,CONTROLLER://:9095"); + kafkaProperties.put("advertised.listeners", "INTERNAL://localhost:9092,EXTERNAL://vtp:9093,PLAINTEXT://localhost:9094,CONTROLLER://localhost:9095"); kafkaProperties.put("socket.request.max.bytes", "369296130"); kafkaProperties.put("sasl.enabled.mechanisms", "DIGEST-MD5,PLAIN"); kafkaProperties.put("sasl.mechanism.inter.broker.protocol", "PLAIN"); kafkaProperties.put("inter.broker.listener.name", "INTERNAL"); + kafkaProperties.put("listener.security.protocol.map", "INTERNAL:SASL_SSL,EXTERNAL:SASL_SSL,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"); String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"; kafkaProperties.put("SASL_SSL.".toLowerCase() + SaslConfigs.SASL_JAAS_CONFIG, String.format(jaasTemplate, "vtp", "vtp")); @@ -101,10 +86,6 @@ private static Properties setupKafkaProperties(int zookeeperPort) { return kafkaProperties; } - public int getZookeperPort() { - return zookeeperPort; - } - public int getKafkaBrokerPort() { return kafkaBrokerPort; } @@ -116,10 +97,9 @@ public AdminClient getKafkaAdminClient() { public void start() { final var bootstrapServers = String.format("%s:%s", "localhost", kafkaBrokerPort); - var kafkaProperties = setupKafkaProperties(zookeeperPort); - var zkProperties = setupZookeperProperties(zookeeperPort); + var kafkaProperties = setupKafkaProperties(); System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "kafkasecurity.conf"); - kafka = new KafkaLocal(kafkaProperties, zkProperties); + kafka = new KafkaLocal(kafkaProperties); kafkaAdminClient = AdminClient.create(createAdminClientProps(bootstrapServers)); kafkaAdminClient.createTopics( bootstrapTopics.stream().map( diff --git a/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/ZooKeeperLocal.java b/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/ZooKeeperLocal.java deleted file mode 100644 index c2e70ce13..000000000 --- a/mocks/kafka-embedded-mock/src/main/java/no/nav/foreldrepenger/vtp/kafkaembedded/ZooKeeperLocal.java +++ /dev/null @@ -1,57 +0,0 @@ -package no.nav.foreldrepenger.vtp.kafkaembedded; - -import java.io.IOException; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServerMain; -import org.apache.zookeeper.server.admin.AdminServer; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class ZooKeeperLocal { - private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLocal.class); - private final Thread t; - private final ZooKeeperServerMain zooKeeperServer; - - ZooKeeperLocal(Properties zkProperties) { - QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); - try { - quorumConfiguration.parseProperties(zkProperties); - } catch (Exception e) { - throw new RuntimeException(e); - } - - zooKeeperServer = new ZooKeeperServerMain(); - final ServerConfig configuration = new ServerConfig(); - configuration.readFrom(quorumConfiguration); - - var started = new CountDownLatch(1); - t = new Thread(() -> { - try { - started.countDown(); // here we go - zooKeeperServer.runFromConfig(configuration); - } catch (IOException | AdminServer.AdminServerException e) { - LOG.error("Zookeeper failed: ", e); - } - }); - t.start(); - // Vent på zookeeper start - try { - if (!started.await(5, TimeUnit.SECONDS)) { - throw new IllegalStateException("Could not start Zookeeper in time (5 secs)"); - } - int sekunder = 1; - Thread.sleep(sekunder * 1000L); // vent littegrann til på zookeeper - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - void stop() { - t.interrupt(); - } -} diff --git a/server/src/main/java/no/nav/foreldrepenger/vtp/server/MockServer.java b/server/src/main/java/no/nav/foreldrepenger/vtp/server/MockServer.java index d5fc330ab..f94ba445d 100644 --- a/server/src/main/java/no/nav/foreldrepenger/vtp/server/MockServer.java +++ b/server/src/main/java/no/nav/foreldrepenger/vtp/server/MockServer.java @@ -67,8 +67,7 @@ public MockServer() throws Exception { ldapServer = new LdapServer(new File(KeystoreUtils.getKeystoreFilePath()), KeystoreUtils.getKeyStorePassword().toCharArray()); var kafkaBrokerPort = Integer.parseInt(System.getProperty("kafkaBrokerPort", "9092")); - var zookeeperPort = Integer.parseInt(System.getProperty("zookeeper.port", "2181")); - kafkaServer = new LocalKafkaServer(zookeeperPort, kafkaBrokerPort, getBootstrapTopics()); + kafkaServer = new LocalKafkaServer(kafkaBrokerPort, getBootstrapTopics()); } public static void main(String[] args) throws Exception {