From b6c26f8498e18a638f770dadbf45f2e58f25662c Mon Sep 17 00:00:00 2001 From: emeroad Date: Thu, 1 Feb 2024 18:22:50 +0900 Subject: [PATCH] [#10531] Cleanup kafka configuration --- pinot/pinot-kafka/pom.xml | 12 +++ .../pinot/kafka/KafkaConfiguration.java | 68 ++++++++--------- .../pinpoint/pinot/kafka/KafkaProperties.java | 76 ------------------- .../src/main/resources/kafka-root.properties | 10 +++ .../local/kafka-producer-factory.properties | 6 +- .../release/kafka-producer-factory.properties | 6 +- .../pinot/kafka/KafkaConfigurationTest.java | 10 ++- 7 files changed, 62 insertions(+), 126 deletions(-) delete mode 100644 pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaProperties.java create mode 100644 pinot/pinot-kafka/src/main/resources/kafka-root.properties diff --git a/pinot/pinot-kafka/pom.xml b/pinot/pinot-kafka/pom.xml index 78869b35471e..812194a6e68b 100644 --- a/pinot/pinot-kafka/pom.xml +++ b/pinot/pinot-kafka/pom.xml @@ -47,6 +47,18 @@ spring-test test + + org.springframework.boot + spring-boot-autoconfigure + + + com.navercorp.pinpoint + pinpoint-commons-server + + + org.springframework.boot + spring-boot-test + diff --git a/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfiguration.java b/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfiguration.java index 8357801696f6..c40aab045db8 100644 --- a/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfiguration.java +++ b/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfiguration.java @@ -1,66 +1,60 @@ package com.navercorp.pinpoint.pinot.kafka; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; -import org.springframework.core.env.Environment; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.ProducerFactory; -import java.util.HashMap; import java.util.Map; -import java.util.function.Consumer; @Configuration -@PropertySource("classpath:/profiles/${pinpoint.profiles.active:release}/kafka-producer-factory.properties") +@PropertySource(value = { + "classpath:/kafka-root.properties", + "classpath:/profiles/${pinpoint.profiles.active:release}/kafka-producer-factory.properties" +}) +@EnableConfigurationProperties public class KafkaConfiguration { private final Logger logger = LogManager.getLogger(KafkaConfiguration.class); - public Map toConfig(KafkaProperties properties) { - Map config = new HashMap<>(); - config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); - - config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getKeySerializer()); - config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, properties.getValueSerializer()); - - config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, properties.getPartitionerClass()); - config.put(ProducerConfig.ACKS_CONFIG, properties.getAcks()); - config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, properties.getCompressionType()); - - return config; + public KafkaConfiguration() { + logger.info("Install {}", KafkaConfiguration.class); } @Bean - public ProducerFactory kafkaProducerFactory(KafkaProperties properties) { - logger.info("kafka {}:{}", ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); - logger.debug("kafka config:{}", properties); + public ProducerFactory kafkaProducerFactory( + ObjectProvider customizers) { - Map config = toConfig(properties); - return new DefaultKafkaProducerFactory<>(config); - } + KafkaProperties properties = kafkaProperties(); + // ref : KafkaAutoConfiguration + Map producerProperties = properties.buildProducerProperties(); + logger.info("ProducerProperties:{}", producerProperties); - @Bean - public KafkaProperties kafkaProperties(Environment env) { - KafkaProperties properties = new KafkaProperties(); + DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerProperties); - bindProperties(env, "pinpoint.metric.kafka.bootstrap.servers", properties::setBootstrapServers); - bindProperties(env, "pinpoint.metric.kafka.key.serializer", properties::setKeySerializer); - bindProperties(env, "pinpoint.metric.kafka.value.serializer", properties::setValueSerializer); - bindProperties(env, "pinpoint.metric.kafka.acks", properties::setAcks); - bindProperties(env, "pinpoint.metric.kafka.compressionType", properties::setCompressionType); + KafkaProperties.Producer producer = properties.getProducer(); + String transactionIdPrefix = producer.getTransactionIdPrefix(); + if (transactionIdPrefix != null) { + factory.setTransactionIdPrefix(transactionIdPrefix); + } - return properties; + customizers.orderedStream().forEach((customizer) -> customizer.customize(factory)); + return factory; } - private void bindProperties(Environment env, String key, Consumer consumer) { - String value = env.getProperty(key); - if (value != null) { - consumer.accept(value); - } + @Bean + @ConfigurationProperties(prefix = "pinpoint.metric.kafka") + public KafkaProperties kafkaProperties() { + return new KafkaProperties(); } + } diff --git a/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaProperties.java b/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaProperties.java deleted file mode 100644 index 8cb186d48ac5..000000000000 --- a/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaProperties.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.navercorp.pinpoint.pinot.kafka; - -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.kafka.support.serializer.JsonSerializer; - -public class KafkaProperties { - private String bootstrapServers; - private String keySerializer = StringSerializer.class.getName(); - private String valueSerializer = JsonSerializer.class.getName(); - private String partitionerClass = DefaultPartitioner.class.getName(); - private String acks = "1"; - private String compressionType = "zstd"; - - - public String getBootstrapServers() { - return bootstrapServers; - } - - public void setBootstrapServers(String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - } - - public String getKeySerializer() { - return keySerializer; - } - - public void setKeySerializer(String keySerializer) { - this.keySerializer = keySerializer; - } - - public String getValueSerializer() { - return valueSerializer; - } - - public void setValueSerializer(String valueSerializer) { - this.valueSerializer = valueSerializer; - } - - public String getPartitionerClass() { - return partitionerClass; - } - - public void setPartitionerClass(String partitionerClass) { - this.partitionerClass = partitionerClass; - } - - public String getAcks() { - return acks; - } - - public void setAcks(String acks) { - this.acks = acks; - } - - public String getCompressionType() { - return compressionType; - } - - public void setCompressionType(String compressionType) { - this.compressionType = compressionType; - } - - - @Override - public String toString() { - return "KafkaProperties{" + - "bootstrapServers='" + bootstrapServers + '\'' + - ", keySerializer='" + keySerializer + '\'' + - ", valueSerializer='" + valueSerializer + '\'' + - ", partitionerClass='" + partitionerClass + '\'' + - ", acks='" + acks + '\'' + - ", compressionType='" + compressionType + '\'' + - '}'; - } -} diff --git a/pinot/pinot-kafka/src/main/resources/kafka-root.properties b/pinot/pinot-kafka/src/main/resources/kafka-root.properties new file mode 100644 index 000000000000..15ef3e4f95c4 --- /dev/null +++ b/pinot/pinot-kafka/src/main/resources/kafka-root.properties @@ -0,0 +1,10 @@ +pinpoint.metric.kafka.bootstrapServers=----common-metric-kafka-address---- +pinpoint.metric.kafka.clientId=pinpoint-pinot-kafka + +pinpoint.metric.kafka.producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer +pinpoint.metric.kafka.producer.valueSerializer=org.springframework.kafka.support.serializer.JsonSerializer +pinpoint.metric.kafka.producer.compressionType=zstd +pinpoint.metric.kafka.producer.acks=1 +pinpoint.metric.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner + + diff --git a/pinot/pinot-kafka/src/main/resources/profiles/local/kafka-producer-factory.properties b/pinot/pinot-kafka/src/main/resources/profiles/local/kafka-producer-factory.properties index 6002362ec92b..3b24c2109c7b 100644 --- a/pinot/pinot-kafka/src/main/resources/profiles/local/kafka-producer-factory.properties +++ b/pinot/pinot-kafka/src/main/resources/profiles/local/kafka-producer-factory.properties @@ -1,5 +1 @@ -pinpoint.metric.kafka.bootstrap.servers=----local-metric-kafka-address---- -#pinpoint.metric.kafka.key.serializer= -#pinpoint.metric.kafka.value.serializer= -#pinpoint.metric.kafka.acks=1 -#pinpoint.metric.compressionType=zstd \ No newline at end of file +pinpoint.metric.kafka.bootstrapServers=----local-metric-kafka-address---- diff --git a/pinot/pinot-kafka/src/main/resources/profiles/release/kafka-producer-factory.properties b/pinot/pinot-kafka/src/main/resources/profiles/release/kafka-producer-factory.properties index 210d0663dd5f..1a030c0a4285 100644 --- a/pinot/pinot-kafka/src/main/resources/profiles/release/kafka-producer-factory.properties +++ b/pinot/pinot-kafka/src/main/resources/profiles/release/kafka-producer-factory.properties @@ -1,5 +1 @@ -pinpoint.metric.kafka.bootstrap.servers=----release-metric-kafka-address---- -#pinpoint.metric.kafka.key.serializer= -#pinpoint.metric.kafka.value.serializer= -#pinpoint.metric.kafka.acks=1 -#pinpoint.metric.compressionType=zstd \ No newline at end of file +pinpoint.metric.kafka.bootstrapServers=----release-metric-kafka-address---- diff --git a/pinot/pinot-kafka/src/test/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfigurationTest.java b/pinot/pinot-kafka/src/test/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfigurationTest.java index 8fa4ac71bfba..a4df94b82f7c 100644 --- a/pinot/pinot-kafka/src/test/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfigurationTest.java +++ b/pinot/pinot-kafka/src/test/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfigurationTest.java @@ -1,7 +1,7 @@ package com.navercorp.pinpoint.pinot.kafka; import org.apache.kafka.clients.producer.ProducerConfig; -import org.junit.jupiter.api.Assertions; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; @@ -10,18 +10,22 @@ import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; +import java.util.List; + /** * @author Woonduk Kang(emeroad) */ @ExtendWith(SpringExtension.class) @ContextConfiguration(classes = KafkaConfiguration.class ) -@TestPropertySource(properties = "pinpoint.metric.kafka.bootstrap.servers=test-kafka-bootstrap") +@TestPropertySource(properties = "pinpoint.metric.kafka.bootstrapServers=test-kafka-bootstrap") class KafkaConfigurationTest { @Autowired ProducerFactory producerFactory; @Test void test() { - Assertions.assertEquals("test-kafka-bootstrap", producerFactory.getConfigurationProperties().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + List actual = (List) producerFactory.getConfigurationProperties().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); + Assertions.assertThat(actual) + .containsExactly("test-kafka-bootstrap"); } } \ No newline at end of file