- 使用的测试环境如下所示,需要注意的是zookeeper的版本需要和kafka的版本一致,不然会报错:
ip | 系统版本 | 主机名 | zookeeper版本 | kafka版本 | java版本 |
---|---|---|---|---|---|
172.16.50.61 | centos7 | kafka_zk_01 | 3.5.7 | kafka_2.12-2.5.0 | 13.0.2 |
172.16.50.62 | centos7 | kafka_zk_02 | 3.5.7 | kafka_2.12-2.5.0 | 13.0.2 |
172.16.50.63 | centos7 | kafka_zk_03 | 3.5.7 | kafka_2.12-2.5.0 | 13.0.2 |
- 安装java环境,参考 centos7 安装手动安装java环境
- 安装 zookeeper ,参考 zookeeper集群搭建
- 在这三台服务器上面,安装kafka,配置文件”/opt/kafka/config/server.properties”修改成如下
broker.id=1 #kafka_zk_02节点修改为2,kafka_zk_03的节点修改为3
listeners=PLAINTEXT://172.16.50.61:9092 #修改Kafka的IP地址为各自节点的本地地址
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs #日志目录需要提前建好
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.50.61:2181,172.16.50.62:2181,172.16.50.63:2181 # zookeeper集群的连接地址
delete.topic.enable=true
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3000
- 最后启动kafka,可以参考 使用supervisor运行kafka