使用 https://github.com/taowen/es-monitor 可以用 SQL 进行 elasticsearch 的查询。Elasticsearch 2.0引入的一个重大特性是支持了PipelineAggregation。在有这个特性之前,elasticsearch聚合之后可以做的计算仅仅是对TermsAggregation的结果做一个排寻,并取个TOP N。初此之外什么计算都做不了。而SQL里一个重要的特性是HAVING字句,用其过滤我们不关心的桶,以减少结果的数量。今天我们就来看看如何用Pipeline Aggregation实现HAVING。
HAVING ipo_count > 300
SQL
$ cat << EOF | ./es_query.py http://127.0.0.1:9200
SELECT ipo_year, COUNT(*) AS ipo_count FROM symbol
GROUP BY ipo_year HAVING ipo_count > 200
EOF
{"ipo_count": 390, "ipo_year": 2014}
{"ipo_count": 334, "ipo_year": 2015}
{"ipo_count": 253, "ipo_year": 2013}
Elasticsearch
{
"aggs": {
"ipo_year": {
"terms": {
"field": "ipo_year",
"size": 0
},
"aggs": {
"having": {
"bucket_selector": {
"buckets_path": {
"ipo_count": "_count"
},
"script": {
"lang": "expression",
"inline": " ipo_count > 200"
}
}
}
}
}
},
"size": 0
}
这里bucket_selector使用的语法和前面GROUP BY ipo_year % 5
的语法是类似的。不同之处在于,之前的script是从document里取值。而这里的script是从当前bucket里取值。
{
"hits": {
"hits": [],
"total": 6714,
"max_score": 0.0
},
"_shards": {
"successful": 1,
"failed": 0,
"total": 1
},
"took": 3,
"aggregations": {
"ipo_year": {
"buckets": [
{
"key": 2014,
"doc_count": 390
},
{
"key": 2015,
"doc_count": 334
},
{
"key": 2013,
"doc_count": 253
}
],
"sum_other_doc_count": 0,
"doc_count_error_upper_bound": 0
}
},
"timed_out": false
}
Profile
[
{
"query": [
{
"query_type": "MatchAllDocsQuery",
"lucene": "*:*",
"time": "0.9405890000ms",
"breakdown": {
"score": 0,
"create_weight": 17443,
"next_doc": 753268,
"match": 0,
"build_scorer": 169878,
"advance": 0
}
}
],
"rewrite_time": 7177,
"collector": [
{
"name": "MultiCollector",
"reason": "search_multi",
"time": "6.110736000ms",
"children": [
{
"name": "TotalHitCountCollector",
"reason": "search_count",
"time": "0.8071620000ms"
},
{
"name": "LongTermsAggregator: [ipo_year]",
"reason": "aggregation",
"time": "2.416942000ms"
}
]
}
]
}
]
Having看来是Lucene计算完之后在Elasticsearch的内存里自己过滤的,所以没有体现在Profile的结果里。
HAVING ipo_count > 100 AND max_last_sale <= 10000
SQL
$ cat << EOF | ./es_query.py http://127.0.0.1:9200
SELECT ipo_year, COUNT(*) AS ipo_count, MAX(last_sale) AS max_last_sale FROM symbol
GROUP BY ipo_year HAVING ipo_count > 100 AND max_last_sale <= 10000
EOF
{"max_last_sale": 6178.0, "ipo_count": 390, "ipo_year": 2014}
Elasticsearch
{
"aggs": {
"ipo_year": {
"terms": {
"field": "ipo_year",
"size": 0
},
"aggs": {
"max_last_sale": {
"max": {
"field": "last_sale"
}
},
"having": {
"bucket_selector": {
"buckets_path": {
"max_last_sale": "max_last_sale",
"ipo_count": "_count"
},
"script": {
"lang": "expression",
"inline": " ipo_count > 100 && max_last_sale <= 10000"
}
}
}
}
}
},
"size": 0
}
{
"hits": {
"hits": [],
"total": 6714,
"max_score": 0.0
},
"_shards": {
"successful": 1,
"failed": 0,
"total": 1
},
"took": 3,
"aggregations": {
"ipo_year": {
"buckets": [
{
"max_last_sale": {
"value": 6178.0
},
"key": 2014,
"doc_count": 390
}
],
"sum_other_doc_count": 0,
"doc_count_error_upper_bound": 0
}
},
"timed_out": false
}
Profile
[
{
"query": [
{
"query_type": "MatchAllDocsQuery",
"lucene": "*:*",
"time": "0.2386400000ms",
"breakdown": {
"score": 0,
"create_weight": 7620,
"next_doc": 204982,
"match": 0,
"build_scorer": 26038,
"advance": 0
}
}
],
"rewrite_time": 2379,
"collector": [
{
"name": "MultiCollector",
"reason": "search_multi",
"time": "1.955767000ms",
"children": [
{
"name": "TotalHitCountCollector",
"reason": "search_count",
"time": "0.2170820000ms"
},
{
"name": "LongTermsAggregator: [ipo_year]",
"reason": "aggregation",
"time": "0.9893530000ms"
}
]
}
]
}
]