Software prerequisites:
CentOS 6.8
JDK 1.8+
zookeeper-3.4.10.tar.gz
apache-flume-1.7.0-bin.tar.gz
kafka_2.10-0.10.2.2.tgz
elasticsearch-5.4.0.tar.gz
kibana-5.4.0-linux-x86_64.tar.gz
logstash-5.4.0.tar.gz
Architecture (the diagram is a bit rough, heh):
1. Flume collects the log data into Kafka.
2. Logstash consumes and filters the data from Kafka into Elasticsearch.
3. Kibana displays the Elasticsearch data in the front end.
(1) Create a file named access.log (any location works; this walkthrough uses /usr/local/flume/access.log to match the Flume config below). This is the log file Flume will collect.
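For example (a trivial sketch; the /usr/local/flume path simply matches the Flume config used later in this post):

mkdir -p /usr/local/flume
touch /usr/local/flume/access.log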
(2) Start ZooKeeper, then start Kafka. (A single-node setup is used here; a cluster works as well.)
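For reference, the startup commands might look like this, assuming ZooKeeper and Kafka are unpacked under /usr/local (adjust the paths to your install):

# Start ZooKeeper, then the Kafka broker
/usr/local/zookeeper/bin/zkServer.sh start
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# Create the topic the Flume sink below writes to (1 partition/replica is enough for a single node)
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic account-access-log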
(3) Create the Flume configuration file (flume-access-log-kafka.properties) that collects from access.log into Kafka. I placed it under /usr/local/flume/conf:
# Name the agent's source, sink, and channel
access-log-agent.sources=access-log
access-log-agent.sinks=kafka
access-log-agent.channels=memory

# Source: tail the access log (tail -F would also survive log rotation)
access-log-agent.sources.access-log.type=exec
access-log-agent.sources.access-log.channels=memory
access-log-agent.sources.access-log.command=tail -f /usr/local/flume/access.log
access-log-agent.sources.access-log.fileHeader=false

# Channel: in-memory buffer between the source and the sink
access-log-agent.channels.memory.type=memory
access-log-agent.channels.memory.capacity=1000
access-log-agent.channels.memory.transactionCapacity=1000
access-log-agent.channels.memory.byteCapacityBufferPercentage=20
access-log-agent.channels.memory.byteCapacity=800000

# Sink: publish each event to the Kafka topic account-access-log
access-log-agent.sinks.kafka.type=org.apache.flume.sink.kafka.KafkaSink
access-log-agent.sinks.kafka.channel=memory
access-log-agent.sinks.kafka.kafka.bootstrap.servers=localhost:9092
access-log-agent.sinks.kafka.kafka.topic=account-access-log
# (the two settings below are legacy-style keys; Flume 1.7's KafkaSink only forwards kafka.* producer properties)
access-log-agent.sinks.kafka.serializer.class=kafka.serializer.StringEncoder
access-log-agent.sinks.kafka.kafka.producer.acks=1
access-log-agent.sinks.kafka.custom.encoding=UTF-8
(4) Start Flume with flume-access-log-kafka.properties:
flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/flume-access-log-kafka.properties --name access-log-agent -Dflume.root.logger=INFO,console
(5) Append log lines to access.log.
The log format is:
55.3.244.1 GET /index.html 15824 0.043
Append a line:
echo "55.3.244.1 GET /index.html 15824 0.043" >> /usr/local/flume/access.log
(6) Create the Logstash config file (access_log_statis) that pulls data from Kafka into Elasticsearch:
input {
  kafka {
    # Tag these events so they can be told apart from other log types
    type => "access"
    # How Logstash decodes the incoming data
    codec => "plain"
    # Consumer group
    group_id => "account-access-consumer"
    # Consumer client identifier
    client_id => "account-access-consumer-1"
    # Topic(s) to consume
    topics => ["account-access-log"]
    # Kafka cluster connection
    bootstrap_servers => "localhost:9092"
    # Where to start consuming when there is no committed offset
    auto_offset_reset => "latest"
    # Number of consumer threads
    consumer_threads => 5
  }
}
filter {
  if [type] == "access" {
    grok {
      # Extract the fields we care about from each log line
      match => { "message" => "%{IP:client_id_address} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:http_response_time}" }
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    codec => "plain"
    index => "account_access_log-%{+YYYY.MM.dd}"
  }
}
(7) Start Logstash (make sure Elasticsearch is already running). From Logstash's bin directory:
./logstash -f ../etc/access_log_statis &
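Once a log line has flowed through, a quick sanity check against Elasticsearch's REST API shows the index and its documents:

curl 'http://localhost:9200/_cat/indices?v'
curl 'http://localhost:9200/account_access_log-*/_search?pretty&size=1'

The second command should return documents carrying the grok-extracted fields (client_id_address, method, request, bytes, http_response_time).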
(8) Start Kibana.
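For example, from the unpacked Kibana directory (by default it listens on port 5601 and connects to Elasticsearch at localhost:9200):

cd kibana-5.4.0-linux-x86_64
nohup ./bin/kibana &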
(9) In the Kibana UI, open Management (the last item in the left sidebar), choose Index Patterns, and create the pattern account_access_log-*.
(10) Go to Discover and you will see the log entries.