diff --git a/pinot/pinot-kafka/pom.xml b/pinot/pinot-kafka/pom.xml
index 78869b35471e..812194a6e68b 100644
--- a/pinot/pinot-kafka/pom.xml
+++ b/pinot/pinot-kafka/pom.xml
@@ -47,6 +47,18 @@
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+     <groupId>org.springframework.boot</groupId>
+     <artifactId>spring-boot-autoconfigure</artifactId>
+ </dependency>
+ <dependency>
+     <groupId>com.navercorp.pinpoint</groupId>
+     <artifactId>pinpoint-commons-server</artifactId>
+ </dependency>
+ <dependency>
+     <groupId>org.springframework.boot</groupId>
+     <artifactId>spring-boot-test</artifactId>
+ </dependency>
diff --git a/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfiguration.java b/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfiguration.java
index 8357801696f6..c40aab045db8 100644
--- a/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfiguration.java
+++ b/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfiguration.java
@@ -1,66 +1,60 @@
package com.navercorp.pinpoint.pinot.kafka;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
-import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
-import java.util.HashMap;
import java.util.Map;
-import java.util.function.Consumer;
@Configuration
-@PropertySource("classpath:/profiles/${pinpoint.profiles.active:release}/kafka-producer-factory.properties")
+@PropertySource(value = {
+ "classpath:/kafka-root.properties",
+ "classpath:/profiles/${pinpoint.profiles.active:release}/kafka-producer-factory.properties"
+})
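+// kafka-root.properties holds the shared producer defaults; the profile-specific file, listed last, supplies the per-environment bootstrap address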
+@EnableConfigurationProperties
public class KafkaConfiguration {
private final Logger logger = LogManager.getLogger(KafkaConfiguration.class);
- public Map<String, Object> toConfig(KafkaProperties properties) {
- Map<String, Object> config = new HashMap<>();
- config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
-
- config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getKeySerializer());
- config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, properties.getValueSerializer());
-
- config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, properties.getPartitionerClass());
- config.put(ProducerConfig.ACKS_CONFIG, properties.getAcks());
- config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, properties.getCompressionType());
-
- return config;
+ public KafkaConfiguration() {
+ logger.info("Install {}", KafkaConfiguration.class);
}
@Bean
- public ProducerFactory kafkaProducerFactory(KafkaProperties properties) {
- logger.info("kafka {}:{}", ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
- logger.debug("kafka config:{}", properties);
+ public ProducerFactory<?, ?> kafkaProducerFactory(
+ ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
- Map config = toConfig(properties);
- return new DefaultKafkaProducerFactory<>(config);
- }
+ KafkaProperties properties = kafkaProperties();
+ // Build the producer config map the same way Spring Boot's KafkaAutoConfiguration does
+ Map<String, Object> producerProperties = properties.buildProducerProperties();
+ logger.info("ProducerProperties:{}", producerProperties);
- @Bean
- public KafkaProperties kafkaProperties(Environment env) {
- KafkaProperties properties = new KafkaProperties();
+ DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
- bindProperties(env, "pinpoint.metric.kafka.bootstrap.servers", properties::setBootstrapServers);
- bindProperties(env, "pinpoint.metric.kafka.key.serializer", properties::setKeySerializer);
- bindProperties(env, "pinpoint.metric.kafka.value.serializer", properties::setValueSerializer);
- bindProperties(env, "pinpoint.metric.kafka.acks", properties::setAcks);
- bindProperties(env, "pinpoint.metric.kafka.compressionType", properties::setCompressionType);
+ KafkaProperties.Producer producer = properties.getProducer();
+ String transactionIdPrefix = producer.getTransactionIdPrefix();
+ if (transactionIdPrefix != null) {
+ factory.setTransactionIdPrefix(transactionIdPrefix);
+ }
- return properties;
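+ // Apply any DefaultKafkaProducerFactoryCustomizer beans, as KafkaAutoConfiguration does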
+ customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
+ return factory;
}
- private void bindProperties(Environment env, String key, Consumer<String> consumer) {
- String value = env.getProperty(key);
- if (value != null) {
- consumer.accept(value);
- }
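+ // Bind pinpoint.metric.kafka.* onto Spring Boot's KafkaProperties via relaxed binding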
+ @Bean
+ @ConfigurationProperties(prefix = "pinpoint.metric.kafka")
+ public KafkaProperties kafkaProperties() {
+ return new KafkaProperties();
}
+
}
diff --git a/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaProperties.java b/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaProperties.java
deleted file mode 100644
index 8cb186d48ac5..000000000000
--- a/pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaProperties.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.navercorp.pinpoint.pinot.kafka;
-
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-public class KafkaProperties {
- private String bootstrapServers;
- private String keySerializer = StringSerializer.class.getName();
- private String valueSerializer = JsonSerializer.class.getName();
- private String partitionerClass = DefaultPartitioner.class.getName();
- private String acks = "1";
- private String compressionType = "zstd";
-
-
- public String getBootstrapServers() {
- return bootstrapServers;
- }
-
- public void setBootstrapServers(String bootstrapServers) {
- this.bootstrapServers = bootstrapServers;
- }
-
- public String getKeySerializer() {
- return keySerializer;
- }
-
- public void setKeySerializer(String keySerializer) {
- this.keySerializer = keySerializer;
- }
-
- public String getValueSerializer() {
- return valueSerializer;
- }
-
- public void setValueSerializer(String valueSerializer) {
- this.valueSerializer = valueSerializer;
- }
-
- public String getPartitionerClass() {
- return partitionerClass;
- }
-
- public void setPartitionerClass(String partitionerClass) {
- this.partitionerClass = partitionerClass;
- }
-
- public String getAcks() {
- return acks;
- }
-
- public void setAcks(String acks) {
- this.acks = acks;
- }
-
- public String getCompressionType() {
- return compressionType;
- }
-
- public void setCompressionType(String compressionType) {
- this.compressionType = compressionType;
- }
-
-
- @Override
- public String toString() {
- return "KafkaProperties{" +
- "bootstrapServers='" + bootstrapServers + '\'' +
- ", keySerializer='" + keySerializer + '\'' +
- ", valueSerializer='" + valueSerializer + '\'' +
- ", partitionerClass='" + partitionerClass + '\'' +
- ", acks='" + acks + '\'' +
- ", compressionType='" + compressionType + '\'' +
- '}';
- }
-}
diff --git a/pinot/pinot-kafka/src/main/resources/kafka-root.properties b/pinot/pinot-kafka/src/main/resources/kafka-root.properties
new file mode 100644
index 000000000000..15ef3e4f95c4
--- /dev/null
+++ b/pinot/pinot-kafka/src/main/resources/kafka-root.properties
@@ -0,0 +1,10 @@
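+# Shared defaults for the pinot-kafka producer; per-profile kafka-producer-factory.properties overrides the bootstrap address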
+pinpoint.metric.kafka.bootstrapServers=----common-metric-kafka-address----
+pinpoint.metric.kafka.clientId=pinpoint-pinot-kafka
+
+pinpoint.metric.kafka.producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
+pinpoint.metric.kafka.producer.valueSerializer=org.springframework.kafka.support.serializer.JsonSerializer
+pinpoint.metric.kafka.producer.compressionType=zstd
+pinpoint.metric.kafka.producer.acks=1
+pinpoint.metric.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
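+# Keys under pinpoint.metric.kafka.producer.properties.* are passed straight through to the Kafka producer config (here the partitioner class)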
+
+
diff --git a/pinot/pinot-kafka/src/main/resources/profiles/local/kafka-producer-factory.properties b/pinot/pinot-kafka/src/main/resources/profiles/local/kafka-producer-factory.properties
index 6002362ec92b..3b24c2109c7b 100644
--- a/pinot/pinot-kafka/src/main/resources/profiles/local/kafka-producer-factory.properties
+++ b/pinot/pinot-kafka/src/main/resources/profiles/local/kafka-producer-factory.properties
@@ -1,5 +1 @@
-pinpoint.metric.kafka.bootstrap.servers=----local-metric-kafka-address----
-#pinpoint.metric.kafka.key.serializer=
-#pinpoint.metric.kafka.value.serializer=
-#pinpoint.metric.kafka.acks=1
-#pinpoint.metric.compressionType=zstd
\ No newline at end of file
+pinpoint.metric.kafka.bootstrapServers=----local-metric-kafka-address----
diff --git a/pinot/pinot-kafka/src/main/resources/profiles/release/kafka-producer-factory.properties b/pinot/pinot-kafka/src/main/resources/profiles/release/kafka-producer-factory.properties
index 210d0663dd5f..1a030c0a4285 100644
--- a/pinot/pinot-kafka/src/main/resources/profiles/release/kafka-producer-factory.properties
+++ b/pinot/pinot-kafka/src/main/resources/profiles/release/kafka-producer-factory.properties
@@ -1,5 +1 @@
-pinpoint.metric.kafka.bootstrap.servers=----release-metric-kafka-address----
-#pinpoint.metric.kafka.key.serializer=
-#pinpoint.metric.kafka.value.serializer=
-#pinpoint.metric.kafka.acks=1
-#pinpoint.metric.compressionType=zstd
\ No newline at end of file
+pinpoint.metric.kafka.bootstrapServers=----release-metric-kafka-address----
diff --git a/pinot/pinot-kafka/src/test/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfigurationTest.java b/pinot/pinot-kafka/src/test/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfigurationTest.java
index 8fa4ac71bfba..a4df94b82f7c 100644
--- a/pinot/pinot-kafka/src/test/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfigurationTest.java
+++ b/pinot/pinot-kafka/src/test/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfigurationTest.java
@@ -1,7 +1,7 @@
package com.navercorp.pinpoint.pinot.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.junit.jupiter.api.Assertions;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
@@ -10,18 +10,22 @@
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
+import java.util.List;
+
/**
* @author Woonduk Kang(emeroad)
*/
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = KafkaConfiguration.class )
-@TestPropertySource(properties = "pinpoint.metric.kafka.bootstrap.servers=test-kafka-bootstrap")
+@TestPropertySource(properties = "pinpoint.metric.kafka.bootstrapServers=test-kafka-bootstrap")
class KafkaConfigurationTest {
@Autowired
ProducerFactory producerFactory;
@Test
void test() {
- Assertions.assertEquals("test-kafka-bootstrap", producerFactory.getConfigurationProperties().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
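+ // KafkaProperties binds bootstrapServers as a List<String>, so the bound config value is a list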
+ List<String> actual = (List<String>) producerFactory.getConfigurationProperties().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ Assertions.assertThat(actual)
+ .containsExactly("test-kafka-bootstrap");
}
}
\ No newline at end of file