Skip to content

Commit

Permalink
[#10531] Cleanup kafka configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Dec 22, 2023
1 parent 8298a9e commit 680d2d2
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 124 deletions.
12 changes: 12 additions & 0 deletions pinot/pinot-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-commons-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,66 +1,60 @@
package com.navercorp.pinpoint.pinot.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

@Configuration
@PropertySource("classpath:/profiles/${pinpoint.profiles.active:release}/kafka-producer-factory.properties")
@PropertySource(value = {
"classpath:/kafka-root.properties",
"classpath:/profiles/${pinpoint.profiles.active:release}/kafka-producer-factory.properties"
})
@EnableConfigurationProperties
public class KafkaConfiguration {

private final Logger logger = LogManager.getLogger(KafkaConfiguration.class);

public Map<String, Object> toConfig(KafkaProperties properties) {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());

config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getKeySerializer());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, properties.getValueSerializer());

config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, properties.getPartitionerClass());
config.put(ProducerConfig.ACKS_CONFIG, properties.getAcks());
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, properties.getCompressionType());

return config;
public KafkaConfiguration() {
logger.info("Install {}", KafkaConfiguration.class);
}

@Bean
public ProducerFactory kafkaProducerFactory(KafkaProperties properties) {
logger.info("kafka {}:{}", ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
logger.debug("kafka config:{}", properties);
public ProducerFactory<?, ?> kafkaProducerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {

Map<String, Object> config = toConfig(properties);
return new DefaultKafkaProducerFactory<>(config);
}
KafkaProperties properties = kafkaProperties();
// ref : KafkaAutoConfiguration
Map<String, Object> producerProperties = properties.buildProducerProperties();
logger.info("ProducerProperties:{}", producerProperties);

@Bean
public KafkaProperties kafkaProperties(Environment env) {
KafkaProperties properties = new KafkaProperties();
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);

bindProperties(env, "pinpoint.metric.kafka.bootstrap.servers", properties::setBootstrapServers);
bindProperties(env, "pinpoint.metric.kafka.key.serializer", properties::setKeySerializer);
bindProperties(env, "pinpoint.metric.kafka.value.serializer", properties::setValueSerializer);
bindProperties(env, "pinpoint.metric.kafka.acks", properties::setAcks);
bindProperties(env, "pinpoint.metric.kafka.compressionType", properties::setCompressionType);
KafkaProperties.Producer producer = properties.getProducer();
String transactionIdPrefix = producer.getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}

return properties;
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}

private void bindProperties(Environment env, String key, Consumer<String> consumer) {
String value = env.getProperty(key);
if (value != null) {
consumer.accept(value);
}
@Bean
@ConfigurationProperties(prefix = "pinpoint.metric.kafka")
public KafkaProperties kafkaProperties() {
return new KafkaProperties();
}


}

This file was deleted.

10 changes: 10 additions & 0 deletions pinot/pinot-kafka/src/main/resources/kafka-root.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
pinpoint.metric.kafka.bootstrapServers=----common-metric-kafka-address----
pinpoint.metric.kafka.clientId=pinpoint-pinot-kafka

pinpoint.metric.kafka.producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
pinpoint.metric.kafka.producer.valueSerializer=org.springframework.kafka.support.serializer.JsonSerializer
pinpoint.metric.kafka.producer.compressionType=zstd
pinpoint.metric.kafka.producer.acks=1
pinpoint.metric.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner


Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
pinpoint.metric.kafka.bootstrap.servers=----local-metric-kafka-address----
#pinpoint.metric.kafka.key.serializer=
#pinpoint.metric.kafka.value.serializer=
#pinpoint.metric.kafka.acks=1
#pinpoint.metric.compressionType=zstd
pinpoint.metric.kafka.bootstrapServers=----local-metric-kafka-address----
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
pinpoint.metric.kafka.bootstrap.servers=----release-metric-kafka-address----
#pinpoint.metric.kafka.key.serializer=
#pinpoint.metric.kafka.value.serializer=
#pinpoint.metric.kafka.acks=1
#pinpoint.metric.compressionType=zstd
pinpoint.metric.kafka.bootstrapServers=----release-metric-kafka-address----
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = KafkaConfiguration.class )
@TestPropertySource(properties = "pinpoint.metric.kafka.bootstrap.servers=test-kafka-bootstrap")
@TestPropertySource(properties = "pinpoint.metric.kafka.bootstrapServers=test-kafka-bootstrap")
class KafkaConfigurationTest {
@Autowired
ProducerFactory producerFactory;
Expand Down

0 comments on commit 680d2d2

Please sign in to comment.