一个简单的spring+kafka生产者
1. pom
org.springframework.kafka spring-kafka
2. 生产者
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.xxx.npi.module.common.msg.dto.MsgBase; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; @Service public class MyMessageProducerService { @Value("${npi.default-url}") private String domain; private final KafkaTemplate kafkaTemplate; public MyMessageProducerService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String topicName, T msgObj) { List list = new ArrayList(); list.add(msgObj); if("https://npi.xxx.com".equals(domain)){ kafkaTemplate.send(topicName, toJsonString(list)); } } public void sendMessage(String topicName, List list) { if("https://npi.xxx.com".equals(domain)){ kafkaTemplate.send(topicName, toJsonString(list)); } } private String toJsonString(Object obj) { return JSON.toJSONString(obj, SerializerFeature.WriteDateUseDateFormat, SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullStringAsEmpty, SerializerFeature.DisableCircularReferenceDetect); } }
3. 配置
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.Resource; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaProducerConfig { @Value("${spring.kafka.producer.bootstrap-servers}") private String servers; @Value("${spring.kafka.producer.retries}") private int retries; @Value("${spring.kafka.producer.acks}") private String acks; @Value("${spring.kafka.producer.batch-size}") private int batchSize; @Value("${spring.kafka.producer.linger-ms}") private int lingerMs; @Value("${spring.kafka.producer.buffer-memory}") private int bufferMemory; @Value("${spring.kafka.producer.key-serializer}") private String keySerializer; @Value("${spring.kafka.producer.value-serializer}") private String valueSerializer; @Value("${spring.kafka.producer.security.protocol}") private String securityProtocol; @Value("${spring.kafka.producer.ssl.truststore.location}") private Resource sslTruststoreLocationResource; @Value("${spring.kafka.producer.ssl.truststore.password}") private String sslTruststorePassword; @Value("${spring.kafka.producer.sasl.mechanism}") private String saslMechanism; @Value("${spring.kafka.producer.sasl.jaas.config}") private String saslJaasConfig; @SuppressWarnings({"unchecked", "rawtypes"}) @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerFactory()); } @SuppressWarnings("unchecked") @Bean public ProducerFactory producerFactory() { @SuppressWarnings("rawtypes") DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(producerConfigs()); // factory.transactionCapable(); // factory.setTransactionIdPrefix("transaction-"); return factory; } public Map producerConfigs() { Map props = new HashMap(); props.put("bootstrap.servers", servers); props.put("acks", acks); props.put("retries", retries); props.put("batch.size", batchSize); props.put("linger.ms", lingerMs); props.put("buffer.memory", bufferMemory); props.put("key.serializer", keySerializer); props.put("value.serializer", valueSerializer); props.put("security.protocol", securityProtocol); props.put("sasl.mechanism", saslMechanism); props.put("sasl.jaas.config", saslJaasConfig); // 如果需要更低级别的消息丢失防护,可以启用幂等性 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // SSL配置 props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS"); try { // 将类路径资源转换为临时文件路径 InputStream inputStream = sslTruststoreLocationResource.getInputStream(); File tempFile = File.createTempFile("client_truststore", ".jks"); Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tempFile.getAbsolutePath()); props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword); } catch (IOException e) { throw new RuntimeException("Failed to locate truststore file", e); } return props; } }
4. application
spring: kafka: producer: bootstrap-servers: n2.ikt.xxx.com:9092, n3.ikt.xxx.com:9092, n4.ikt.xxx.com:9092, n5.ikt.xxx.com:9092, n6.ikt.xxx.com:9092 acks: all retries: 3 batch-size: 16384 linger-ms: 1 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer security.protocol: SASL_SSL ssl.truststore.location: classpath:client_truststore.jks ssl.truststore.password: pwd sasl.mechanism: SCRAM-SHA-512 sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf-username' password='pwd'; topic: br: mdscinpi.mdscinpi-data.tst mem: mdscinpi.msdcinpi-data.tst fbr: mdscinpi.inpi-data.tst cr: mdscinpi.npi-data.tst
(图片来源网络,侵删)
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。