1. 前提
已有logstash
已有filebeat
已有kafka
查询当前kafka中topics
./kafka-topics.sh –zookeeper 192.168.220.123:2181 –list
__consumer_offsets
test
test1
testbylixr
ELK的配置,请参考:http://www.jianshu.com/p/460a307adebb
2. Filebeat 配置
- input_type: log
document_type: login
paths:
- /data/ncyd/logs/epg/login.log
- /data/ncyd/epg2logs/epg/login.log
output.elasticsearch:
hosts: ["ip:9200"]
3. Logstash 配置
input {
beats {
port => "5044"
}
}
output {
if [type] == "login" {
kafka {
topic_id => "login"
bootstrap_servers => "ip:9092"
}
}
if [type] == "tomcataccess" {
elasticsearch {
hosts => ["ip:9200"]
index => "logstash-tomcat-accesslog-%{+YYYY.MM.dd}"
#codec => "json"
}
}
}
4. 访问应用产生login日志
5. 查看kafka中是否有相应topic和数据
- 查看topic
bash-4.3# ./kafka-topics.sh --zookeeper 192.168.220.123:2181 --list
__consumer_offsets
login
test
test1
testbylixr
- 查看topic中数据
./kafka-console-consumer.sh --zookeeper 192.168.220.123:2181 --from-beginning --topic login
2017-01-18 13:41:00 {"apkPlatform":"ZTE","apkVersionCode":"60","apkVersionName":"2.0.0","ipAddr":"ip","macAddr":"ABCDEFG","softwareVersion":"1.1.1","stbId":"stbidForTest","stbType":"OTTForTest","userId":"jl016"}
6. 其他
在这个过程中遇到一个很奇怪的错误
java.io.IOException: Unreasonable length = 1701969920
不清楚报错原因,重启了kafka和zookeeper之后 就变正常啦。写数据
bin/kafka-console-producer.sh –broker-list 192.168.220.123:9092 –topic login