Using Kafka and Logstash

1. Prerequisites

Logstash is already installed
Filebeat is already installed
Kafka is already installed
List the topics currently in Kafka:
./kafka-topics.sh --zookeeper 192.168.220.123:2181 --list

__consumer_offsets
test
test1
testbylixr
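
The login topic used later in this article is created automatically the first time Logstash produces to it (Kafka's auto.create.topics.enable defaults to true). If auto-creation is disabled on your brokers, the topic can be created by hand first; a minimal sketch for a single-broker setup:

# Create the topic explicitly; replication factor 1 assumes a single broker
./kafka-topics.sh --zookeeper 192.168.220.123:2181 --create --topic login --partitions 1 --replication-factor 1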

For the ELK setup itself, see: http://www.jianshu.com/p/460a307adebb

2. Filebeat configuration

filebeat.prospectors:
- input_type: log
  document_type: login
  paths:
    - /data/ncyd/logs/epg/login.log
    - /data/ncyd/epg2logs/epg/login.log

# Ship events to Logstash (listening on port 5044, see section 3) rather than
# straight to Elasticsearch, so that "login" events can be routed on to Kafka.
output.logstash:
  hosts: ["ip:5044"]
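
Before pointing it at Logstash for real, it helps to confirm that the configuration parses and that Filebeat picks up the files; a sketch assuming Filebeat 5.x (where input_type/document_type are still valid) and a filebeat.yml in the current directory:

# Validate the configuration without shipping anything (Filebeat 5.x flag)
./filebeat -configtest -e -c filebeat.yml
# Run Filebeat in the foreground, logging to stderr, so the harvester output is visible
./filebeat -e -c filebeat.yml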

3. Logstash configuration

input {
  beats {
    port => "5044"
  }
}
output {
  # "login" events go to the Kafka topic "login"
  if [type] == "login" {
    kafka {
      topic_id => "login"
      bootstrap_servers => "ip:9092"
    }
  }
  # Tomcat access logs go straight to Elasticsearch
  if [type] == "tomcataccess" {
    elasticsearch {
      hosts => ["ip:9200"]
      index => "logstash-tomcat-accesslog-%{+YYYY.MM.dd}"
      #codec => "json"
    }
  }
}
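
The pipeline definition can be syntax-checked before starting Logstash; the file name below is only an assumption, use whatever path the config above was saved to:

# Check the pipeline syntax, then start Logstash with it (config path is assumed)
bin/logstash -f /etc/logstash/conf.d/beats-to-kafka.conf --config.test_and_exit
bin/logstash -f /etc/logstash/conf.d/beats-to-kafka.conf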

4. Access the application to generate login logs
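
If the application is not at hand, the pipeline can be exercised by appending a test line to one of the monitored files; a sketch that mimics the sample entry shown in section 5 (the payload itself is made up):

# Append a fake login entry so Filebeat picks it up and ships it through Logstash to Kafka
echo '2017-01-18 13:41:00 {"userId":"jl016","stbId":"stbidForTest","stbType":"OTTForTest"}' >> /data/ncyd/logs/epg/login.log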

5. Check whether Kafka has the corresponding topic and data

  1. View the topics
bash-4.3# ./kafka-topics.sh  --zookeeper 192.168.220.123:2181 --list
__consumer_offsets
login
test
test1
testbylixr
  2. View the data in the topic
./kafka-console-consumer.sh --zookeeper 192.168.220.123:2181 --from-beginning --topic login
2017-01-18 13:41:00 {"apkPlatform":"ZTE","apkVersionCode":"60","apkVersionName":"2.0.0","ipAddr":"ip","macAddr":"ABCDEFG","softwareVersion":"1.1.1","stbId":"stbidForTest","stbType":"OTTForTest","userId":"jl016"}
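
Partition and replica details of the newly created topic can be checked with --describe as well:

# Show partition count, replication factor and leader for the login topic
./kafka-topics.sh --zookeeper 192.168.220.123:2181 --describe --topic login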

6. Miscellaneous

  1. A rather strange error came up during this process:
    java.io.IOException: Unreasonable length = 1701969920
    The root cause was never pinned down; after restarting Kafka and ZooKeeper everything went back to normal. (This message is thrown by ZooKeeper when a request's declared length exceeds jute.maxbuffer, which usually points to a corrupt or unexpected packet arriving on port 2181.)

  2. Writing data to the topic by hand:
    bin/kafka-console-producer.sh --broker-list 192.168.220.123:9092 --topic login
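
    The console producer reads from stdin, so a test message can also be piped in and should then appear in the console consumer from section 5:

    # Publish a single test message to the login topic via stdin
    echo 'hello from the console producer' | bin/kafka-console-producer.sh --broker-list 192.168.220.123:9092 --topic login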

    Original author: 灼灼2015
    Original article: https://www.jianshu.com/p/5803df0ddff3