目前在做的监控项目中，有一个对 Elasticsearch（ES）做聚合查询的需求，需要用 Go 语言实现。
需求是：查询某个 IP 在一段时间范围内，各个监控指标在每个时间单位（例如每天）内的平均值。
说起来有点拗口，下面是对应的 ES 查询语句，可以看到要聚合的是 cpu 和 mem 两个字段。
另外，时区（time_zone）必须加上，否则 ES 默认按 UTC 划分时间桶，结果会与北京时间相差 8 小时。
GET /monitor/v1/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"ip": {
"value": "192.168.1.100"
}
}
},
{
"range": {
"datatime": {
"gte": 1545634340000,
"lte": "now"
}
}
}
]
}
},
"size": 0,
"aggs": {
"AVG_Metric": {
"date_histogram": {
"field": "datatime",
"interval": "day",
"time_zone": "Asia/Shanghai" ,
"format": "yyyy-MM-dd HH:mm:ss"
},
"aggs": {
"avg_mem": {
"avg": {
"field": "mem"
}
},"avg_cpu":{
"avg": {
"field": "cpu"
}
}
}
}
}
}
单条数据的内容大概是这样：
其中 datatime 是毫秒级时间戳，它在索引 mapping 中一定要是 date 类型，否则无法做 date_histogram 聚合。
{
"cpu":5,
"mem":77088,
"datatime":1545702661000,
"ip":"192.168.1.100"
}
查询结果如下所示。
可以看到，buckets 中返回了 3 个桶（每天一个），其中各字段的含义如下：
key_as_string 与 key：按 format 格式化后的桶起始时间，以及对应的毫秒时间戳；
doc_count：该桶内参与聚合的文档数；
avg_mem 与 avg_cpu：查询语句中为子聚合定义的名称，其 value 即该桶内的平均值。
{
"took": 4,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 2477,
"max_score": 0,
"hits": []
},
"aggregations": {
"AVG_Metric": {
"buckets": [
{
"key_as_string": "2018-12-24 00:00:00",
"key": 1545580800000,
"doc_count": 402,
"avg_mem": {
"value": 71208.1592039801
},
"avg_cpu": {
"value": 4.338308457711443
}
},
{
"key_as_string": "2018-12-25 00:00:00",
"key": 1545667200000,
"doc_count": 1258,
"avg_mem": {
"value": 77958.16852146263
},
"avg_cpu": {
"value": 4.639904610492846
}
},
{
"key_as_string": "2018-12-26 00:00:00",
"key": 1545753600000,
"doc_count": 817,
"avg_mem": {
"value": 86570.06609547124
},
"avg_cpu": {
"value": 4.975520195838433
}
}
]
}
}
}
下面使用 Go 语言的 ES 客户端“github.com/olivere/elastic”（我用的是 v6 版本）来实现相同的查询：
在解析聚合结果时遇到了些问题，网上没有搜到答案，官方也没找到相应的例子，不过最终试出了两种可行的方式。比较推荐第一种，可以少定义两个结构体。上代码：
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/olivere/elastic"
)
// Response types mirroring the "aggregations" section of the search
// response. The field names keep their original spelling because main
// refers to them; the json tags map them onto the keys Elasticsearch returns.
type (
	// Aggregations is the top-level "aggregations" object. Only the
	// "AVG_Metric" aggregation is decoded; any other key is ignored.
	Aggregations struct {
		AVG_Metric AVG_Metric `json:"AVG_Metric"`
	}

	// AVG_Metric holds the buckets produced by the date_histogram.
	AVG_Metric struct {
		Buckets []Metric `json:"buckets"`
	}

	// Metric is one histogram bucket. Key is the bucket start as a
	// millisecond timestamp, Doc_count the number of documents in the
	// bucket, and Avg_mem/Avg_cpu the results of the two avg
	// sub-aggregations. key_as_string is not decoded. The field order
	// determines how fmt.Println renders a bucket.
	Metric struct {
		Key       int64 `json:"key"`
		Doc_count int64 `json:"doc_count"`
		Avg_mem   Value `json:"avg_mem"`
		Avg_cpu   Value `json:"avg_cpu"`
	}

	// Value wraps a single-value metric result such as {"value": 4.33}.
	// A JSON null leaves Value at 0. NOTE(review): ES may report null for
	// an empty bucket — confirm whether reading that as 0 is acceptable.
	Value struct {
		Value float64 `json:"value"`
	}
)
// client is the Elasticsearch client used by main. It is created during
// package initialization; if creation fails the program exits with the
// error instead of continuing with a nil client.
var client = mustNewClient("http://127.0.0.1:9200")

// mustNewClient builds a client for the node at addr with
// elastic.NewSimpleClient and terminates the program via log.Fatalf when
// the client cannot be created.
func mustNewClient(addr string) *elastic.Client {
	c, err := elastic.NewSimpleClient(elastic.SetURL(addr))
	if err != nil {
		log.Fatalf("creating elasticsearch client for %s: %v", addr, err)
	}
	return c
}
// main runs the per-day average aggregation for one IP and prints the
// resulting buckets twice, once per decoding approach. Any failure (search
// error, missing aggregation, malformed JSON) terminates the program.
func main() {
	ctx := context.Background()

	// Restrict the search to one IP and a time range. Filter context is
	// enough here because no hits are scored (Size(0) returns no hits).
	boolSearch := elastic.NewBoolQuery().
		Filter(elastic.NewTermQuery("ip", "192.168.1.100")).
		Filter(elastic.NewRangeQuery("datatime").Gte(1545634340000).Lte("now"))

	// Metrics to average inside every time bucket.
	mem := elastic.NewAvgAggregation().Field("mem")
	cpu := elastic.NewAvgAggregation().Field("cpu")

	// One bucket per day on "datatime". Without the time zone the day
	// boundaries are computed in UTC and end up 8 hours off local time.
	aggs := elastic.NewDateHistogramAggregation().
		Field("datatime").
		Interval("day").
		TimeZone("Asia/Shanghai").
		Format("yyyy-MM-dd HH:mm:ss").
		SubAggregation("avg_mem", mem).
		SubAggregation("avg_cpu", cpu)

	result, err := client.Search().
		Index("monitor").
		Type("v1").
		Query(boolSearch).
		Size(0).
		Aggregation("AVG_Metric", aggs).
		Do(ctx)
	if err != nil {
		log.Fatalf("searching monitor/v1: %v", err)
	}

	// Approach 1: fetch the date_histogram by name and decode its raw
	// "buckets" array straight into []Metric, so no wrapper structs are
	// needed. (The original used Terms(), the accessor for a different
	// aggregation type.)
	hist, found := result.Aggregations.DateHistogram("AVG_Metric")
	if !found {
		log.Fatal(`aggregation "AVG_Metric" missing from response`)
	}
	rawBuckets, ok := hist.Aggregations["buckets"]
	if !ok || rawBuckets == nil {
		log.Fatal(`aggregation "AVG_Metric" has no "buckets" field`)
	}
	var metrics []Metric
	if err := json.Unmarshal(*rawBuckets, &metrics); err != nil {
		log.Fatalf("decoding AVG_Metric buckets: %v", err)
	}
	for _, v := range metrics {
		fmt.Println(v)
	}

	// Approach 2: re-encode the whole aggregations section and decode it
	// into the Aggregations -> AVG_Metric -> []Metric struct hierarchy.
	b, err := json.Marshal(result.Aggregations)
	if err != nil {
		log.Fatalf("encoding aggregations: %v", err)
	}
	var a Aggregations
	if err := json.Unmarshal(b, &a); err != nil {
		log.Fatalf("decoding aggregations: %v", err)
	}
	for _, v := range a.AVG_Metric.Buckets {
		fmt.Println(v)
	}
}
{1545580800000 402 {71208.1592039801} {4.338308457711443}}
{1545667200000 1258 {77958.16852146263} {4.639904610492846}}
{1545753600000 965 {87164.17409326424} {4.972020725388601}}
{1545580800000 402 {71208.1592039801} {4.338308457711443}}
{1545667200000 1258 {77958.16852146263} {4.639904610492846}}
{1545753600000 965 {87164.17409326424} {4.972020725388601}}