2. Exporting data from MongoDB to Elasticsearch and querying it from Spring Boot 2.x

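This series starts with how to export data from MongoDB into Elasticsearch and covers the following:

  • 0. Logstash
    * How to read data from MongoDB
    * How to specify an index template (Elasticsearch index templates)
  • 1. Elasticsearch
    * Specifying the mapping for the generated index
    * Specifying an analyzer, including a Chinese analyzer
  • 2. Spring Boot (this part)
    * Auto-configuration for Elasticsearch
    * The stored_fields problem that nearly made me cry, and how to rewrite ElasticsearchTemplate
    * A brief look at the source code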
    so …

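The problem lies in how the ElasticsearchTemplate provided by Spring Data Elasticsearch builds the SearchRequest: specifically, how the query's fields relate to the stored_fields request parameter.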

Good heavens.

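Auto-configuration

If you are not familiar with Spring Boot auto-configuration, it is worth reading up on it first.

[Image: auto-configuration.png]

Here we use the Spring Data integration for Elasticsearch.

Open it up and look at which classes are auto-configured; with a little guessing you can usually get the general idea.

[Image: image.png]

The rest is left for you to explore.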

A brief look at the source code

We start the analysis from the point where a query is executed.

[Image: image.png]

A note up front: because I rewrote ElasticsearchTemplate, the withFields(...) method in my setup ends up producing stored_fields in the request.
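
To make the distinction concrete, the sketch below prints two request-body shapes that Elasticsearch accepts for limiting returned fields: `_source` filtering and `stored_fields`. It is plain Java with no Elasticsearch dependency. The class name `FieldsRequestShapes`, its helper methods, and the sample field names `title` and `author` are made up for illustration, and the JSON is reduced to the relevant key only. Which shape the stock template emits is not shown in this post; in Spring Data Elasticsearch 3.x it appears to be `_source` filtering, but treat that as an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only: builds simplified request bodies as strings.
class FieldsRequestShapes {

    // Render a list of field names as a JSON array of strings.
    static String jsonArray(List<String> fields) {
        return fields.stream()
                .map(f -> "\"" + f + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    // Source filtering: the listed fields are extracted from the _source document.
    static String sourceFilterBody(List<String> fields) {
        return "{\"_source\":{\"includes\":" + jsonArray(fields) + "}}";
    }

    // Stored fields: only fields mapped with "store": true can be returned this way.
    static String storedFieldsBody(List<String> fields) {
        return "{\"stored_fields\":" + jsonArray(fields) + "}";
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("title", "author");
        System.out.println(sourceFilterBody(fields));
        System.out.println(storedFieldsBody(fields));
    }
}
```

The two bodies look similar but are answered from different places in the index, which is why it matters which one the template generates.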

Step 1: parse the values passed in.

[Image: image.png]

Parsing the SearchQuery:

[Image: image.png]

Set a breakpoint and step through it yourself.

Step 2: build the SearchRequest.

[Image: image.png]

Step 3: execute the request and map the response into results.

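Rewriting ElasticsearchTemplate

Given the problem found above, there is no way to add the stored_fields parameter to the request with the stock ElasticsearchTemplate, so we subclass it and rewrite the request-building code.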
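
Why does the subclass copy so much code instead of overriding a single method? In the copy, the request-building methods are private, and presumably they are private in the parent class too. A private method cannot be overridden in Java, so the parent's public entry points keep calling their own version. The sketch below demonstrates that language rule with stand-in classes; `BaseTemplate`, `NaiveTemplate`, and `PatchedTemplate` are hypothetical names and are not part of Spring Data Elasticsearch.

```java
// Stand-in for a library class whose request builder is private.
class BaseTemplate {
    // Public entry point; it calls the private builder below.
    public String query(String field) {
        return buildRequest(field);
    }

    // Private, so subclasses can neither call nor override it.
    private String buildRequest(String field) {
        return "_source=" + field;
    }
}

class NaiveTemplate extends BaseTemplate {
    // Same name and parameters, but this is a new, unrelated method.
    // BaseTemplate.query(...) never calls it.
    String buildRequest(String field) {
        return "stored_fields=" + field;
    }
}

class PatchedTemplate extends BaseTemplate {
    // Override the public entry point and route it to our own builder.
    @Override
    public String query(String field) {
        return buildPatchedRequest(field);
    }

    private String buildPatchedRequest(String field) {
        return "stored_fields=" + field;
    }
}

class PrivateOverrideDemo {
    public static void main(String[] args) {
        // The parent's private builder still runs.
        System.out.println(new NaiveTemplate().query("title"));
        // Our own builder runs.
        System.out.println(new PatchedTemplate().query("title"));
    }
}
```

The subclass from the post follows the `PatchedTemplate` approach: it overrides the public query methods and carries its own copies of the private helpers they need.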

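// Imports assume spring-data-elasticsearch 3.x with the Elasticsearch transport client.
import static org.springframework.util.CollectionUtils.isEmpty;

import java.util.List;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.ScoreSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.DefaultResultMapper;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.ResultsExtractor;
import org.springframework.data.elasticsearch.core.SearchResultMapper;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.facet.FacetRequest;
import org.springframework.data.elasticsearch.core.query.IndexBoost;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.ScriptField;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.data.elasticsearch.core.query.SourceFilter;
import org.springframework.util.Assert;

public class MyElasticSearchTemplate extends ElasticsearchTemplate {

    // Own reference to the client, used by the copied prepareSearch(...) below.
    private Client client;
    private static final String FIELD_SCORE = "_score";
    private static final Logger QUERY_LOGGER = LoggerFactory.getLogger("org.springframework.data.elasticsearch.core.QUERY");
    // Never assigned in this class, so searches wait for the response without a timeout.
    private String searchTimeout;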


    public MyElasticSearchTemplate(Client client){
        super(client);
        this.client = client;
    }

    public MyElasticSearchTemplate(Client client, ElasticsearchConverter elasticsearchConverter) {
        super(client, elasticsearchConverter, new DefaultResultMapper(elasticsearchConverter.getMappingContext()));
        this.client = client;
    }

    @Override
    public <T> AggregatedPage<T> queryForPage(SearchQuery query, Class<T> clazz, SearchResultMapper mapper) {
        SearchResponse response = dosearch(prepareSearch(query, clazz), query);
        return mapper.mapResults(response, clazz, query.getPageable());
    }

    @Override
    public <T> T query(SearchQuery query, ResultsExtractor<T> resultsExtractor) {
        SearchResponse response = dosearch(prepareSearch(query), query);
        return resultsExtractor.extract(response);
    }

    private SearchResponse dosearch(SearchRequestBuilder searchRequest, SearchQuery searchQuery) {
        if (searchQuery.getFilter() != null) {
            searchRequest.setPostFilter(searchQuery.getFilter());
        }

        if (!isEmpty(searchQuery.getElasticsearchSorts())) {
            for (SortBuilder sort : searchQuery.getElasticsearchSorts()) {
                searchRequest.addSort(sort);
            }
        }

        if (!searchQuery.getScriptFields().isEmpty()) {
            // _source should be return all the time
            // searchRequest.addStoredField("_source");
            for (ScriptField scriptedField : searchQuery.getScriptFields()) {
                searchRequest.addScriptField(scriptedField.fieldName(), scriptedField.script());
            }
        }

        if (searchQuery.getHighlightFields() != null || searchQuery.getHighlightBuilder() != null) {
            HighlightBuilder highlightBuilder = searchQuery.getHighlightBuilder();
            if (highlightBuilder == null) {
                highlightBuilder = new HighlightBuilder();
            }
            // Guard against a query that sets only a HighlightBuilder:
            // the highlight field array is null in that case.
            if (searchQuery.getHighlightFields() != null) {
                for (HighlightBuilder.Field highlightField : searchQuery.getHighlightFields()) {
                    highlightBuilder.field(highlightField);
                }
            }
            searchRequest.highlighter(highlightBuilder);
        }

        if (!isEmpty(searchQuery.getIndicesBoost())) {
            for (IndexBoost indexBoost : searchQuery.getIndicesBoost()) {
                searchRequest.addIndexBoost(indexBoost.getIndexName(), indexBoost.getBoost());
            }
        }

        if (!isEmpty(searchQuery.getAggregations())) {
            for (AbstractAggregationBuilder aggregationBuilder : searchQuery.getAggregations()) {
                searchRequest.addAggregation(aggregationBuilder);
            }
        }

        if (!isEmpty(searchQuery.getFacets())) {
            for (FacetRequest aggregatedFacet : searchQuery.getFacets()) {
                searchRequest.addAggregation(aggregatedFacet.getFacet());
            }
        }
        return getSearchResponse(searchRequest.setQuery(searchQuery.getQuery()));
    }

    private SearchResponse getSearchResponse(SearchRequestBuilder requestBuilder) {

        if (QUERY_LOGGER.isDebugEnabled()) {
            QUERY_LOGGER.debug(requestBuilder.toString());
        }

        return getSearchResponse(requestBuilder.execute());
    }

    private SearchResponse getSearchResponse(ActionFuture<SearchResponse> response) {
        return searchTimeout == null ? response.actionGet() : response.actionGet(searchTimeout);
    }



    private  <T> SearchRequestBuilder prepareSearch(Query query, Class<T> clazz) {
        if (query.getIndices().isEmpty()) {
            query.addIndices(retrieveIndexNameFromPersistentEntity(clazz));
        }
        if (query.getTypes().isEmpty()) {
            query.addTypes(retrieveTypeFromPersistentEntity(clazz));
        }
        return prepareSearch(query);
    }




    private String[] retrieveIndexNameFromPersistentEntity(Class clazz) {
        if (clazz != null) {
            return new String[] { getPersistentEntityFor(clazz).getIndexName() };
        }
        return null;
    }

    private String[] retrieveTypeFromPersistentEntity(Class clazz) {
        if (clazz != null) {
            return new String[] { getPersistentEntityFor(clazz).getIndexType() };
        }
        return null;
    }


    private SearchRequestBuilder prepareSearch(Query query) {
        Assert.notNull(query.getIndices(), "No index defined for Query");
        Assert.notNull(query.getTypes(), "No type defined for Query");

        int startRecord = 0;
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(toArray(query.getIndices()))
                .setSearchType(query.getSearchType())
                .setTypes(toArray(query.getTypes()))
                .setVersion(true)
                .setTrackScores(query.getTrackScores());

        if (query.getSourceFilter() != null) {
            SourceFilter sourceFilter = query.getSourceFilter();
            searchRequestBuilder.setFetchSource(sourceFilter.getIncludes(), sourceFilter.getExcludes());
        }

        if (query.getPageable().isPaged()) {
            startRecord = query.getPageable().getPageNumber() * query.getPageable().getPageSize();
            searchRequestBuilder.setSize(query.getPageable().getPageSize());
        }
        searchRequestBuilder.setFrom(startRecord);

        if (!query.getFields().isEmpty()) {
            // This is the one line that was changed:
            // the requested fields are sent as stored_fields.
            searchRequestBuilder.storedFields(toArray(query.getFields()));
        }

        if (query.getIndicesOptions() != null) {
            searchRequestBuilder.setIndicesOptions(query.getIndicesOptions());
        }

        if (query.getSort() != null) {
            for (Sort.Order order : query.getSort()) {
                SortOrder sortOrder = order.getDirection().isDescending() ? SortOrder.DESC : SortOrder.ASC;

                if (FIELD_SCORE.equals(order.getProperty())) {
                    ScoreSortBuilder sort = SortBuilders //
                            .scoreSort() //
                            .order(sortOrder);

                    searchRequestBuilder.addSort(sort);
                } else {
                    FieldSortBuilder sort = SortBuilders //
                            .fieldSort(order.getProperty()) //
                            .order(sortOrder);

                    if (order.getNullHandling() == Sort.NullHandling.NULLS_FIRST) {
                        sort.missing("_first");
                    } else if (order.getNullHandling() == Sort.NullHandling.NULLS_LAST) {
                        sort.missing("_last");
                    }

                    searchRequestBuilder.addSort(sort);
                }
            }
        }

        if (query.getMinScore() > 0) {
            searchRequestBuilder.setMinScore(query.getMinScore());
        }
        return searchRequestBuilder;
    }

    private static String[] toArray(List<String> values) {
        String[] valuesAsArray = new String[values.size()];
        return values.toArray(valuesAsArray);
    }

}
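
One detail inside prepareSearch(Query) that is easy to skim past is how a page request becomes the `from` and `size` of the search request: `from` is the page number times the page size, and an unpaged query starts at 0 without setting a size. The sketch below isolates that arithmetic in plain Java. `PageWindow` and its factory methods are hypothetical names used only for this illustration.

```java
// Hypothetical value holder mirroring the from/size logic in prepareSearch(Query).
class PageWindow {
    final int from;      // offset of the first hit to return
    final Integer size;  // page size, or null to leave size unset on the request

    private PageWindow(int from, Integer size) {
        this.from = from;
        this.size = size;
    }

    // Paged request: pageNumber is zero-based, as in Spring Data's Pageable.
    static PageWindow paged(int pageNumber, int pageSize) {
        return new PageWindow(pageNumber * pageSize, pageSize);
    }

    // Unpaged request: start at the first hit and do not set a size.
    static PageWindow unpaged() {
        return new PageWindow(0, null);
    }

    public static void main(String[] args) {
        PageWindow third = paged(2, 20);
        System.out.println("from=" + third.from + ", size=" + third.size);

        PageWindow all = unpaged();
        System.out.println("from=" + all.from + ", size=" + all.size);
    }
}
```

So the third page of 20 hits per page is requested with from=40 and size=20.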

Registering and using it

@Configuration
public class ElasticConfig {

    @Autowired
    private TransportClient transportClient;

    // Register the customized template as a bean
    @Bean
    public MyElasticSearchTemplate myElasticSearchTemplate(){
        return new MyElasticSearchTemplate(transportClient);
    }

    @Bean
    public ElasticsearchTemplate elasticsearchTemplate() {
        try {
            return new ElasticsearchTemplate(transportClient);
        } catch (Exception var4) {
            throw new IllegalStateException(var4);
        }
    }
}

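Summary:
This problem plagued me for a very long time. Neither Baidu nor Google turned up the cause, and if I had not read the source code I would never have known what was going on. So I recommend reading the source when you need to solve a problem like this. Much love.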

    Original author: 砂糖z
    Original post: https://www.jianshu.com/p/b88803fccc1f