Flume-agent内部架构:Flume-agent内部架构如封面,图片来源Welcome to Apache Flume。
1. 下载flume及kafka
地址:http://flume.apache.org/版本:apache-flume-1.8.0-bin.tar.gz
地址:http://kafka.apache.org/ 版本:kafka_2.11-1.0.0.tgz
2. 解压及移动
解压文件并移动到自己熟悉的位置,我的kafka和flume都放在/usr/local里
~/Downloads $ tar zxvf apache-flume-1.8.0-bin.tar.gz
~/Downloads $ mv apache-flume-1.8.0-bin /usr/local #注意root执行
~/Downloads $ tar zxf kafka_2.11-1.0.0.tgz
~/Downloads $ mv kafka_2.11-1.0.0.tgz /usr/local #注意root执行
3. 配置文件
在flume文件夹下conf文件夹下新建flume-kafka.conf文件
apache-flume-1.8.0-bin/conf $ touch flume-kafka.conf
apache-flume-1.8.0-bin/conf $ vim flume-kafka.conf
编辑配置文件如下:
#agent1组件名称
agent1.sources = source
agent1.sinks = sink
agent1.channels = channel
#agent1-source
agent1.sources.source.type = spooldir
agent1.sources.source.spoolDir = /usr/local/apache-flume-1.8.0-bin/logdir
#agent1-sink
agent1.sinks.sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink.topic = test#这里是kafka的topic名字
agent1.sinks.sink.brokerList = 127.0.0.1:9092 #本机端口为9092
agent1.sinks.sink.requiredAcks = 1
agent1.sinks.sink.batchSize = 100 #agent1-channel
agent1.channels.channel.type = memory
agent1.channels.channel.capacity = 1000
agent1.channels.channel.transactionCapacity = 100 #绑定source和sink到channel上
agent1.sources.source.channels = channel
agent1.sinks.sink.channel = channel
单机配置中kafka的配置文件暂时无需更改
4. flume和kafka整合
首先启动本机的zookeeper,我是单独配置的zookeeper,一直在后台挂载。这里提供启动kafka内置的zookeeper:
kafka_2.11-1.0.0 $ bin/zookeeper-server-start.sh config/zookeeper.properties
然后启动kafka服务:
kafka_2.11-1.0.0 $ bin/kafka-server-start.sh config/server.properties
创建一个topic–test并查看topic列表:
kafka_2.11-1.0.0 $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test kafka_2.11-1.0.0 $ bin/kafka-topics.sh --list --zookeeper localhost:2181
启动flume-kafka.conf:
apache-flume-1.8.0-bin $ bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name agent1 -Dflume.root.logger=INFO,console #这里注意权限问题,如果没有读logdir文件中的权限一定要在sudo执行
kafka_2.11-1.0.0 $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
最后在logdir文件夹中加入testKafka.log文件(写入flume connect kafka success!!):
apache-flume-1.8.0-bin $ touch testKafka.log
apache-flume-1.8.0-bin $ gedit testKafka.log #编辑文件内容为flume connect kafka success!!
看到consumer中出现编辑的内容表示编辑成功
5. 自己遇到的一些坑
- 未启动zookeeper
- kafka服务未启动,导致的 no brokers found when trying to rebalance(不知道是不是真的是这个原因,但启动后就好了)
- 之前看一篇博文说需要引用第三方的包,我引用之后跑不起来,去掉引用的包,并更改了kafka版本及安装方式就可以了(最开始的kafka使用homebrew安装的,文件结构和下载安装包安装的是不同的)
- 文件访问的权限问题,由于目标log文件需要root权限,flume-kafka.conf在启动的时候需要sudo,没有给权限就会出现以下问题:
[ERROR – org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)] Unable to start EventDrivenSourceRunner: { source:Spool Directory source source1: { spoolDir: /usr/local/flume/logtest } } – Exception follows.
org.apache.flume.FlumeException: Unable to read and modify files in the spooling directory: /usr/local/flume/logtest
参考文献:
[1] http://kafka.apache.org/quickstart
[2] http://flume.apache.org/
在我的公众号【轮子工厂】后台回复关键字:
1.回复【图书】:获取15本新手自学编程,零基础入门经典学习教材;
2.回复【我要造轮子】:获取100多本我根据知乎上面关于计算机问题的高赞回答里面的介绍整理出来的书籍;
3.回复【开发工具】:获取几大主流编程语言的开发工具~
4.回复【ps教程】:获取ps视频免费教程;
5.回复【早起】:每天早上7点准时叫你起床