Building an ELK Log Analysis System from Scratch

Software to prepare before setup:

CentOS 6.8

JDK 1.8+

zookeeper-3.4.10.tar.gz

apache-flume-1.7.0-bin.tar.gz

kafka_2.10-0.10.2.2.tgz

elasticsearch-5.4.0.tar.gz

kibana-5.4.0-linux-x86_64.tar.gz

logstash-5.4.0.tar.gz

Architecture diagram (a bit ugly, hehe):


1. Flume collects the log data into Kafka

2. Logstash filters the data in Kafka and ships it to Elasticsearch

3. Kibana displays the data in Elasticsearch on the front end

(1) Create a file named access.log anywhere you like; this is the log file that Flume will collect.
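For example (a minimal sketch, assuming the same path that the Flume config below uses):

touch /usr/local/flume/access.log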

(2) Start ZooKeeper, then start Kafka. (A single-node setup is used here; a cluster works just as well.)
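For reference, a minimal single-node start-up sketch (assuming ZooKeeper and Kafka are unpacked under /usr/local/zookeeper and /usr/local/kafka; adjust the paths to your installation). The topic is created up front so that both Flume and Logstash can find it:

# start ZooKeeper (uses conf/zoo.cfg)
/usr/local/zookeeper/bin/zkServer.sh start
# start a single Kafka broker in the background
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# create the topic the Flume sink will write to
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic account-access-log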

(3) Create the Flume configuration file (flume-access-log-kafka.properties) that collects the access.log file into Kafka. I placed it under /usr/local/flume/conf:

# Name the agent's components: one exec source, one memory channel, one Kafka sink
access-log-agent.sources=access-log
access-log-agent.sinks=kafka
access-log-agent.channels=memory

# Source: tail the access log and feed each new line into the channel
access-log-agent.sources.access-log.type=exec
access-log-agent.sources.access-log.channels=memory
access-log-agent.sources.access-log.command=tail -f /usr/local/flume/access.log
access-log-agent.sources.access-log.fileHeader=false

# Channel: in-memory buffer between the source and the sink
access-log-agent.channels.memory.type=memory
access-log-agent.channels.memory.capacity=1000
access-log-agent.channels.memory.transactionCapacity=1000
access-log-agent.channels.memory.byteCapacityBufferPercentage=20
access-log-agent.channels.memory.byteCapacity=800000

# Sink: publish each event to the Kafka topic account-access-log
access-log-agent.sinks.kafka.type=org.apache.flume.sink.kafka.KafkaSink
access-log-agent.sinks.kafka.channel=memory
access-log-agent.sinks.kafka.kafka.bootstrap.servers=localhost:9092
access-log-agent.sinks.kafka.kafka.topic=account-access-log
access-log-agent.sinks.kafka.serializer.class=kafka.serializer.StringEncoder
access-log-agent.sinks.kafka.kafka.producer.acks=1
access-log-agent.sinks.kafka.custom.encoding=UTF-8

(4) Start Flume with flume-access-log-kafka.properties:

flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/flume-access-log-kafka.properties --name access-log-agent -Dflume.root.logger=INFO,console

(5) Append log entries to access.log

The log format is:

55.3.244.1 GET /index.html 15824 0.043

Append a line:

echo "55.3.244.1 GET /index.html 15824 0.043" >> /usr/local/flume/access.log
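To confirm that Flume is actually delivering the line to Kafka, you can optionally attach a console consumer (same assumed Kafka path as above):

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic account-access-log --from-beginning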

(6) Create the configuration file (access_log_statis) for Logstash to pull data from Kafka into Elasticsearch:

input {
        kafka {
                # tag this stream so it can be told apart from other log types
                type => "access"
                # codec used to decode the messages pulled from Kafka
                codec => "plain"
                # consumer group
                group_id => "account-access-consumer"
                # consumer client id
                client_id => "account-access-consumer-1"
                # topic to consume
                topics => "account-access-log"
                # Kafka cluster to connect to
                bootstrap_servers => "localhost:9092"
                # where to start consuming when there is no committed offset
                auto_offset_reset => "latest"
                # number of consumer threads
                consumer_threads => 5
        }
}
filter {
        if [type] == "access" {
                grok {
                        # extract the fields we care about from the log line
                        match => { "message" => "%{IP:client_id_address} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:http_response_time}" }
                }
        }
}
output {
        elasticsearch {
                hosts => ["localhost:9200"]
                codec => "plain"
                index => "account_access_log-%{+YYYY.MM.dd}"
        }
}
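For the sample line above, this grok pattern yields the fields client_id_address=55.3.244.1, method=GET, request=/index.html, bytes=15824 and http_response_time=0.043, which Logstash then writes into the daily account_access_log-* index.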

(7) Start Logstash (make sure Elasticsearch is already running)
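If Elasticsearch is not running yet, a minimal way to start it (assuming 5.4.0 is unpacked under /usr/local/elasticsearch; note that Elasticsearch 5.x refuses to start as root, so run it as an ordinary user):

/usr/local/elasticsearch/bin/elasticsearch -d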

./logstash -f  ../etc/access_log_statis &
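After Logstash has consumed a few events, you can check that documents are arriving in Elasticsearch with its standard REST APIs (host assumed to be localhost:9200):

curl 'localhost:9200/_cat/indices?v'
curl 'localhost:9200/account_access_log-*/_search?pretty'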

(8) Start Kibana
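A minimal sketch, assuming Kibana 5.4.0 is unpacked under /usr/local/kibana and Elasticsearch is on localhost (the default elasticsearch.url in config/kibana.yml):

/usr/local/kibana/bin/kibana &

Kibana then listens on http://localhost:5601 by default.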

(9) On the Kibana page, choose Management (the last item in the left-hand menu), then Index Patterns.

Create the index pattern account_access_log-*

(10) Go to Discover and you will see the log entries.


    Original author: Rooter
    Original article: https://zhuanlan.zhihu.com/p/47163044
    This article was reposted from the web to share knowledge. If it infringes on any rights, please contact the blogger to have it removed.