Spring Boot 配置多套 Kafka 以及 SASL 认证

pom.xml

<!--
  Spring for Apache Kafka, pinned to 2.7.0. It supplies the org.springframework.kafka.*
  classes (KafkaTemplate, @KafkaListener, container factories) used by the classes below.
  NOTE(review): if the project inherits Spring Boot's dependency management, the explicit
  version can usually be dropped so it stays aligned with the Boot release; confirm.
-->
<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.7.0</version>
		</dependency>

application.yml 配置:第一套为普通配置,第二套增加了 SASL 认证

# Two independent Kafka clusters. Each subtree is bound onto its own KafkaProperties
# instance via @ConfigurationProperties(prefix = "spring.kafka.one" / "spring.kafka.two").
spring:
  kafka:
    # Cluster one: no authentication.
    one:
      bootstrap-servers: 192.168.1.10:9092
      producer:
        retries: 3
        batch-size: 16384
        buffer-memory: 33554432
        acks: 1
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
      consumer:
        group-id: TEST001
        enable-auto-commit: false
        # Not a KafkaProperties field; it is read by the @KafkaListener placeholder
        # ${spring.kafka.one.consumer.topic}.
        topic: NOTICE_TEST_001
        # Consumers take *deserializers*. The previous key-serializer/value-serializer keys
        # are not consumer properties and were silently ignored during binding.
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      listener:
        # Manual acks: the listener must call Acknowledgment.acknowledge().
        ack-mode: MANUAL_IMMEDIATE
        # Bound to a Duration; a bare number is interpreted as milliseconds.
        poll-timeout: 3000
    # Cluster two: SASL/SCRAM authentication.
    two:
      bootstrap-servers: 192.168.1.11:9092
      producer:
        retries: 3
        batch-size: 16384
        buffer-memory: 33554432
        acks: 1
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        properties:
          sasl.mechanism: SCRAM-SHA-512
          # SASL_PLAINTEXT authenticates but does not encrypt traffic; use SASL_SSL
          # when the brokers expose a TLS listener.
          security.protocol: SASL_PLAINTEXT
          # Alternative for sasl.mechanism: PLAIN (the login module must match the mechanism):
#          sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password123";
          # NOTE(review): demo credentials; externalize them (environment variable or
          # secret store) instead of committing real ones.
          sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password123";
      consumer:
        group-id: TEST001
        enable-auto-commit: false
        # Not a KafkaProperties field; it is read by the @KafkaListener placeholder
        # ${spring.kafka.two.consumer.topic}.
        topic: NOTICE_TEST_001
        # Consumers take *deserializers* (see the note on cluster one).
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        properties:
          sasl.mechanism: SCRAM-SHA-512
          security.protocol: SASL_PLAINTEXT
          # Alternative for sasl.mechanism: PLAIN:
#          sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password123";
          sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password123";
      listener:
        ack-mode: MANUAL_IMMEDIATE
        poll-timeout: 3000

配置类1

package com.dms.kafka.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ContainerProperties;

@EnableKafka
@Configuration
public class KafkaOneConfig {
    @Bean
    public KafkaTemplate<String,String> kafkaOneTemplate(
            @Autowired @Qualifier("producerOneFactory")ProducerFactory producerFactory){
        return new KafkaTemplate<>(producerFactory);
    }
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory(
            @Autowired @Qualifier("kafkaOneProperties") KafkaProperties kafkaProperties,
            @Autowired @Qualifier("consumerOneFactory") ConsumerFactory consumerFactory
    ){
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency() != null ?
                kafkaProperties.getListener().getConcurrency() : 3);
        factory.getContainerProperties().setPollTimeout(kafkaProperties.getListener().getPollTimeout() != null ?
                kafkaProperties.getListener().getPollTimeout().getSeconds() * 1000 :
                ConsumerProperties.DEFAULT_POLL_TIMEOUT);
        factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode() != null ?
                kafkaProperties.getListener().getAckMode() : ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
    @Primary
    @ConfigurationProperties(prefix = "spring.kafka.one")
    @Bean
    public KafkaProperties kafkaOneProperties(){
        return new KafkaProperties();
    }
    @Primary
    @Bean
    public ConsumerFactory consumerOneFactory(@Autowired @Qualifier("kafkaOneProperties") KafkaProperties kafkaProperties){
        return new DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties());
    }
    @Primary
    @Bean
    public ProducerFactory producerOneFactory(@Autowired @Qualifier("kafkaOneProperties") KafkaProperties kafkaProperties){
        return new DefaultKafkaProducerFactory(kafkaProperties.buildProducerProperties());
    }
}

配置类2

package com.dms.kafka.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ContainerProperties;

@EnableKafka
@Configuration
public class KafkaTwoConfig {
    @Bean
    public KafkaTemplate<String,String> kafkaTwoTemplate(
            @Autowired @Qualifier("producerTwoFactory") ProducerFactory producerFactory){
        return new KafkaTemplate<>(producerFactory);
    }
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory(
            @Autowired @Qualifier("kafkaTwoProperties") KafkaProperties kafkaProperties,
            @Autowired @Qualifier("consumerTwoFactory") ConsumerFactory consumerFactory
    ){
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency() != null ?
                kafkaProperties.getListener().getConcurrency() : 3);
        factory.getContainerProperties().setPollTimeout(kafkaProperties.getListener().getPollTimeout() != null ?
                kafkaProperties.getListener().getPollTimeout().getSeconds() * 1000 :
                ConsumerProperties.DEFAULT_POLL_TIMEOUT);
        factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode() != null ?
                kafkaProperties.getListener().getAckMode() : ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
    @ConfigurationProperties(prefix = "spring.kafka.Two")
    @Bean
    public KafkaProperties kafkaTwoProperties(){
        return new KafkaProperties();
    }
    @Bean
    public ConsumerFactory consumerTwoFactory(@Autowired @Qualifier("kafkaTwoProperties") KafkaProperties kafkaProperties){
        return new DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties());
    }
    @Bean
    public ProducerFactory producerTwoFactory(@Autowired @Qualifier("kafkaTwoProperties") KafkaProperties kafkaProperties){
        return new DefaultKafkaProducerFactory(kafkaProperties.buildProducerProperties());
    }
}

消费者1

package com.dms.kafka.consumer;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ListenerOne {
    @KafkaListener(topics = "#{'${spring.kafka.one.consumer.topic}'}", containerFactory = "kafkaOneContainerFactory")
    public void onMessage(ConsumerRecord<String,String> record, Acknowledgment ack){
        try {
            System.out.println(record.value());
        }catch (Exception e){

        }finally {
            ack.acknowledge();
        }
    }
}

消费者2

package com.dms.kafka.consumer;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ListenerTwo {
    @KafkaListener(topics = "#{'${spring.kafka.two.consumer.topic}'}", containerFactory = "kafkaTwoContainerFactory")
    public void onMessage(ConsumerRecord<String,String> record, Acknowledgment ack){
        try {
            System.out.println(record.value());
        }catch (Exception e){

        }finally {
            ack.acknowledge();
        }
    }
}

生产者1

package com.dms.kafka.producer;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
public class ProducerOne {

    @Autowired
    @Qualifier("kafkaOneTemplate")
    private KafkaTemplate<String,String> template;

    public RecordMetadata sendMsg(String topic, String msg){
        try {
            ListenableFuture<SendResult<String, String>> future = template.send(topic,msg);
            return future.get().getRecordMetadata();
        }catch (Exception e){
            return null;
        }

    }
}

生产者2

package com.dms.kafka.producer;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
public class ProducerTwo {

    @Autowired
    @Qualifier("kafkaTwoTemplate")
    private KafkaTemplate<String,String> template;

    public RecordMetadata sendMsg(String topic, String msg){
        try {
            ListenableFuture<SendResult<String, String>> future = template.send(topic,msg);
            return future.get().getRecordMetadata();
        }catch (Exception e){
            return null;
        }

    }
}


1

123

{context}