亿级数据多条件组合查询——秒级响应解决方案

1 概述

组合查询为多条件组合查询,在很多场景下都有使用。购物网站中通过勾选类别、价格、销售量范围等属性来对所有的商品进行筛选,筛选出满足客户需要的商品,这是一种典型的组合查询。在小数据量的情况下,后台通过简单的sql语句便能够快速过滤出需要的数据,但随着数据量的增加,继续使用sql语句,查询效率会直线下降。当数据量达到一定的量级,服务器将会不堪重负甚至面临挂掉的危险,并且大数据量的存储也成为了一个问题。本文将讨论在亿级数据的情况下,多条件组合查询秒级响应的解决方案。

2 方案思考

2.1 数据存储

假定每条数据有10个字段,每个字段的大小为4Byte,共有1亿条数据。通过传统的关系型数据库mysql,使用JDBC批处理和事务混合的方式对数据进行插入,插入一亿数据大约需要半小时,字段可能会出现为空的情况,导致冗余。针对海量数据的存储,现如今使用较多的是HBase。使用HBase的好处有三:其一,它是非关系型数据库,字段为空的值只在逻辑上存在,在空间上不存在,因此解决了冗余的问题;其二,它是面向列的数据库,能够通过简单的API调用对字段进行横向扩展;其三,它是分布式数据库,表的RowKey 按照字典排序,Region按照RowKey设置split point进行shard,通过这种方式实现全局、分布式索引,通过RowKey索引数据能够在毫秒级返回。Hbase插入数据可以调用批量插入或者通过MR程序插入,实测在批量提交数据条数设置为1000,开10个线程的情况下,插入一亿数据大约需要10分钟。若需要加速插入速度,可以通过增加批量提交数、调整线程数或者使用MR程序进行Hbase的写入。Hbase本身是分布式数据库,数据存储可以存储在多个节点上,使用Zookeeper统一管理,提供数据备份和故障恢复的功能。因此使用Hbase作为数据仓库,对结构化数据进行存储。

2.2 数据查询

Hbase中的数据查询只有两种方式:一是使用get 'tablename', 'rowkey‘’直接通过rowkey进行查询,亿级数据的查询结果可以在毫秒内返回;二是设置过滤器对全表进行Scan扫描,该查询方式在海量数据的情况下耗时十分长,当然也和服务器的性能有关。我们的需求是秒级响应,如果使用全表扫描方式,数据量达到万级或者十万级就无法实现实时响应了,要进行这样的查询,往往是要通过类似Hive、Pig等系统进行全表的MapReduce计算,这种方式既浪费了机器的计算资源,又因高延迟使得应用黯然失色。因此我们考虑使用rowKey对数据进行查询,如果我们使用rowKey对全表进行多条件组合查询,这将对rowKey的设置要求十分高,面向业务而言这对程序员十分不友好,因此我们需要通过建立二级索引的方式,按索引的种类扫描各自独立的单索引表,最后将扫描结果merge,得到目标rowKey。HBase有原生的建立二级索引的方式,即使用HBase的coprocessor协处理器,可以根据业务进行灵活的设置,但较为复杂,本文讨论使用一种业务模式较为固定,但更加简单直接的方式创建索引——Solr。Solr是一个独立的企业级搜索应用服务器,是Apache Lucene项目的开源企业搜索平台。其主要功能包括全文检索、命中标示、分面搜索、动态聚类、数据库集成,以及富文本(如Word、PDF)的处理。Solr是高度可扩展的,并提供了分布式搜索和索引复制。我们可以直接使用Solr这一组件,通过修改配置文件以实现相关的业务需求。通过批量建立索引的方式对HBase中的一亿条数据的10个字段构建索引,耗时为3383s,约为1小时。具体代码如下:

public class ThreadsCreateIndexWork { 
    private static Logger logger = LoggerFactory.getLogger(ThreadsCreateIndexWork.class);
    public static void main(String[] args) throws IOException, SolrServerException { 
        if(args.length < 3) { 
            logger.info("[tableName | queueSize | threadCount]");
            logger.info("e.g.| test1 20000 20");
        }
        String tableName = args[0];
        String queueSize = args[1];
        String threadCount = args[2];

        long start = System.currentTimeMillis();

        final Configuration conf;
        Properties prop = PropertiesReaderUtils.getProperties("conf/path.properties");
        String server = prop.getProperty("solr.server");
        SolrServer solrServer = new ConcurrentUpdateSolrServer(server, Integer.parseInt(queueSize), Integer.parseInt(threadCount));

        conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, tableName); // 这里指定HBase表名称
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("people")); // 这里指定HBase表的列族
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        ResultScanner ss = table.getScanner(scan);

        try { 
            for (Result r : ss) { 
                SolrInputDocument solrDoc = new SolrInputDocument();
                solrDoc.addField("rowkey", new String(r.getRow()));
                for (KeyValue kv : r.raw()) { 
                    String fieldName = new String(kv.getQualifier());
                    String fieldValue = new String(kv.getValue());
                    if (fieldName.equalsIgnoreCase("upperClothing")
                            || fieldName.equalsIgnoreCase("lowerClothing")
                            || fieldName.equalsIgnoreCase("coatStyle")
                            || fieldName.equalsIgnoreCase("trousersStyle")
                            || fieldName.equalsIgnoreCase("sex")
                            || fieldName.equalsIgnoreCase("age")
                            || fieldName.equalsIgnoreCase("angle")
                            || fieldName.equalsIgnoreCase("bag")
                            || fieldName.equalsIgnoreCase("umbrella")
                            || fieldName.equalsIgnoreCase("featureType")){ 
                        solrDoc.addField(fieldName, fieldValue);
                    }
                }
                solrServer.add(solrDoc);
            }
            ss.close();
            table.close();
        } catch (IOException e) { 
        } finally { 
            ss.close();
            table.close();
        }

        long time = System.currentTimeMillis() - start;
        logger.info("---------- create index with thread use time " + String.valueOf(time));
    }
}

ConcurrentUpdateSolrServer类可以使用多线程向SolrCloud的多个节点发送http请求,queueSize为队列大小,即往Solr中一次性批量add的数据数目,threadCount为开启的线程数,可以根据服务器性能的不同进行自定义,以提高构建索引的速度。笔者对1000w条数据进行参数调整测试,得到如下结果:

queueSizethreadNumTime (s)
1000020274
2000020250
2000030255
2000040254
3000020255
因此测试中选用的参数为queueSize:20000threadNum:20,索引构建速度为4w/s,还尝试通过修改scan.setCaching(500);的大小来提高构建速度,但是发现该缓存大小对构建速度的影响可以忽略不计,应该是索引构建速度低于HBase的Scan速度,因此暂时没有必要对HBase的Scan操作进行加速。Solr对构建索引的服务进行了上层封装,提供一个web服务的接口,可以直接通过可视化界面对结果进行查询。

3 解决方案

《亿级数据多条件组合查询——秒级响应解决方案》
综上,针对亿级数据多条件组合查询,给出的解决方案是使用HBase+Solr的方式,CDH将HBase和Solr都以组件的方式提供出来,可以使用CDH平台对HBase和Solr进行统一的管理。Hbase用于存储海量数据,Solr使用SolrCloud模式进行部署,提供索引构建和查询。索引的创建可以通过接口离线批量创建,也可以使用HBase Indexer连接HBase和Solr,提供自动化索引构建,CDH平台也集成了Hbase Indexer(Lily HBase Indexer)这一组件,具体的整合方法见 Solr+Hbase+Hbase Indexer查询方案流程整合

4 方案测试

笔者将一亿条包含10个字段的数据开启10个线程插入Hbase中,然后使用Solr对10个字段构建了索引,在Solr的可视化界面进行查询,查询结果如下图所示。
《亿级数据多条件组合查询——秒级响应解决方案》
《亿级数据多条件组合查询——秒级响应解决方案》
《亿级数据多条件组合查询——秒级响应解决方案》
其中,QTime为响应时间,q为查询语句,wt为请求格式(设置请求格式为xml响应速度更快),numFound为找到符合查询条件的数据条数,docs为返回的数据也就是rowKey。可以看到,组合查询都能够在秒级响应返回响应rowKey,而通过rowKey在HBase中返回该条数据的所有字段可以在毫秒级响应,如下图所示:
《亿级数据多条件组合查询——秒级响应解决方案》
至此,可以证明,亿级数据多条件组合查询使用HBase+Solr的解决方案能够满足秒级响应的需求。具体流程操作可参考Solr+Hbase+Hbase Indexer查询方案流程整合

    原文作者:廿半
    原文地址: https://blog.csdn.net/qq_34842671/article/details/83036242
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞