Kafka 分布式流处理系统
- producer:生产者
- consumer:消费者
- broker:数据存放
- topic:主题,消息标签
安装与使用
单机版安装
# 依赖环境:Java+Zookeeper(不过Kafka中也内置了Zookeeper)
# 下载并解压
tar -xf kafka_2.11-2.0.1.tgz -C /app/
# 配置环境变量
vim /etc/profile
export KAFKA_HOME=/app/kafka_2.11-2.0.1
export PATH=$KAFKA_HOME/bin:$PATH
source /etc/profile
# 启动Kafka(这个时候直接启动会失败一定记得先启动Zookeeper)
zookeeper-server-start.sh -daemon /app/kafka_2.11-2.0.1/config/zookeeper.properties
kafka-server-start.sh -daemon /app/kafka_2.11-2.0.1/config/server.properties
# 查看是否启动成功
jps -m
集群安装
- 单节点,多broker
# 启动多个配置文件,配置文件的broker.id必须唯一
kafka-server-start.sh -daemon /app/kafka_2.11-2.0.1/config/server1.properties
kafka-server-start.sh -daemon /app/kafka_2.11-2.0.1/config/server2.properties
- 多节点,多broker
配置文件
broker.id=0
listeners=PLAINTEXT://hostname:9092 # 真正bind的地址
# 如果kafka是在docker或者虚拟机中,外部访问时,需要使用以下参数
# advertised.listeners=PLAINTEXT://hostname:9092 # 暴露给外部的listeners
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400 # 发送缓存字节数
socket.receive.buffer.bytes=102400 # 接受缓存字节数
socket.request.max.bytes=104857600 # 保护OOM
log.dirs=/tmp/kafka-logs # 日志目录
num.partitions=1 # 默认分区数
num.recovery.threads.per.data.dir=1 #
offsets.topic.replication.factor=1 #
transaction.state.log.replication.factor=1 #
transaction.state.log.min.isr=1 #
log.retention.hours=168 # 日志保留时间
log.segment.bytes=1073741824 #
log.retention.check.interval.ms=300000 #
zookeeper.connect=localhost:2181 # zookeeper地址
zookeeper.connection.timeout.ms=6000 # Zookeeper连接超时时间
group.initial.rebalance.delay.ms=0 #
kafka常用命令
Note | Command |
---|---|
创建topic | kafka-topics.sh --zookeeper hostname:2181 --create --replication-factor 1 --partitions 1 --topic topic_name |
列举topic | kafka-topics.sh --zookeeper hostname:2181 --list |
查看topic详情 | kafka-topics.sh --zookeeper hostname:2181 --describe --topic topic_name |
修改topic分区信息 | kafka-topics.sh --zookeeper hostname:2181 --alter --partitions 20 --topic topic_name |
删除topic | kafka-topics.sh --zookeeper hostname:2181 --delete --topic topic_name |
启动consumer | kafka-console-consumer.sh --zookeeper hostname:2181 --topic topic_name |
启动producer | kafka-console-producer.sh --broker-list hostname:9092 --topic topic_name |
从头消费 | --from-beginning |
踩坑记
删除topic
- 删除topic报错
Note: This will have no impact if delete.topic.enable is not set to true.
解决方案:
在server.properties添加delete.topic.enble=true
清理日志
log.retention.hours=1
log.retention.minutes=60
log.retention.ms=3600000
# 或者
log.retention.bytes=1073741824