springboot配置多套kafka以及SASL认证
pom.xml
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.0</version> </dependency>
yml配置,第一套正常配置,第二套增加了SASL认证
spring: kafka: one: bootstrap-servers: 192.168.1.10:9092 producer: retries: 3 batch-size: 16384 buffer-memory: 33554432 acks: 1 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: TEST001 enable-auto-commit: false topic: NOTICE_TEST_001 key-serializer: org.apache.kafka.common.serialization.StringDeserializer value-serializer: org.apache.kafka.common.serialization.StringDeserializer listener: ack-mode: MANUAL_IMMEDIATE poll-timeout: 3000 two: bootstrap-servers: 192.168.1.11:9092 producer: retries: 3 batch-size: 16384 buffer-memory: 33554432 acks: 1 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: sasl.mechanism: SCRAM-SHA-512 security.protocol: SASL_PLAINTEXT # sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password123"; sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password123"; consumer: group-id: TEST001 enable-auto-commit: false topic: NOTICE_TEST_001 key-serializer: org.apache.kafka.common.serialization.StringDeserializer value-serializer: org.apache.kafka.common.serialization.StringDeserializer properties: sasl.mechanism: SCRAM-SHA-512 security.protocol: SASL_PLAINTEXT # sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password123"; sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password123"; listener: ack-mode: MANUAL_IMMEDIATE poll-timeout: 3000
配置类1
package com.dms.kafka.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ConsumerProperties; import org.springframework.kafka.listener.ContainerProperties; @EnableKafka @Configuration public class KafkaOneConfig { @Bean public KafkaTemplate<String,String> kafkaOneTemplate( @Autowired @Qualifier("producerOneFactory")ProducerFactory producerFactory){ return new KafkaTemplate<>(producerFactory); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory( @Autowired @Qualifier("kafkaOneProperties") KafkaProperties kafkaProperties, @Autowired @Qualifier("consumerOneFactory") ConsumerFactory consumerFactory ){ ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(kafkaProperties.getListener().getConcurrency() != null ? kafkaProperties.getListener().getConcurrency() : 3); factory.getContainerProperties().setPollTimeout(kafkaProperties.getListener().getPollTimeout() != null ? kafkaProperties.getListener().getPollTimeout().getSeconds() * 1000 : ConsumerProperties.DEFAULT_POLL_TIMEOUT); factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode() != null ? kafkaProperties.getListener().getAckMode() : ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } @Primary @ConfigurationProperties(prefix = "spring.kafka.one") @Bean public KafkaProperties kafkaOneProperties(){ return new KafkaProperties(); } @Primary @Bean public ConsumerFactory consumerOneFactory(@Autowired @Qualifier("kafkaOneProperties") KafkaProperties kafkaProperties){ return new DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties()); } @Primary @Bean public ProducerFactory producerOneFactory(@Autowired @Qualifier("kafkaOneProperties") KafkaProperties kafkaProperties){ return new DefaultKafkaProducerFactory(kafkaProperties.buildProducerProperties()); } }
配置类2
package com.dms.kafka.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ConsumerProperties; import org.springframework.kafka.listener.ContainerProperties; @EnableKafka @Configuration public class KafkaTwoConfig { @Bean public KafkaTemplate<String,String> kafkaTwoTemplate( @Autowired @Qualifier("producerTwoFactory") ProducerFactory producerFactory){ return new KafkaTemplate<>(producerFactory); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory( @Autowired @Qualifier("kafkaTwoProperties") KafkaProperties kafkaProperties, @Autowired @Qualifier("consumerTwoFactory") ConsumerFactory consumerFactory ){ ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(kafkaProperties.getListener().getConcurrency() != null ? kafkaProperties.getListener().getConcurrency() : 3); factory.getContainerProperties().setPollTimeout(kafkaProperties.getListener().getPollTimeout() != null ? kafkaProperties.getListener().getPollTimeout().getSeconds() * 1000 : ConsumerProperties.DEFAULT_POLL_TIMEOUT); factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode() != null ? kafkaProperties.getListener().getAckMode() : ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } @ConfigurationProperties(prefix = "spring.kafka.Two") @Bean public KafkaProperties kafkaTwoProperties(){ return new KafkaProperties(); } @Bean public ConsumerFactory consumerTwoFactory(@Autowired @Qualifier("kafkaTwoProperties") KafkaProperties kafkaProperties){ return new DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties()); } @Bean public ProducerFactory producerTwoFactory(@Autowired @Qualifier("kafkaTwoProperties") KafkaProperties kafkaProperties){ return new DefaultKafkaProducerFactory(kafkaProperties.buildProducerProperties()); } }
消费者1
package com.dms.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class ListenerOne { @KafkaListener(topics = "#{'${spring.kafka.one.consumer.topic}'}", containerFactory = "kafkaOneContainerFactory") public void onMessage(ConsumerRecord<String,String> record, Acknowledgment ack){ try { System.out.println(record.value()); }catch (Exception e){ }finally { ack.acknowledge(); } } }
消费者2
package com.dms.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class ListenerTwo { @KafkaListener(topics = "#{'${spring.kafka.two.consumer.topic}'}", containerFactory = "kafkaTwoContainerFactory") public void onMessage(ConsumerRecord<String,String> record, Acknowledgment ack){ try { System.out.println(record.value()); }catch (Exception e){ }finally { ack.acknowledge(); } } }
生产者1
package com.dms.kafka.producer; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; @Component public class ProducerOne { @Autowired @Qualifier("kafkaOneTemplate") private KafkaTemplate<String,String> template; public RecordMetadata sendMsg(String topic, String msg){ try { ListenableFuture<SendResult<String, String>> future = template.send(topic,msg); return future.get().getRecordMetadata(); }catch (Exception e){ return null; } } }
生产者2
package com.dms.kafka.producer; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; @Component public class ProducerTwo { @Autowired @Qualifier("kafkaTwoTemplate") private KafkaTemplate<String,String> template; public RecordMetadata sendMsg(String topic, String msg){ try { ListenableFuture<SendResult<String, String>> future = template.send(topic,msg); return future.get().getRecordMetadata(); }catch (Exception e){ return null; } } }
123 |
1