Test topic client using multiple brokers (#277)
philipp94831 authored Feb 20, 2025
1 parent 5491bde commit 341aeeb
Showing 4 changed files with 146 additions and 8 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
@@ -2,7 +2,7 @@ version=3.5.3-SNAPSHOT
org.gradle.caching=true
org.gradle.parallel=true
kafkaVersion=3.8.1
-testContainersVersion=1.20.4
+testContainersVersion=1.20.5
confluentVersion=7.8.0
fluentKafkaVersion=3.0.0
junitVersion=5.11.4
TopicClientTest.java
@@ -24,17 +24,24 @@

package com.bakdata.kafka.util;

+import static com.bakdata.kafka.KafkaTest.KAFKA_VERSION;
import static com.bakdata.kafka.KafkaTestClient.defaultTopicSettings;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;

-import com.bakdata.kafka.KafkaTest;
+import com.bakdata.kafka.ApacheKafkaContainerCluster;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.jupiter.api.Test;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;

-class TopicClientTest extends KafkaTest {
+@Testcontainers
+class TopicClientTest {
+
+    @Container
+    private final ApacheKafkaContainerCluster kafkaCluster = new ApacheKafkaContainerCluster(KAFKA_VERSION, 3, 2);

    private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10L);

@@ -81,21 +88,20 @@ void shouldCreateTopic() {
            assertThat(client.exists("topic")).isFalse();
            final TopicSettings settings = TopicSettings.builder()
                    .partitions(5)
-                    // .replicationFactor((short) 2) // FIXME setup testcontainers with multiple brokers
-                    .replicationFactor((short) 1)
+                    .replicationFactor((short) 2)
                    .build();
            client.createTopic("topic", settings, emptyMap());
            assertThat(client.exists("topic")).isTrue();
            assertThat(client.describe("topic"))
                    .satisfies(info -> {
-                        assertThat(info.getReplicationFactor()).isEqualTo((short) 1);
+                        assertThat(info.getReplicationFactor()).isEqualTo((short) 2);
                        assertThat(info.getPartitions()).isEqualTo(5);
                    });
        }
    }

    private TopicClient createClient() {
-        final String brokerList = this.getBootstrapServers();
+        final String brokerList = this.kafkaCluster.getBootstrapServers();
        final Map<String, Object> config = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        return TopicClient.create(config, CLIENT_TIMEOUT);
    }
ApacheKafkaContainerCluster.java (new file)
@@ -0,0 +1,131 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import static org.awaitility.Awaitility.await;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Getter;
import org.apache.kafka.common.Uuid;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;

//from https://github.com/testcontainers/testcontainers-java/blob/1404c4429c0cb98fe46534bf33632d25dc5309e4/examples/kafka-cluster/src/test/java/com/example/kafkacluster/ApacheKafkaContainerCluster.java
public class ApacheKafkaContainerCluster implements Startable {

    private final int brokersNum;

    private final Network network;

    @Getter
    private final Collection<KafkaContainer> brokers;

    public ApacheKafkaContainerCluster(final String version, final int brokersNum, final int internalTopicsRf) {
        if (brokersNum <= 0) {
            throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
        }
        if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) {
            throw new IllegalArgumentException(
                    "internalTopicsRf '" + internalTopicsRf + "' must be between 1 and brokersNum"
            );
        }

        this.brokersNum = brokersNum;
        this.network = Network.newNetwork();
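        // Controller quorum for the brokers created below: node <id> is addressed as "broker-<id>:9094",
        // i.e. via its network alias on the shared Docker network.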

        final String controllerQuorumVoters = IntStream.range(0, brokersNum)
                .mapToObj(brokerNum -> String.format("%d@broker-%d:9094", brokerNum, brokerNum))
                .collect(Collectors.joining(","));

        final String clusterId = Uuid.randomUuid().toString();

        final DockerImageName dockerImageName = DockerImageName.parse("apache/kafka").withTag(version);
        this.brokers = IntStream.range(0, brokersNum)
                .mapToObj(brokerNum -> new KafkaContainer(dockerImageName)
                        .withNetwork(this.network)
                        .withNetworkAliases("broker-" + brokerNum)
                        .withEnv("CLUSTER_ID", clusterId)
                        .withEnv("KAFKA_BROKER_ID", brokerNum + "")
                        .withEnv("KAFKA_NODE_ID", brokerNum + "")
                        .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters)
                        .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
                        .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
                        .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "")
                        .withStartupTimeout(Duration.ofMinutes(1)))
                .collect(Collectors.toList());
    }

    private static int getNumberOfReadyBrokers(final ContainerState container)
            throws IOException, InterruptedException {
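        // Runs kafka-log-dirs inside the container against localhost:9093 and counts the "broker" entries
        // in its JSON output, i.e. the brokers currently registered in the cluster.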
        final Container.ExecResult result = container
                .execInContainer(
                        "sh",
                        "-c",
                        "/opt/kafka/bin/kafka-log-dirs.sh --bootstrap-server localhost:9093 --describe | "
                                + "grep -o '\"broker\"' | "
                                + "wc -l"
                );
        final String brokers = result.getStdout().replace("\n", "");

        return Integer.parseInt(brokers);
    }

    public String getBootstrapServers() {
        return this.brokers.stream().map(KafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
    }

    @Override
    public void start() {
        // Needs to start all the brokers at once
        this.brokers.parallelStream().forEach(GenericContainer::start);

        await()
                .atMost(Duration.ofSeconds(120))
                .until(() -> {
                    final KafkaContainer container = this.brokers.stream()
                            .findFirst()
                            .orElseThrow();
                    return getNumberOfReadyBrokers(container);
                }, readyBrokers -> readyBrokers == this.brokersNum);
    }

    @Override
    public void stop() {
        this.brokers.stream().parallel().forEach(GenericContainer::stop);

        this.network.close();
    }
}
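
For reference, a minimal usage sketch of the new cluster class (not part of this commit; the test class name and the AdminClient-based assertion are illustrative only). It follows the same wiring as the TopicClientTest changes above: the Testcontainers JUnit 5 extension starts the cluster, since ApacheKafkaContainerCluster implements Startable.

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
class KafkaClusterSmokeTest {

    // 3 brokers, internal topics replicated twice; "3.8.1" matches the KAFKA_VERSION constant below
    @Container
    private final ApacheKafkaContainerCluster kafkaCluster = new ApacheKafkaContainerCluster("3.8.1", 3, 2);

    @Test
    void shouldSeeAllBrokers() throws Exception {
        final Map<String, Object> config =
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers());
        try (final AdminClient admin = AdminClient.create(config)) {
            // describeCluster() returns the live nodes; all three brokers should have joined
            assertThat(admin.describeCluster().nodes().get()).hasSize(3);
        }
    }
}
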
KafkaTest.java
@@ -36,13 +36,14 @@
@Testcontainers
public abstract class KafkaTest {
    protected static final Duration POLL_TIMEOUT = Duration.ofSeconds(10);
+    public static final String KAFKA_VERSION = "3.8.1";
    private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry();
    @Container
    private final KafkaContainer kafkaCluster = newCluster();

    public static KafkaContainer newCluster() {
        return new KafkaContainer(DockerImageName.parse("apache/kafka-native")
-                .withTag("3.8.1"));
+                .withTag(KAFKA_VERSION));
    }

    private static ConditionFactory await() {