kafka集群安装

  1. 使用的测试环境如下所示,需要注意的是zookeeper的版本需要和kafka的版本一致,不然会报错:
ip 系统版本 主机名 zookeeper版本 kafka版本 java版本
172.16.50.61 centos7 kafka_zk_01 3.5.7 kafka_2.12-2.5.0 13.0.2
172.16.50.62 centos7 kafka_zk_02 3.5.7 kafka_2.12-2.5.0 13.0.2
172.16.50.63 centos7 kafka_zk_03 3.5.7 kafka_2.12-2.5.0 13.0.2
  1. 安装java环境,参考 centos7 安装手动安装java环境
  2. 安装 zookeeper ,参考 zookeeper集群搭建
  3. 在这三台服务器上面,安装kafka,配置文件”/opt/kafka/config/server.properties”修改成如下
broker.id=1 #kafka_zk_02节点修改为2,kafka_zk_03的节点修改为3
listeners=PLAINTEXT://172.16.50.61:9092 #修改Kafka的IP地址为各自节点的本地地址
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs  #日志目录需要提前建好
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.50.61:2181,172.16.50.62:2181,172.16.50.63:2181 # zookeeper集群的连接地址
delete.topic.enable=true
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3000

  1. 最后启动kafka,可以参考 使用supervisor运行kafka

使用supervisor运行kafka

安装 supervisor 参考 supervisor安装

创建并编辑 /etc/supervisord.d/kafka.ini 文件,内容如下:

[program:kafka]
command=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
environment=JMX_PORT=9999
process_name=%(program_name)s
numprocs=1
directory=/opt/kafka/
autostart=true
autorestart=true
startsecs=10
startretries=5
user=root
redirect_stderr=false
stdout_logfile=/var/log/supervisor/kafka.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=10
stdout_capture_maxbytes=10MB
stdout_events_enabled=false
stderr_logfile=/var/log/supervisor/kafka_error.log
stderr_logfile_maxbytes=1MB
stderr_logfile_backups=5
stderr_capture_maxbytes=1MB

载入新加的 kafka.ini配置文件

supervisorctl update