kafka消费消息并对消息进行RSA公钥解密

2024-02-27 1076阅读

温馨提示:这篇文章已超过397天没有更新,请注意相关的内容是否还可用!

SpringBoot版本2.x.x 具体是几,忘记了,是支持application.yml或者是application.properties配置的。当然也可以使用Java配置类。

kafka消费消息并对消息进行RSA公钥解密
(图片来源网络,侵删)

以下是使用Java配置类来配置的。

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig  {
    //指定kafka 代理集群地址,多个地址用英文逗号隔开
    private String bootstrapServers="x.x.x.x:xxxx,x.x.x.x:xxxx";
    
    //指定默认消费者group id,消费者监听到的也是这个
    private String groupId="xxx";
    
    //消费者在读取一个没有offset的分区或者offset无效时的策略,默认earliest是从头读,latest不是从头读
    private String autoOffsetReset="earliest";
    
    //是否自动提交偏移量offset,默认为true,一般是false,如果为false,则auto-commit-interval属性就会无效
    private boolean  enableAutoCommit=true;
    
    //自动提交间隔时间,接收到消息后多久会提交offset,前提需要开启自动提交,也就是enable-auto-commit设置为true,默认单位是毫秒(ms),如果写10s,最后加载的显示值为10000ms,需要符合特定时间格式:1000ms,1S,1M,1H,1D(毫秒,秒,分,小时,天)
    private String autoCommitInterval="1000";
    
    //指定消息key和消息体的编解码方式
    private String keyDeserializerClass="org.apache.kafka.common.serialization.StringDeserializer";
    
    private String valueDeserializerClass ="org.apache.kafka.common.serialization.StringDeserializer";
    
    //批量消费每次最多消费多少条信息 可以自己根据业务来配置
    //private String maxPollRecords="50";
    
    //协议类型,为SASL类型
    private String securityProtocol="SASL_PLAINTEXT";
    
    //协议
    private String saslMechanism="SCRAM-SHA-512";
    
    //用户名密码配置
    private String saslJaas="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxxx\" password=\"xxxx\";";
    @Bean
    ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        //factory.setBatchListener(false);//这里为true的时候,KafkaConsumer那里需要使用批量消费方法,不然报错
        return factory;
    }
    @Bean
    public ConsumerFactory consumerFactory() {
        Map props = new HashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
        //props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG,saslJaas);
        return new DefaultKafkaConsumerFactory(props);
    }
}

消息的代码

@Slf4j
@Service
public class KafkaConsumerService {
	//单条消费
    @KafkaListener(topics = "xxxx", groupId = "xxxx")
    public void consume(ConsumerRecord record) {
        String value = record.value();
		//业务逻辑
		...
    }
	//批量消费
	@KafkaListener(topics = "xxxx", groupId = "xxxx")
    public void consume(List
VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]