I have recently started working with Flume, and this series shares what I have learned through a few small examples. It covers:
1 - Concepts, 2 - Building from source, 3 - Quick start: https://www.imooc.com/article/278218
4 - Source code walkthrough: https://www.imooc.com/article/278294
5 - TAILDIR tailing log files with source modifications, 6 - TAILDIR tailing log files to HDFS:
https://www.imooc.com/article/278481
7 - TAILDIR tailing log files to Kafka
8 - TAILDIR tailing log files to Elasticsearch 6.x (including a custom sink for newer ES versions)
Note: all articles in this series are based on Flume 1.7.0.
All analyzed and annotated code is at: https://github.com/lizu18xz/flume-release-1.7.0
TAILDIR tailing log files to Kafka
(1) Configuration files
[taildir-avro.conf]
agent.sources = avro_sources01
agent.channels = avro_channel01
agent.sinks = avro_sink01
agent.sources.avro_sources01.type = TAILDIR
agent.sources.avro_sources01.positionFile = /home/elasticsearch/data/flume/taildir_position.json
agent.sources.avro_sources01.filegroups = f1
agent.sources.avro_sources01.filegroups.f1 = /home/elasticsearch/data/weblog/.*
agent.sources.avro_sources01.serviceName = dataWorks
agent.sources.avro_sources01.channels = avro_channel01
agent.channels.avro_channel01.type = memory
agent.channels.avro_channel01.capacity = 1000000
agent.channels.avro_channel01.transactionCapacity = 2000
agent.sinks.avro_sink01.type = avro
agent.sinks.avro_sink01.hostname = 192.168.88.129
agent.sinks.avro_sink01.port = 4545
agent.sinks.avro_sink01.channel = avro_channel01
[avro-kafka.conf]
agent.sources = kafka_sources01
agent.channels = kafka_channel01
agent.sinks = kafka_sink01
agent.sources.kafka_sources01.type = avro
agent.sources.kafka_sources01.bind = 192.168.88.129
agent.sources.kafka_sources01.port = 4545
agent.sources.kafka_sources01.channels = kafka_channel01
agent.channels.kafka_channel01.type = memory
agent.channels.kafka_channel01.capacity = 1000000
agent.channels.kafka_channel01.transactionCapacity = 6000
agent.sinks.kafka_sink01.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka_sink01.kafka.topic = flume-kafka
agent.sinks.kafka_sink01.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafka_sink01.flumeBatchSize = 6000
agent.sinks.kafka_sink01.kafka.producer.acks = 1
agent.sinks.kafka_sink01.kafka.producer.linger.ms = 1
agent.sinks.kafka_sink01.kafka.producer.compression.type = snappy
agent.sinks.kafka_sink01.channel = kafka_channel01
(2) Start Kafka and Flume
Start the agent that receives the log events over Avro and produces them to Kafka:
./bin/flume-ng agent --conf conf -f /home/hadoop/app/data/avro-kafka.conf -n agent -Dflume.root.logger=INFO,console
Start the agent that tails the log files:
bin/flume-ng agent --conf conf -f /home/elasticsearch/data/flume/taildir-avro.conf -n agent -Dflume.root.logger=INFO,console
(3) Produce data and start a Kafka consumer
echo "2019-02-12 10:33:26 [com.faya.data.controller.LoginController]-[INFO] user login params: userId = 10" >log.txt
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic flume-kafka --from-beginning
Note: the flume-kafka topic must be created in Kafka beforehand.
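For example, with the same zookeeper-based tooling used by the console consumer above (one partition and a replication factor of 1 are just sample values):
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume-kafka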
(4) Kafka sink parameters
http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html#kafka-sink
The relevant parameters are documented in the user guide linked above.
[type] – Must be set to org.apache.flume.sink.kafka.KafkaSink
The type must be set exactly like this; the reason was covered in the earlier source-code walkthrough.
[kafka.bootstrap.servers, kafka.topic] are the basic required parameters.
[flumeBatchSize] – How many messages to process in one batch.
Note that this is not the same as Kafka's batch.size: flumeBatchSize is enforced by Flume itself
and is unrelated to the producer's batch.size. Once the sink has taken flumeBatchSize events from
the channel, it calls producer.flush(), which pushes everything buffered so far to Kafka and
prevents linger.ms from holding the batch back.
Excerpt from the source:
for (; processedEvents < batchSize; processedEvents += 1) {
  // take the next event from the channel (the full source also breaks out of the
  // loop here when the channel returns null, i.e. there is nothing left to take)
  event = channel.take();
  byte[] eventBody = event.getBody();
  Map<String, String> headers = event.getHeaders();
  // a "topic" header on the event overrides the configured default topic
  eventTopic = headers.get(TOPIC_HEADER);
  if (eventTopic == null) {
    eventTopic = topic;
  }
  // a "key" header, if present, becomes the Kafka record key
  eventKey = headers.get(KEY_HEADER);
  Integer partitionId = null;
  try {
    // (partition selection via defaultPartitionId / partitionIdHeader is elided here)
    ProducerRecord<String, byte[]> record;
    if (partitionId != null) {
      record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey,
          serializeEvent(event, useAvroEventFormat));
    } else {
      record = new ProducerRecord<String, byte[]>(eventTopic, eventKey,
          serializeEvent(event, useAvroEventFormat));
    }
    // asynchronous send; the returned futures are waited on after the loop
    kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
  } catch (NumberFormatException ex) {
    throw new EventDeliveryException("Non integer partition id specified", ex);
  } catch (Exception ex) {
    ...
    throw new EventDeliveryException("Could not send event", ex);
  }
}
// Prevent linger.ms from holding the batch: once the configured batch size is reached,
// flush() pushes everything buffered in the producer to Kafka.
producer.flush();
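As the excerpt shows, the sink first looks for a topic in the event headers (TOPIC_HEADER, i.e. a header named topic) and only falls back to the configured kafka.topic; the same goes for the record key. So individual events can be routed to a different topic by setting that header upstream. A sketch, assuming headers set on the TAILDIR source survive the Avro hop (the interceptor name i1 and the topic flume-kafka-other are placeholders):
agent.sources.avro_sources01.interceptors = i1
agent.sources.avro_sources01.interceptors.i1.type = static
agent.sources.avro_sources01.interceptors.i1.key = topic
agent.sources.avro_sources01.interceptors.i1.value = flume-kafka-other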
[Other Kafka Producer Properties] – Any property supported by the Kafka producer can be set here
by prefixing it with kafka.producer. For example: kafka.producer.linger.ms
[kafka.producer.linger.ms] in milliseconds; the producer delays sending for this long so that more messages can accumulate in one batch.
[kafka.producer.batch.size] in bytes; caps how much data is batched into a single send.
Both are Kafka client parameters; as soon as either limit is hit, the buffered batch is sent.
The full list of producer parameters is in the Kafka documentation:
http://kafka.apache.org/documentation.html#producerapi
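A minimal sketch (not the actual Flume source) of how the kafka.producer.* keys from avro-kafka.conf end up as plain producer properties; the class name and the hard-coded map are made up for illustration, while acks, linger.ms, compression.type and batch.size are standard Kafka producer settings:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ProducerPropsSketch {
  // Flume strips this prefix from agent config keys and hands the rest to the KafkaProducer
  private static final String KAFKA_PRODUCER_PREFIX = "kafka.producer.";

  public static void main(String[] args) {
    // pretend these came straight out of avro-kafka.conf above
    Map<String, String> agentConf = new HashMap<String, String>();
    agentConf.put("kafka.producer.acks", "1");
    agentConf.put("kafka.producer.linger.ms", "1");          // wait up to 1 ms for more records
    agentConf.put("kafka.producer.compression.type", "snappy");
    agentConf.put("kafka.producer.batch.size", "16384");     // max bytes buffered per partition batch

    Properties producerProps = new Properties();
    for (Map.Entry<String, String> e : agentConf.entrySet()) {
      if (e.getKey().startsWith(KAFKA_PRODUCER_PREFIX)) {
        // "kafka.producer.linger.ms" -> "linger.ms"
        producerProps.put(e.getKey().substring(KAFKA_PRODUCER_PREFIX.length()), e.getValue());
      }
    }
    System.out.println(producerProps);
    // a producer built from these properties sends a batch as soon as either batch.size
    // bytes have accumulated or linger.ms has elapsed, whichever comes first
  }
}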
To be continued:
8 - TAILDIR tailing log files to Elasticsearch 6.x (including a custom sink for newer ES versions)
Code repository:
https://github.com/lizu18xz/flume-release-1.7.0