Skip to content

Commit

Permalink
[#noissue] Cleanup Kafka Configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Dec 15, 2023
1 parent be8e181 commit e222f24
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 126 deletions.
4 changes: 4 additions & 0 deletions pinot/pinot-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,66 +1,60 @@
package com.navercorp.pinpoint.pinot.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

@Configuration
@PropertySource("classpath:/profiles/${pinpoint.profiles.active:release}/kafka-producer-factory.properties")
@PropertySource({
"classpath:/common-kafka.properties",
"classpath:/profiles/${pinpoint.profiles.active:release}/kafka-producer-factory.properties"
})
@EnableConfigurationProperties
public class KafkaConfiguration {

private final Logger logger = LogManager.getLogger(KafkaConfiguration.class);

public Map<String, Object> toConfig(KafkaProperties properties) {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());

config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getKeySerializer());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, properties.getValueSerializer());

config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, properties.getPartitionerClass());
config.put(ProducerConfig.ACKS_CONFIG, properties.getAcks());
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, properties.getCompressionType());

return config;
public KafkaConfiguration() {
logger.info("Install {}", KafkaConfiguration.class);
}

@Bean
public ProducerFactory kafkaProducerFactory(KafkaProperties properties) {
logger.info("kafka {}:{}", ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
logger.debug("kafka config:{}", properties);
public ProducerFactory<?, ?> kafkaProducerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {

Map<String, Object> config = toConfig(properties);
return new DefaultKafkaProducerFactory<>(config);
}
KafkaProperties properties = kafkaProperties();
// ref : KafkaAutoConfiguration
Map<String, Object> producerProperties = properties.buildProducerProperties();
logger.info("ProducerProperties:{}", producerProperties);

@Bean
public KafkaProperties kafkaProperties(Environment env) {
KafkaProperties properties = new KafkaProperties();
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);

bindProperties(env, "pinpoint.metric.kafka.bootstrap.servers", properties::setBootstrapServers);
bindProperties(env, "pinpoint.metric.kafka.key.serializer", properties::setKeySerializer);
bindProperties(env, "pinpoint.metric.kafka.value.serializer", properties::setValueSerializer);
bindProperties(env, "pinpoint.metric.kafka.acks", properties::setAcks);
bindProperties(env, "pinpoint.metric.kafka.compressionType", properties::setCompressionType);
KafkaProperties.Producer producer = properties.getProducer();
String transactionIdPrefix = producer.getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);

Check warning on line 46 in pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfiguration.java

View check run for this annotation

Codecov / codecov/patch

pinot/pinot-kafka/src/main/java/com/navercorp/pinpoint/pinot/kafka/KafkaConfiguration.java#L46

Added line #L46 was not covered by tests
}

return properties;
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}

private void bindProperties(Environment env, String key, Consumer<String> consumer) {
String value = env.getProperty(key);
if (value != null) {
consumer.accept(value);
}
@Bean
@ConfigurationProperties(prefix = "pinpoint.metric.kafka")
public KafkaProperties kafkaProperties() {
return new KafkaProperties();
}


}

This file was deleted.

7 changes: 7 additions & 0 deletions pinot/pinot-kafka/src/main/resources/common-kafka.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Shared Kafka producer defaults for the pinot-kafka module.
# Bound onto Spring Boot's KafkaProperties under the "pinpoint.metric.kafka" prefix;
# the active profile's kafka-producer-factory.properties supplies the
# environment-specific bootstrapServers value.
pinpoint.metric.kafka.clientId=pinpoint-pinot-kafka

pinpoint.metric.kafka.producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
pinpoint.metric.kafka.producer.valueSerializer=org.springframework.kafka.support.serializer.JsonSerializer
pinpoint.metric.kafka.producer.compressionType=zstd
pinpoint.metric.kafka.producer.acks=1
# Arbitrary producer property pass-through ("properties.*"); pins the default partitioner.
pinpoint.metric.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
pinpoint.metric.kafka.bootstrap.servers=----local-metric-kafka-address----
#pinpoint.metric.kafka.key.serializer=
#pinpoint.metric.kafka.value.serializer=
#pinpoint.metric.kafka.acks=1
#pinpoint.metric.compressionType=zstd
pinpoint.metric.kafka.bootstrapServers=----local-profile-metric-kafka-address----
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
pinpoint.metric.kafka.bootstrap.servers=----release-metric-kafka-address----
#pinpoint.metric.kafka.key.serializer=
#pinpoint.metric.kafka.value.serializer=
#pinpoint.metric.kafka.acks=1
#pinpoint.metric.compressionType=zstd
pinpoint.metric.kafka.bootstrapServers=----release-profile-metric-kafka-address----
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.navercorp.pinpoint.pinot.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -10,18 +9,25 @@
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Verifies that {@code pinpoint.metric.kafka.*} properties are bound onto the
 * producer factory built by {@link KafkaConfiguration}.
 *
 * @author Woonduk Kang(emeroad)
 */
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = KafkaConfiguration.class)
@TestPropertySource(properties = "pinpoint.metric.kafka.bootstrapServers=test-kafka-bootstrap")
class KafkaConfigurationTest {
    // Parameterized wildcard type instead of a raw ProducerFactory.
    @Autowired
    ProducerFactory<?, ?> producerFactory;

    @Test
    void test() {
        Map<?, ?> properties = producerFactory.getConfigurationProperties();
        // KafkaProperties binds bootstrapServers as a List<String>; the cast is
        // unavoidable on the untyped config map, so suppress at the narrowest scope.
        @SuppressWarnings("unchecked")
        List<String> bootstrap = (List<String>) properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
        assertThat(bootstrap).containsExactly("test-kafka-bootstrap");
    }
}

0 comments on commit e222f24

Please sign in to comment.