重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要讲解了“Elasticsearch reindex及Java使用sliceScorll优化查询的方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Elasticsearch reindex及Java使用sliceScorll优化查询的方法”吧!
创新互联主要从事成都做网站、成都网站建设、网页设计、企业做网站、公司建网站等业务。立足成都服务兴县,10多年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18980820575
Reindex会将一个索引的数据复制到另一个已存在的索引,但是并不会复制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。
简单实例如下
POST _reindex { "source": { "remote": { "host": "http://otherhost:9200", // 远程es的ip和port列表 "socket_timeout": "1m", "connect_timeout": "10s" // 超时时间设置 }, "index": "my_index_name", // 源索引名称 "query": { // 满足条件的数据 "match": { "test": "data" } } }, "dest": { "index": "dest_index_name" // 目标索引名称 } }
具体详细的使用参考
ElasticSearch 6.3版本 Document APIs之Reindex API
elasticsearch 基础 —— ReIndex
在java中对于reindexapi没有找到,于是作者采用了别名转换和全Index查询加上bulk插入的方式对于索引进行迁移。
但是转移数据实在太慢,所以使用了slice对scorll查询进行优化
多线程reindex
具体开启线程数根据Index分片数进行调整,最好和主分片数相同,本例子为五个分片,同时还使用了别名转换对索引进行无缝衔接避免数据正常插入读取
//建新索引 createUserRecordIndex(newIndexName, typeName); //筛选时间 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); RangeQueryBuilder rangeQueryBuilder; rangeQueryBuilder = QueryBuilders.rangeQuery("createTime") .gte(DateUtil.format(DateUtil.parse(createBeginDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT)) .lte(DateUtil.format(DateUtil.parse(createEndDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT)); boolQueryBuilder.must(rangeQueryBuilder); try { //多线程处理查询请求 Listlist = new ArrayList<>(); for (int i = 0; i < 5; i++) { SliceBuilder sliceBuilder = new SliceBuilder(i, 5); SearchResponse response = EsBuildersServiceUtil.getESClient() .prepareSearch(userRecordAlias) .setTypes(userRecordType) .setQuery(boolQueryBuilder) .setSize(1000).setScroll(new TimeValue(10000)) .slice(sliceBuilder) .execute() .actionGet(); SliceQuery sliceQuery = new SliceQuery(newIndexName, typeName, response); Future submit = threadPoolTaskExecutor.submit(sliceQuery); list.add(submit); } for (Future future : list) { future.get(); } } catch (Exception e) { log.error("reindex error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_INDEX_CONVERT_ERROR); } try { //别名转换 EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().removeAlias(oldIndexName, userRecordAlias).execute().actionGet(); EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().addAlias(newIndexName, userRecordAlias).execute().actionGet(); } catch (Exception e) { log.error(" convertAlias error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_ALIASES_CONVERT_ERROR); }
slice线程
class SliceQuery implements Callable { private String newIndexName; private String typeName; private SearchResponse response; private SliceQuery(String newIndexName, String typeName, SearchResponse response) { this.newIndexName = newIndexName; this.typeName = typeName; this.response = response; } @Override public Void call() { //获取总数量 long totalCount = response.getHits().getTotalHits(); //计算总次数,每次搜索数量为分片数*设置的size大小 int page = (int) totalCount / 1000; operateRecordList(response, newIndexName, typeName); for (int i = 0; i < page; i++) { //再次发送请求,并使用上次搜索结果的ScrollId response = EsBuildersServiceUtil.getESClient().prepareSearchScroll(response.getScrollId()) .setScroll(new TimeValue(10000)).execute() .actionGet(); operateRecordList(response, newIndexName, typeName); } return null; } }
批量插入
/** * 从查询数据中获取并批量插入Index * * @param response * @param indexName * @param typeName */ private void operateRecordList(SearchResponse response, String indexName, String typeName) { try { SearchHits hits = response.getHits(); Listlist = new ArrayList<>(); for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); list.add(JSON.parseObject(sourceAsString, AddUserRecordRequest.class)); } //批量插入 saveBulkRecord(list, indexName, typeName); } catch (Exception e) { log.error("operateRecordList error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR); } } /** * 批量插入 * * @param list * @param indexName * @param typeName */ private void saveBulkRecord(List list, String indexName, String typeName) { try { BulkRequestBuilder bulkRequest = EsBuildersServiceUtil.getESClient().prepareBulk(); for (AddUserRecordRequest recordRequest : list) { JSONObject json = JSONObject.fromObject(recordRequest); bulkRequest.add(EsBuildersServiceUtil.getESClient() .prepareIndex(indexName, typeName) .setSource(json)); } if (list.size() > 0) { bulkRequest.execute().actionGet(); } } catch (Exception e) { log.error("saveBulkRecord error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR); } }
感谢各位的阅读,以上就是“Elasticsearch reindex及Java使用sliceScorll优化查询的方法”的内容了,经过本文的学习后,相信大家对Elasticsearch reindex及Java使用sliceScorll优化查询的方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!