Linux 搭建 kafka 流程
优质博文:IT-BLOG-CN
一、安装环境
【1】CenOS7虚拟机三台
【2】已经搭建好的zookeeper集群。
【3】软件版本:kafka_2.11-1.0.0
二、创建目录并下载安装软件
【1】创建目录
cd /opt mkdir kafka #创建项目目录 cd kafka mkdir kafkalogs #创建kafka消息目录,主要存放kafka消息
【2】下载软件:我用的是xshell工具将下载好的软件传送到虚拟机的,放在了/opt/kafka/目录下,并解压:
tar -zxvf kafka_2.11-1.0.0.tgz
三、修改配置文件
【1】进入到config目录
cd /opt/kafka/kafka_2.11-1.0.0/config/
我们主要关心的文件只有一个:server.properties我们可以发现在这个目录下有很多文件,还有zookeeper的文件。我们可以直接使用kafka自带的zookeeper集群来启动,但是考虑到未来的项目需求,建议使用独立的zookeeper集群。
-rw-r--r--. 1 root root 906 Oct 27 08:56 connect-console-sink.properties -rw-r--r--. 1 root root 909 Oct 27 08:56 connect-console-source.properties -rw-r--r--. 1 root root 5807 Oct 27 08:56 connect-distributed.properties -rw-r--r--. 1 root root 883 Oct 27 08:56 connect-file-sink.properties -rw-r--r--. 1 root root 881 Oct 27 08:56 connect-file-source.properties -rw-r--r--. 1 root root 1111 Oct 27 08:56 connect-log4j.properties -rw-r--r--. 1 root root 2730 Oct 27 08:56 connect-standalone.properties -rw-r--r--. 1 root root 1221 Oct 27 08:56 consumer.properties -rw-r--r--. 1 root root 4727 Oct 27 08:56 log4j.properties -rw-r--r--. 1 root root 1919 Oct 27 08:56 producer.properties -rw-r--r--. 1 root root 7030 Nov 22 18:10 server.properties -rw-r--r--. 1 root root 1032 Oct 27 08:56 tools-log4j.properties -rw-r--r--. 1 root root 1023 Oct 27 08:56 zookeeper.properties
【2】修改配置文件server.properties:需要说明的是,我现在是在虚拟机1上进行的操作,同样的操作要在三台虚拟机上都执行,只是有些细微的配置不同,其他配置信息完全相同。对于不同虚拟机上有差异的部分,我会一一指出。下面修改配置文件:
# 指定当前节点的brokerId,同一个集群中的brokerId需要唯一 broker.id=0 # 指定监听的地址及端口号,使用hostname 或者内网IP皆可 listeners=PLAINTEXT://hostname:9092 # 配置外网访问ip 如果为空 则会使用listeners(如果不为空) 其他的情况使用 InetAddress.getCanonicalHostName() 的值 # Hostname and port the broker will advertise to producers and consumers. # If not set, it uses the value for "listeners" if configured. Otherwise, it will use the value returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://hostname:9092 # 指定kafka日志文件的存储目录 log.dirs=/usr/local/kafka/logs # 指定zookeeper的连接地址,若有多个地址则用逗号分隔 zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 # 配置可删除 delete.topic.enable=true
也就是说,zookeeper.connect指的是zookeeper集群的地址,可以是多个,多个之间用逗号分割。
使用scp命令发送到其他的服务器 注意配置文件需要重点修改几个地方 broker.id listeners以及advertised.listeners
scp -r /usr/local/develop/kafka_2.12-3.6.1 remote_ip:/usr/local/develop/kafka_2.12-3.6.1
需要搭建几个节点 就是发送几份,启动服务即可。
nohup sh ./bin/kafka-server-start.sh ./config/server.properties >./out.log 2>&1 &
四、启动kafka集群并测试
【1】启动服务:首先要启动kafka集群,并且是三台都要手动去启动。
// 进入kafka的bin目录 cd /opt/kafka/kafka_2.11-1.0.0/bin/ // 启动kafka ./kafka-server-start.sh -daemon ../config/server.properties
-daemon代表着以后台模式运行kafka集群,这样kafka的启动就不会影响我们继续在控制台输入命令。
【2】检查服务是否启动:执行命令jps,显示出了正在运行Kafka这个进程,进程号是4855。在运行这个命令时,你可能会出现错误,显示jps command not found。文章的最后给出了解决方案。
jps 4161 QuorumPeerMain 4855 Kafka 6809 Jps
【3】创建topic来验证是否创建成功
创建topic
./kafka-topics.sh --create --zookeeper 192.168.172.10:12181 --replication-factor 2 --partitions 1 --topic my-topic
参数解释:
--replication-factor 2 // 复制两份 --partitions 1 // 创建1个分区 --topic // 主题为my-topic -- --zookeeper // 此处为为zookeeper监听的地址
创建生产者producer
./kafka-console-producer.sh --broker-list 192.168.172.10:19092 --topic my-topic //`这个IP地址可以写brokerlist中的任意一个
此时,console处于阻塞状态,可以直接输入数据。
创建消费者: 此时要切换到另一台虚拟机的shell界面输入以下命令:
./kafka-console-consumer.sh --zookeeper 192.168.172.10:12181 --topic my-topic --from-beginning
此时,一旦有数据生成,此处的console中就会显示数据。