kafka开启SSL认证(包括内置zookeeper开启SSL)
zookeeper和kafka的SSL开启都可单独进行
生成SSL证书
使用jre自带的keytool工具生成,linux和windows下生成的证书可以通用
- 生成含有一个私钥的keystore文件,有效期10年(本文证书密码统一使用test123)
keytool -genkeypair -alias certificatekey -dname “CN=127.0.0.1, OU=127.0.0.1, O=127.0.0.1, L=SH, ST=SH, C=CN” -keyalg RSA -validity 3650 -keystore keystore.jks
- 查看生成的keystore文件
keytool -list -v -keystore keystore.jks
- 导出证书
keytool -export -alias certificatekey -keystore keystore.jks -rfc -file selfsignedcert.cer
- 导入证书到truststore文件中
keytool -import -alias certificatekey -file selfsignedcert.cer -keystore truststore.jks
- 查看生成的truststore文件
keytool -list -v -keystore truststore.jks
zookeeper开启SSL
受版本影响,需要3.5及以上版本才能正常开启,(可查看 libs/zookeeper-xxx.jar确认版本号)
- 将生成的证书文件统一放在kafka安装路径ssl文件夹(需要新建)
- 编辑config/zookeeper.properties配置文件,添加下列配置
#开启SSL #ssl端口 secureClientPort=3183 #开启ssl使用的本机访问地址 secureClientPortAddress=127.0.0.1 serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory ssl.keyStore.location=/home/app/kafka/ssl/keystore.jks ssl.keyStore.password=test123 ssl.trustStore.location=/home/app/kafka/ssl/truststore.jks ssl.trustStore.password=test123
若注释掉原本的 clientPort配置则可以屏蔽非SSL的访问地址
- Kafka配置zookeeper ssl连接
编辑 config/server.properties, 开启SSL并配置上证书和证书库路径以及相应密码,配置如下:
zookeeper.connection.timeout.ms=18000 zookeeper.connect=127.0.0.1:3183 zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty zookeeper.ssl.client.enable=true zookeeper.client.secure=true zookeeper.ssl.hostnameVerification=false zookeeper.ssl.keystore.location=/home/app/kafka/ssl/keystore.jks zookeeper.ssl.keystore.password=test123 zookeeper.ssl.truststore.location=/home/app/kafka/ssl/truststore.jks zookeeper.ssl.truststore.password=test123
kafka开启SSL
修改kafka安装路径下 config/server.properties文件,将kafka连接协议改为SSL,并配置上证书和证书库路径以及相应密码,如下:
#IP地址使用实际kafka服务器IP地址(需自己更换下面的ip) listeners =SSL://192.168.6.6:9093 #server.keystore.jks的地址 ssl.keystore.location=/home/app/kafka/ssl/keystore.jks ssl.keystore.password=test123 ssl.truststore.location=/home/app/kafka/ssl/truststore.jks ssl.truststore.password=test123 ssl.client.auth=required ssl.enabled.protocols=TLSv1.2 ssl.keystore.type=JKS ssl.truststore.type=JKS #关闭主机名 ssl.endpoint.identification.algorithm= #设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT security.inter.broker.protocol=SSL
测试
启动zookeeper服务:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka服务:
bin/kafka-server-start.sh config/server.properties
创建客户端连接配置文件 client-ssl.properties
security.protocol=SSL ssl.enabled.protocols=TLSv1.2 ssl.truststore.location=/home/app/kafka/ssl/truststore.jks ssl.truststore.password=test123 ssl.keystore.location=/home/app/kafka/ssl/keystore.jks ssl.keystore.password=test123 ssl.key.password=test123 ssl.endpoint.identification.algorithm=
创建带证书校验的生产者
bin/kafka-console-producer.sh --broker-list 192.168.6.6:9093 --topic test --producer.config client-ssl.properties
另起窗口创建带证书校验的消费者
bin/kafka-console-consumer.sh --broker-list 192.168.6.6:9093 --topic test --consumer.config client-ssl.properties
尝试在生产者窗口中输入任意字符回车,若消费者窗口能正常接收则表示kafka运行正常