Spark 操作 Elasticsearch 性能优化

网友投稿 925 2022-05-30

1 - 背景说明

elasticsearch 在对大批量数据进行统计、聚合等操作时,性能差,主要原因有:

ES 是通过 批量加载数据到内存中,然后进行计算的,其 scroll.size 的默认最大值为 10000,超过此值就会报错 —— 需要修改配置文件;

ES 使用 JVM 堆内存进行计算,但官方建议单个 ES 实例的堆内存要低于 32 GB(不能等于),否则将有资源的浪费、性能的损耗 —— 主要与 JVM 的指针压缩算法有关。

基于此,在大批量数据下的统计、聚合、排序等场景,可借助 Spark 提升运算性能。

2 - 开发方法

2.1 引入依赖

ES 官方提供了一款工具:elasticsearch-hadoop,通过 Maven 引入即可使用。

    org.elasticsearch     elasticsearch-spark-20_2.11     7.6.0

可参考官方文档:Elasticsearch for Apache Hadoop 7.6 » Apache Spark

2.2 代码开发

开发步骤:

创建 SparkContext;

从 ES 中加载数据,分组统计,然后排序;

将排序后的结果保存到 HDFS 中。

示例代码如下:

public static void main(String[] args) {     LOG.info("***** Start to run the Spark on ES test.");     try {         // Create a configuration class SparkConf,         // meanwhile set the Secure configuration that the Elasticsearch Cluster needed,         // finally create a SparkContext.         SparkConf conf = new SparkConf()             .setAppName("SparkOnEs")             .set("es.nodes", esServerHost)             // when you specified in es.nodes, then es.port is not necessary             // .set("es.port", "24100")             .set("es.nodes.discovery", "true")             .set("es.index.auto.create", "true")             .set("es.internal.spark.sql.pushdown", "true")             // 要返回的字段,多个时以“,”分隔,此配置的性能要高于 es.read.field.include             .set("es.read.source.filter", "name,age")             // 滚动查询时,批大小最大值默认为10000             .set("es.scroll.size", "10000")             // 每个分区最多处理100万条数据             .set("es.input.max.docs.per.partition", "1000000");         JavaSparkContext jsc = new JavaSparkContext(conf);         // Group data from ES         groupBySpark(jsc);          jsc.stop();     } catch (IOException e) {         LOG.error("***** There are exceptions in main.", e);     } } /**  * 1. query all data from ES by JavaEsSpark,  * 2. group by specified field and sort the result,  * 3. save the result to HDFS or local files  *  * @param jsc the Java Spark Context which has already initialization  */ public static void groupBySpark(JavaSparkContext jsc) {     long begin = System.currentTimeMillis();     JavaPairRDD pairRDD = JavaEsSpark.esRDD(jsc, esIndex);     // 根据 age 字段进行分组     final String field = "age";     JavaPairRDDresultRdd = pairRDD.mapPartitionsToPair(         new PairFlatMapFunction>, String, Long>() {             @Override             public Iterator call(                 Iterator> iterator) throws Exception {                 List list = new ArrayList<>(10000);                 iterator.forEachRemaining(                     row -> list.add(new Tuple2<>(row._2.get(field).toString(), 1L)));                 return list.iterator();             }         })         .reduceByKey((v1, v2) -> (v1 + v2))         .mapToPair(row -> new Tuple2<>(row._2, row._1))         // 对不同年龄的人数,倒序排序         .sortByKey(false)         .mapToPair(row -> new Tuple2<>(row._2, row._1));     long end = System.currentTimeMillis();     long spentTime = end - begin;     LOG.info("***** GroupBy data from ES successful, spent time: {} ms", spentTime);     resultRdd.saveAsTextFile("/user/spark-on-es/group-result/");     LOG.info("***** Save all result to HDFS successful."); }

3 - 运行任务

3.1 打包项目

在 FI 8.0.0 中下载 Spark 客户端,获取样例代码后解压,用 IDEA 打开:File -> Open,选中项目的 pom.xml 文件 -> OK,即可完成项目的导入。

(1)修改项目中的相关配置,与要测试集群中的信息一致;

(2) 通过 IDEA 自带的 Maven 工具,打包项目,生成 target\SparkOnES-1.0.jar;

(3)将打包生成的 jar 包上传到 Spark 客户端所在的服务器下,这里以 /opt/spark-on-es/ 为例;

(4)将 esParams.properties、user.keytab、krb5.conf 三个文件上传到 /opt/spark-on-es/ 下;

(5)将项目所需的 jar 包上传到 /opt/spark-on-es/libs/ 下。

说明:样例代码运行至少需要如下 jar 包,请从 Elasticsearch 的客户端、Maven 中心仓等处获取相关包。

3.2 client 模式提交 Spark 任务

运行命令如下:

cd /opt/spark-on-es/ # 下述命令为一条命令,其中 /opt/spark-on-es/libs/ 是外部依赖的jar包路径: spark-submit --class com.huawei.bigdata.spark.examples.SparkOnEs \ --master yarn --deploy-mode client \ --jars $(files=(/opt/spark-on-es/libs/*.jar); IFS=,; echo "${files[*]}") \ ./SparkOnEs-1.0.jar

3.3 cluster 模式提交 Spark 任务

运行命令如下:

cd /opt/spark-on-es/ # 下述命令为一条命令,其中 --files 参数指定配置文件: spark-submit --class com.huawei.bigdata.spark.examples.SparkOnEs \ --master yarn --deploy-mode cluster \ --jars $(files=(/opt/spark-on-es/libs/*.jar); IFS=,; echo "${files[*]}") \ --files ./user.keytab,./krb5.conf,./esParams.properties \ --driver-memory 6g \ --executor-cores 5 \ --num-executors 150 \ --executor-memory 5g \ ./SparkOnEs-1.0.jar

3.4 查看运行结果

(1) 查询 Elasticsearch 中的数据:

# 安全模式集群下:kinit认证后,查看Elasticsearch中的index: curl -XGET --tlsv1.2 --negotiate -k -u : 'https://10.10.10.11:24100/_cat/indices?v' # 普通模式集群下:使用http(而非https)查询即可: curl -XGET 'http://10.10.10.11:24100/_cat/indices?v' # 通过下述命令对people索引中的数据进行范围查询: curl -XPOST 'http://10.10.10.11:24100/people/_search?pretty' -H 'Content-Type:application/json' -d ' {     "query": {         "range": {             "createdTime": {"gte": "2010-01-01T00:00:00Z", "lt": "2015-12-31T23:59:59Z"}         }     } }'

注:Elasticsearch相关查询命令,请参考【业务操作指南】-【Elasticsearch】-【Linux下curl命令的使用】。

(2) 查询 HDFS 中的分组文件:

样例代码中将分组结果保存到 HDFS 中,安装客户端、kinit 认证后,可通过下述命令进行统计:

# 查看所有的文件,及其大小: [root@10.10.10.11 spark-on-es]# hdfs dfs -du -s -h /user/spark-on-es/result/* 0         0        /user/spark-on-es/result/_SUCCESS 709       2.1 K    /user/spark-on-es/result/part-00000 3.5 K     10.4 K   /user/spark-on-es/result/part-00001 2.1 K     6.4 K    /user/spark-on-es/result/part-00002 3.1 K     9.4 K    /user/spark-on-es/result/part-00003 ...... # 统计结果集的个数: [root@10.10.10.11 spark-on-es]# hdfs dfs -cat /user/spark-on-es/result/* | wc -l 2000000 # 查看某个文件中的内容: [root@10.10.10.11 spark-on-es]# hdfs dfs -cat /user/spark-on-es/result/part-00000 (573267,99) (1929095,98)

4 - 性能优化方法

此样例代码的性能瓶颈:Spark 读取 Elasticsearch 中全量数据的过程,耗时最久,优化思路有:

1)增加索引的分片个数:elasticsearch-spark 工具读取 Elasticsearch 中的数据时,任务的并行度默认是索引的分片个数,因此分片个数越多,并行度越高;

Elasticsearch 中索引的分片个数不宜太大,此时可通过 es.input.max.docs.per.partition 参数规划 Spark 读取 Elasticsearch 中数据的 Partition 个数,也可提升并行度。(详见示例代码)

2)增大 scroll.size 的值:elasticsearch-spark 工具通过 Scroll 滚动读取数据,其大小默认是50,可以提高至10000,建议不高于50000,否则容易产生 OOM;

3)合理使用 Yarn 资源:参考 《产品文档-业务操作指南-Yarn-性能调优-节点配置调优》 中的说明,合理设置 Yarn 的资源,以 cluster 模式运行任务时,应适当修改 spark-submit 的参数:

Spark 操作 Elasticsearch 性能优化

--driver-memory 6g     ## Driver的内存大小 --executor-cores 5     ## 每个Executor可用的 CPU 核数 --num-executors 150    ## Executor的个数,num-executors * executor-cores 不能超过 Yarn的VCores数 --executor-memory 5g   ## 每个Executor的内存大小,num-executors * executor-memory 不能超过Yarn的最大内存

5 - 参考资料

Elasticsearch for Apache Hadoop 7.6 » Configuration

6 - 其他代码参考

通过 Scroll 滚动查询大批量数据的逻辑: /** * Query data from ES by ES rest client */ private static void queryDataByRestHighLevelClient(JavaSparkContext jsc) { LOG.info("=====> Query data from ES by Rest High Level Client beginning..."); LOG.info("=====> Query string: {}", esQueryJsonString); long begin = System.currentTimeMillis(); List> resultMap = new ArrayList<>(1024 * 100); try { // query data by scroll api, avoid the OOM SearchRequest searchRequest = new SearchRequest(index); final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L)); searchRequest.scroll(scroll); // set the size of result, note: if the number of size was too large, may cause the OOM SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(2000); searchSourceBuilder.query(QueryBuilders.rangeQuery(esQueryField).gte(esQueryRangeBegin).lt(esQueryRangeEnd)); String[] includeFields = new String[] {"id", "name", "birthday"}; String[] excludeFields = new String[] {"age"}; searchSourceBuilder.fetchSource(includeFields, excludeFields); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT); String scrollId = searchResponse.getScrollId(); SearchHit[] searchHits = searchResponse.getHits().getHits(); while (searchHits != null && searchHits.length > 0) { for (SearchHit hit : searchHits) { Map source = hit.getSourceAsMap(); resultMap.add(source); } // continue scroll search SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); scrollRequest.scroll(scroll); searchResponse = highLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT); scrollId = searchResponse.getScrollId(); searchHits = searchResponse.getHits().getHits(); } } catch (Exception e) { LOG.error("***** Query data failed, exception occurred.", e); } JavaRDD> rdd = jsc.parallelize(resultMap); long end = System.currentTimeMillis(); long spentTime = end - begin; LOG.info("=====> Query data from ES by Rest High Level Client, rdd's count: {}, spent time: {} ms", rdd.count(), spentTime); } 通过 bulkPut 创建大量测试数据的逻辑: /** * Put data by a bulk request * * @param restClient the Client of Elasticsearch */ private static void putDataByBulk(RestClient restClient) { LOG.info("***** Bulk put data beginning..."); // total number of documents need to index long totalRecordNum = 100000; // number of document per bulk request long oneCommit = 500; long circleNumber = totalRecordNum / oneCommit; StringEntity entity; Gson gson = new Gson(); Map esMap = new HashMap<>(); String str = "{ \"index\" : { \"_index\" : \"" + index + "\", \"_type\" : \"" + type + "\"} }"; for (int i = 0; i < circleNumber; i++) { StringBuilder builder = new StringBuilder(); for (int j = 1; j <= oneCommit; j++) { esMap.clear(); esMap.put("id", (i * oneCommit + j) + ""); esMap.put("name", getName()); esMap.put("age", ThreadLocalRandom.current().nextInt(1, 30)); esMap.put("birthday", getBirthday()); /* esMap.clear(); id = i * oneCommit + j + ""; esMap.put("id", id); esMap.put("name", "name-" + id); esMap.put("age", ThreadLocalRandom.current().nextInt(1, 30)); esMap.put("birthday", new Date()); */ String strJson = gson.toJson(esMap); builder.append(str).append("\n"); builder.append(strJson).append("\n"); } entity = new StringEntity(builder.toString(), ContentType.APPLICATION_JSON); entity.setContentEncoding("UTF-8"); Response response; try { Request request = new Request("PUT", "/_bulk"); request.addParameter("pretty", "true"); request.setEntity(entity); response = restClientTest.performRequest(request); if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) { LOG.info("***** Already input documents: " + oneCommit * i); } else { LOG.error("***** Bulk failed."); } LOG.info("Bulk response entity is : " + EntityUtils.toString(response.getEntity())); } catch (Exception e) { LOG.error("Bulk failed, exception occurred.", e); } } } // 较真实的数据 private static String[] firstName = {"Jenny", "James", "Linda", "Judy", "Karen", "Kelly", "Margaret", "Rose", "Nora", "Wendy"}; private static String[] lastName = {"Abel", "Abraham", "Kent", "Brown", "White", "Cotton", "Hawk", "George", "Henry", "David"}; private static String getName() { int index = ThreadLocalRandom.current().nextInt(0, 10); return firstName[index] + " " + lastName[index]; } private static String getBirthday() { ThreadLocalRandom random = ThreadLocalRandom.current(); int year = random.nextInt(1990, 2021); int month = random.nextInt(1, 13); int day = random.nextInt(1, 29); int hour = random.nextInt(0, 24); int minute = random.nextInt(0, 60); int second = random.nextInt(0, 60); LocalDateTime time = LocalDateTime.of(year, Month.of(month), day, hour, minute, second); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); return formatter.format(time); } }

MapReduce服务 spark Elasticsearch

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:MindSpore21天实战营(5)使用PyCharm Kit进行基于Wide&Deep实现CTR预估实战
下一篇:Elasticsearch 6.3.2 启动过程
相关文章