15

I have an application that may need multiple producers. All the code samples I have seen support a single producer, reading its config when the app starts up. If there are multiple producers and we want to pass in a different config for each, is there out-of-the-box support in Spring? Or should I just go without Spring in that case?

25

You will have to create two different ProducerFactory beans, one per configuration. Here is an example:

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.stereotype.Component;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class KafkaProducerConfig {

        // Producer factory for the first cluster
        @Bean
        public ProducerFactory<String, String> confluentProducerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        // Producer factory for the second cluster (different bootstrap server)
        @Bean
        public ProducerFactory<String, String> clouderaProducerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean(name = "confluent")
        public KafkaTemplate<String, String> confluentKafkaTemplate() {
            return new KafkaTemplate<>(confluentProducerFactory());
        }

        @Bean(name = "cloudera")
        public KafkaTemplate<String, String> clouderaKafkaTemplate() {
            return new KafkaTemplate<>(clouderaProducerFactory());
        }
    }

    // Separate class: picks each template by its bean name
    @Component
    public class ProducerExample {

        @Autowired
        @Qualifier("cloudera")
        private KafkaTemplate<String, String> clouderaKafkaTemplate;

        @Autowired
        @Qualifier("confluent")
        private KafkaTemplate<String, String> confluentKafkaTemplate;

        public void send() {
            confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
            clouderaKafkaTemplate.send("TestCloudEra", "hey there.. cloudera");
        }
    }

3 Comments

And what about multiple consumers?
Can different producer factories (i.e. Kafka producer configs) be defined in application properties?
@user518066 Yes, see my answer below.
12

You can create several Producer instances (KafkaTemplate) via the same ProducerFactory.

If you need different Kafka configurations, you’ll need different ProducerFactory instances.

9

If you still want to keep your configuration in application.yaml as usual and keep the Java configuration to a minimum, you can extend KafkaProperties.Producer.

    @Configuration
    @ConfigurationProperties(prefix = "spring.kafka.producer-1")
    @RequiredArgsConstructor
    class FirstProducer extends KafkaProperties.Producer {

        // Shared spring.kafka.* settings, used as the base for this producer
        private final KafkaProperties common;

        // Method names differ between the two classes so that they do not
        // register beans under the same default bean name.
        @Qualifier("producer-1")
        @Bean
        public ProducerFactory<?, ?> firstProducerFactory() {
            final var conf = new HashMap<>(this.common.buildProducerProperties());
            // Values under spring.kafka.producer-1 override the shared ones
            conf.putAll(this.buildProperties());
            return new DefaultKafkaProducerFactory<>(conf);
        }

        @Qualifier("producer-1")
        @Bean
        public KafkaTemplate<?, ?> firstKafkaTemplate() {
            return new KafkaTemplate<>(this.firstProducerFactory());
        }
    }

    @Configuration
    @ConfigurationProperties(prefix = "spring.kafka.producer-2")
    @RequiredArgsConstructor
    class SecondProducer extends KafkaProperties.Producer {

        private final KafkaProperties common;

        @Qualifier("producer-2")
        @Bean
        public ProducerFactory<?, ?> secondProducerFactory() {
            final var conf = new HashMap<>(this.common.buildProducerProperties());
            // Values under spring.kafka.producer-2 override the shared ones
            conf.putAll(this.buildProperties());
            return new DefaultKafkaProducerFactory<>(conf);
        }

        @Qualifier("producer-2")
        @Bean
        public KafkaTemplate<?, ?> secondKafkaTemplate() {
            return new KafkaTemplate<>(this.secondProducerFactory());
        }
    }

2 Comments

That looks nice! However, would there exist a way to do away with all of the boilerplate code? The only difference I can spot is the name of the properties prefix.
You can extract the shared parts into a superclass; the code is duplicated here just as an illustration.
4

Starting with version 2.5, you can use a RoutingKafkaTemplate to select the producer at runtime, based on the destination topic name. https://docs.spring.io/spring-kafka/reference/html/#routing-template
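The lookup idea behind topic-based routing can be sketched in plain Java, without Spring. This is only an illustration of "first matching pattern wins", not the RoutingKafkaTemplate class itself: `TopicRouter`, `route` and `resolve` are hypothetical names, and the targets are simple placeholders standing in for producer factories. As far as I understand the linked documentation, the real template is built from a map of regex patterns to producer factories that is checked in order, which is why an ordered map is used here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper: maps topic-name patterns to a target of type T.
final class TopicRouter<T> {

    // LinkedHashMap preserves insertion order, so more specific patterns
    // can be registered before a catch-all pattern.
    private final Map<Pattern, T> routes = new LinkedHashMap<>();

    // Registers a target for all topics whose full name matches the regex.
    TopicRouter<T> route(String regex, T target) {
        routes.put(Pattern.compile(regex), target);
        return this;
    }

    // Returns the target of the first registered pattern that matches.
    T resolve(String topic) {
        for (Map.Entry<Pattern, T> entry : routes.entrySet()) {
            if (entry.getKey().matcher(topic).matches()) {
                return entry.getValue();
            }
        }
        throw new IllegalArgumentException("No route for topic: " + topic);
    }
}
```

With routes registered as `route("audit\\..*", a)` followed by `route(".+", b)`, a topic named `audit.login` resolves to `a` and any other topic falls through to `b`. Registering the catch-all first would shadow every later pattern, so order matters.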

2

Spring Boot doesn't provide out-of-the-box support for multiple producer configurations. You can write your own custom Kafka configuration to support them, something like this:

    kafka:
      producer:
        producer1:
          topic: topic1
          bootstrap-servers: server1:9092,server1:9093,server1:9094
          retries: 0
          acks: all
        producer2:
          topic: topic2
          bootstrap-servers: server2:9092,server2:9093,server2:9094
          retries: 2
          acks: 1
        producer3: ...
        producer4: ...

Bind the configuration to a properties class:

    @Configuration
    @ConfigurationProperties(prefix = "kafka")
    @Getter
    @Setter
    public class KafkaCustomProperties {

        private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
        private String clientId;
        private Map<String, String> properties = new HashMap<>();
        private Map<String, KafkaProperties.Producer> producer;
        private Map<String, KafkaProperties.Consumer> consumer;
        private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
        private KafkaProperties.Security security = new KafkaProperties.Security();

        public Map<String, Object> buildCommonProperties() {
            Map<String, Object> properties = new HashMap<>();
            if (this.bootstrapServers != null) {
                properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
            }
            if (this.clientId != null) {
                properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
            }
            properties.putAll(this.ssl.buildProperties());
            properties.putAll(this.security.buildProperties());
            if (!CollectionUtils.isEmpty(this.properties)) {
                properties.putAll(this.properties);
            }
            return properties;
        }
    }

Use this configuration to create a KafkaTemplate bean for each producer, distinguished by the @Qualifier annotation:

    @Configuration
    @RequiredArgsConstructor
    @Slf4j
    public class KafkaMultipleProducerConfig {

        private final KafkaCustomProperties kafkaCustomProperties;

        @Bean
        @Qualifier("producer1")
        public KafkaTemplate<String, Object> producer1KafkaTemplate() {
            return new KafkaTemplate<>(producerFactory("producer1"));
        }

        @Bean
        @Qualifier("producer2")
        public KafkaTemplate<String, Object> producer2KafkaTemplate() {
            return new KafkaTemplate<>(producerFactory("producer2"));
        }

        private ProducerFactory<String, Object> producerFactory(String producerName) {
            Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
            if (nonNull(kafkaCustomProperties.getProducer())) {
                KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
                if (nonNull(producerProperties)) {
                    properties.putAll(producerProperties.buildProperties());
                }
            }
            log.info("Kafka Producer '{}' properties: {}", producerName, properties);
            return new DefaultKafkaProducerFactory<>(properties);
        }
    }

Then use these KafkaTemplate beans to publish messages with the different producer configurations.

Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for detailed explanation.
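The producerFactory helper in this answer starts from the common properties and then calls putAll with the producer-specific ones, so a producer-level value replaces a shared value with the same key. A minimal plain-Java sketch of that precedence, assuming nothing beyond java.util (the class name `ProducerPropsMerger` is hypothetical; the keys used in the usage note are standard Kafka client property names):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the merge order only; it does not
// create any Kafka or Spring objects.
final class ProducerPropsMerger {

    private ProducerPropsMerger() {
    }

    // Copies the common properties, then applies the producer-specific
    // ones on top. The input maps are left unmodified.
    static Map<String, Object> merge(Map<String, Object> common, Map<String, Object> specific) {
        Map<String, Object> merged = new HashMap<>(common);
        if (specific != null) {
            merged.putAll(specific); // later entries win on key collisions
        }
        return merged;
    }
}
```

For example, merging a common map containing `bootstrap.servers=localhost:9092` and `acks=all` with a producer map containing `bootstrap.servers=server2:9092` and `retries=2` yields `server2:9092` for the servers, keeps `acks=all`, and adds `retries=2`. A producer name that is missing from the configuration simply falls back to the common values.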
