1.1 A Kafka + ZooKeeper cluster environment is already installed
1.2 Download Flume
This article uses Flume 1.7.
Download: http://flume.apache.org/download.html
2. Configure Flume
2.1 Upload Flume
- # Upload the downloaded package to /opt/software
- cd /opt/software
- rz apache-flume-1.7.0-bin.tar.gz
- # Extract the archive
- tar -zxvf apache-flume-1.7.0-bin.tar.gz
- # Copy apache-flume-1.7.0-bin to /usr/local as flume
- cp -r apache-flume-1.7.0-bin /usr/local/flume
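A quick optional check that the copy landed where the later steps expect it:
- # bin/ and conf/ should be listed under /usr/local/flume
- ls /usr/local/flume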
2.2 Configure environment variables
- vi /etc/profile
- export FLUME_HOME=/usr/local/flume
- export PATH=$PATH:$FLUME_HOME/bin
- Save and exit
- Reload the profile so the change takes effect
- source /etc/profile
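To confirm the variable took effect, check that the flume-ng launcher is now on the PATH:
- # Should print the Flume 1.7.0 version banner
- flume-ng version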
2.3 Configure flume-env
- # Add the JDK path
- cd /usr/local/flume/conf
- cp -r flume-env.sh.template flume-env.sh
- vi flume-env.sh
- export JAVA_HOME=/usr/local/jdk
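The value above assumes the JDK was installed to /usr/local/jdk; verify the path and adjust JAVA_HOME in flume-env.sh if your JDK lives elsewhere:
- /usr/local/jdk/bin/java -version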
3. Test Flume + Kafka
Kafka receives the data that Flume collects from the monitored log.
3.1 Configure flume-conf
cp -r flume-conf.properties.template flume-conf.properties
vi flume-conf.properties
- # Agent component names
- agent.sources = s1
- agent.channels = c1
- agent.sinks = k1
- # Source: tail the monitored log file
- agent.sources.s1.type=exec
- agent.sources.s1.command=tail -F /opt/software/abc.log
- agent.sources.s1.channels=c1
- # Channel: in-memory buffer
- agent.channels.c1.type=memory
- agent.channels.c1.capacity=10000
- agent.channels.c1.transactionCapacity=100
- # Sink: deliver events to Kafka
- agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
- # Kafka broker addresses and ports
- agent.sinks.k1.brokerList=192.168.32.128:9092,192.168.32.131:9092,192.168.32.132:9092
- # Kafka topic
- agent.sinks.k1.topic=flumeTest
- # Serialization
- agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
- agent.sinks.k1.channel=c1
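Note that brokerList and topic are the pre-1.7 property names; Flume 1.7's KafkaSink still accepts them (with a deprecation warning) but documents the kafka.* style instead. An equivalent sketch using the newer names would be:
- agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
- agent.sinks.k1.kafka.bootstrap.servers=192.168.32.128:9092,192.168.32.131:9092,192.168.32.132:9092
- agent.sinks.k1.kafka.topic=flumeTest
- agent.sinks.k1.channel=c1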
3.2 Write a simple shell script abc.sh and make it executable
- # Go to the working directory
- cd /opt/software
- # Create abc.sh
- vi abc.sh
- #!/bin/bash
- # Append 50001 test lines to abc.log for Flume to tail
- for((i=0;i<=50000;i++));
- do echo "test-$i" >> abc.log;
- done
chmod 755 abc.sh
3.3 Start ZooKeeper and Kafka
For details, see: http://blog.csdn.net/a123demi/article/details/70279296
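As a reminder, a typical start sequence on each node looks like the following (this assumes the ZooKeeper bundled with the Kafka distribution; a standalone ZooKeeper install is started with zkServer.sh start instead):
- # Start ZooKeeper first, then the Kafka broker, on every node in the cluster
- bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- bin/kafka-server-start.sh -daemon config/server.properties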
3.4 Kafka consumer listens on the flumeTest topic
bin/kafka-console-consumer.sh --zookeeper 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181 --topic flumeTest --from-beginning
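If the brokers do not auto-create topics, create flumeTest before starting the agent (the partition and replication-factor values below are only example choices):
- bin/kafka-topics.sh --create --zookeeper 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181 --replication-factor 3 --partitions 3 --topic flumeTest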
3.5 Start Flume
./bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console
Note: the agent name passed to -n must match the agent name used as the property prefix in flume-conf.properties (agent in this case).
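The command above runs the agent in the foreground with console logging, which is handy for this test. To keep it running after the terminal closes, a common variation is to start it in the background and send the output to a file:
- nohup ./bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties > flume.log 2>&1 &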
3.6 Run abc.sh
./abc.sh
3.7 Output
The Kafka consumer receives and prints the data, one message per line appended to abc.log (test-0 through test-50000).