I have an application that may need multiple producers. All the code samples I see seem to support a single producer, reading its config from the application properties at startup. If there are multiple producers and we want to pass in a different producer config for each, is there out-of-the-box support in Spring? Or should I just go without Spring in that case?
You will have to create two different ProducerFactory beans. Below is an example:
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> confluentProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ProducerFactory<String, String> clouderaProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean(name = "confluent")
    public KafkaTemplate<String, String> confluentKafkaTemplate() {
        return new KafkaTemplate<>(confluentProducerFactory());
    }

    @Bean(name = "cloudera")
    public KafkaTemplate<String, String> clouderaKafkaTemplate() {
        return new KafkaTemplate<>(clouderaProducerFactory());
    }
}
```

Then inject each template by qualifier:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class ProducerExample {

    @Autowired
    @Qualifier("cloudera")
    private KafkaTemplate<String, String> clouderaKafkaTemplate;

    @Autowired
    @Qualifier("confluent")
    private KafkaTemplate<String, String> confluentKafkaTemplate;

    public void send() {
        confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
        clouderaKafkaTemplate.send("TestCloudEra", "hey there.. cloudera");
    }
}
```
If you still want to keep your configuration in application.yaml as usual and keep the Java configuration as minimal as possible, you can extend KafkaProperties.Producer:
```java
import java.util.HashMap;

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-1")
@RequiredArgsConstructor
class FirstProducer extends KafkaProperties.Producer {

    private final KafkaProperties common;

    @Qualifier("producer-1")
    @Bean("firstProducerFactory") // explicit bean name so it does not clash with SecondProducer's factory
    public ProducerFactory<?, ?> producerFactory() {
        final var conf = new HashMap<>(this.common.buildProducerProperties());
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);
    }

    @Qualifier("producer-1")
    @Bean("firstKafkaTemplate")
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}

@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-2")
@RequiredArgsConstructor
class SecondProducer extends KafkaProperties.Producer {

    private final KafkaProperties common;

    @Qualifier("producer-2")
    @Bean("secondProducerFactory")
    public ProducerFactory<?, ?> producerFactory() {
        final var conf = new HashMap<>(this.common.buildProducerProperties());
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);
    }

    @Qualifier("producer-2")
    @Bean("secondKafkaTemplate")
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}
```
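The matching application.yaml could then look something like this (a sketch; the property values are illustrative — spring.kafka.* holds the shared settings, and the producer-1/producer-2 sections bind to the two @ConfigurationProperties classes above):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092   # shared defaults picked up via buildProducerProperties()
    producer-1:                         # bound to FirstProducer
      client-id: first-producer
      acks: all
    producer-2:                         # bound to SecondProducer
      client-id: second-producer
      retries: 3
```

Any KafkaProperties.Producer field (acks, batch-size, retries, serializers, and so on) can be overridden per producer this way.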
Starting with version 2.5, you can use a RoutingKafkaTemplate to select the producer at runtime, based on the destination topic name. https://docs.spring.io/spring-kafka/reference/html/#routing-template
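As a sketch of that approach (adapted from the Spring for Apache Kafka reference; the bean name and topic patterns here are illustrative), a RoutingKafkaTemplate maps topic-name regexes to producer factories, with the first matching pattern winning:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.RoutingKafkaTemplate;

@Configuration
public class RoutingTemplateConfig {

    @Bean
    public RoutingKafkaTemplate routingTemplate(ProducerFactory<Object, Object> pf) {
        // Clone the auto-configured factory's config, but swap in a different value serializer.
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        ProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);

        // LinkedHashMap because patterns are tried in insertion order: specific first, catch-all last.
        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("binary-.*"), bytesPF); // byte[] values for these topics
        map.put(Pattern.compile(".+"), pf);             // default factory for everything else
        return new RoutingKafkaTemplate(map);
    }
}
```

Note that a single RoutingKafkaTemplate instance serves all topics, so this fits best when the producers differ in serialization or tuning rather than in destination cluster per message key.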
Spring Boot doesn't provide out-of-the-box support for multiple producer configurations. You can write your own custom Kafka configuration to support multiple producer configs, something like this:
```yaml
kafka:
  producer:
    producer1:
      topic: topic1
      bootstrap-servers: server1:9092,server1:9093,server1:9094
      retries: 0
      acks: all
    producer2:
      topic: topic2
      bootstrap-servers: server2:9092,server2:9093,server2:9094
      retries: 2
      acks: 1
    producer3:
      ...
    producer4:
      ...
```

Read the configuration into a properties class:
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {

    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
    private String clientId;
    private Map<String, String> properties = new HashMap<>();
    private Map<String, KafkaProperties.Producer> producer;
    private Map<String, KafkaProperties.Consumer> consumer;
    private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
    private KafkaProperties.Security security = new KafkaProperties.Security();

    public Map<String, Object> buildCommonProperties() {
        Map<String, Object> properties = new HashMap<>();
        if (this.bootstrapServers != null) {
            properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        }
        if (this.clientId != null) {
            properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
        }
        properties.putAll(this.ssl.buildProperties());
        properties.putAll(this.security.buildProperties());
        if (!CollectionUtils.isEmpty(this.properties)) {
            properties.putAll(this.properties);
        }
        return properties;
    }
}
```

Use this configuration to generate a KafkaTemplate bean for each producer, distinguished by the @Qualifier annotation:
```java
import static java.util.Objects.nonNull;

import java.util.HashMap;
import java.util.Map;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

    private final KafkaCustomProperties kafkaCustomProperties;

    @Bean
    @Qualifier("producer1")
    public KafkaTemplate<String, Object> producer1KafkaTemplate() {
        return new KafkaTemplate<>(producerFactory("producer1"));
    }

    @Bean
    @Qualifier("producer2")
    public KafkaTemplate<String, Object> producer2KafkaTemplate() {
        return new KafkaTemplate<>(producerFactory("producer2"));
    }

    private ProducerFactory<String, Object> producerFactory(String producerName) {
        // Start from the common properties, then overlay the named producer's overrides.
        Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
        if (nonNull(kafkaCustomProperties.getProducer())) {
            KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
            if (nonNull(producerProperties)) {
                properties.putAll(producerProperties.buildProperties());
            }
        }
        log.info("Kafka Producer '{}' properties: {}", producerName, properties);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}
```

Then use these KafkaTemplate beans to publish messages with their different producer configs.
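For instance, a service could inject the two templates by qualifier and route each message to the appropriate cluster (a sketch; the class and topic names here are illustrative, not part of the original answer):

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Illustrative service: sends events through two differently configured producers.
@Service
public class EventPublisher {

    private final KafkaTemplate<String, Object> producer1Template;
    private final KafkaTemplate<String, Object> producer2Template;

    public EventPublisher(@Qualifier("producer1") KafkaTemplate<String, Object> producer1Template,
                          @Qualifier("producer2") KafkaTemplate<String, Object> producer2Template) {
        this.producer1Template = producer1Template;
        this.producer2Template = producer2Template;
    }

    public void publish(Object event) {
        producer1Template.send("topic1", event); // server1 cluster, acks=all
        producer2Template.send("topic2", event); // server2 cluster, acks=1
    }
}
```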
Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for a detailed explanation.