【云知易】Elasticsearch服务 入门 01 快速开始使用ES
999
2022-05-30
1 - 背景说明
elasticsearch 在对大批量数据进行统计、聚合等操作时,性能差,主要原因有:
ES 是通过 批量加载数据到内存中,然后进行计算的,其 scroll.size 的默认最大值为 10000,超过此值就会报错 —— 需要修改配置文件;
ES 使用 JVM 堆内存进行计算,但官方建议单个 ES 实例的堆内存要低于 32 GB(不能等于),否则将有资源的浪费、性能的损耗 —— 主要与 JVM 的指针压缩算法有关。
基于此,在大批量数据下的统计、聚合、排序等场景,可借助 Spark 提升运算性能。
2 - 开发方法
2.1 引入依赖
ES 官方提供了一款工具:elasticsearch-hadoop,通过 Maven 引入即可使用。
可参考官方文档:Elasticsearch for Apache Hadoop 7.6 » Apache Spark
2.2 代码开发
开发步骤:
创建 SparkContext;
从 ES 中加载数据,分组统计,然后排序;
将排序后的结果保存到 HDFS 中。
示例代码如下:
public static void main(String[] args) { LOG.info("***** Start to run the Spark on ES test."); try { // Create a configuration class SparkConf, // meanwhile set the Secure configuration that the Elasticsearch Cluster needed, // finally create a SparkContext. SparkConf conf = new SparkConf() .setAppName("SparkOnEs") .set("es.nodes", esServerHost) // when you specified in es.nodes, then es.port is not necessary // .set("es.port", "24100") .set("es.nodes.discovery", "true") .set("es.index.auto.create", "true") .set("es.internal.spark.sql.pushdown", "true") // 要返回的字段,多个时以“,”分隔,此配置的性能要高于 es.read.field.include .set("es.read.source.filter", "name,age") // 滚动查询时,批大小最大值默认为10000 .set("es.scroll.size", "10000") // 每个分区最多处理100万条数据 .set("es.input.max.docs.per.partition", "1000000"); JavaSparkContext jsc = new JavaSparkContext(conf); // Group data from ES groupBySpark(jsc); jsc.stop(); } catch (IOException e) { LOG.error("***** There are exceptions in main.", e); } } /** * 1. query all data from ES by JavaEsSpark, * 2. group by specified field and sort the result, * 3. save the result to HDFS or local files * * @param jsc the Java Spark Context which has already initialization */ public static void groupBySpark(JavaSparkContext jsc) { long begin = System.currentTimeMillis(); JavaPairRDD
3 - 运行任务
3.1 打包项目
在 FI 8.0.0 中下载 Spark 客户端,获取样例代码后解压,用 IDEA 打开:File -> Open,选中项目的 pom.xml 文件 -> OK,即可完成项目的导入。
(1)修改项目中的相关配置,与要测试集群中的信息一致;
(2) 通过 IDEA 自带的 Maven 工具,打包项目,生成 target\SparkOnES-1.0.jar;
(3)将打包生成的 jar 包上传到 Spark 客户端所在的服务器下,这里以 /opt/spark-on-es/ 为例;
(4)将 esParams.properties、user.keytab、krb5.conf 三个文件上传到 /opt/spark-on-es/ 下;
(5)将项目所需的 jar 包上传到 /opt/spark-on-es/libs/ 下。
说明:样例代码运行至少需要如下 jar 包,请从 Elasticsearch 的客户端、Maven 中心仓等处获取相关包。
3.2 client 模式提交 Spark 任务
运行命令如下:
cd /opt/spark-on-es/ # 下述命令为一条命令,其中 /opt/spark-on-es/libs/ 是外部依赖的jar包路径: spark-submit --class com.huawei.bigdata.spark.examples.SparkOnEs \ --master yarn --deploy-mode client \ --jars $(files=(/opt/spark-on-es/libs/*.jar); IFS=,; echo "${files[*]}") \ ./SparkOnEs-1.0.jar
3.3 cluster 模式提交 Spark 任务
运行命令如下:
cd /opt/spark-on-es/ # 下述命令为一条命令,其中 --files 参数指定配置文件: spark-submit --class com.huawei.bigdata.spark.examples.SparkOnEs \ --master yarn --deploy-mode cluster \ --jars $(files=(/opt/spark-on-es/libs/*.jar); IFS=,; echo "${files[*]}") \ --files ./user.keytab,./krb5.conf,./esParams.properties \ --driver-memory 6g \ --executor-cores 5 \ --num-executors 150 \ --executor-memory 5g \ ./SparkOnEs-1.0.jar
3.4 查看运行结果
(1) 查询 Elasticsearch 中的数据:
# 安全模式集群下:kinit认证后,查看Elasticsearch中的index: curl -XGET --tlsv1.2 --negotiate -k -u : 'https://10.10.10.11:24100/_cat/indices?v' # 普通模式集群下:使用http(而非https)查询即可: curl -XGET 'http://10.10.10.11:24100/_cat/indices?v' # 通过下述命令对people索引中的数据进行范围查询: curl -XPOST 'http://10.10.10.11:24100/people/_search?pretty' -H 'Content-Type:application/json' -d ' { "query": { "range": { "createdTime": {"gte": "2010-01-01T00:00:00Z", "lt": "2015-12-31T23:59:59Z"} } } }'
注:Elasticsearch相关查询命令,请参考【业务操作指南】-【Elasticsearch】-【Linux下curl命令的使用】。
(2) 查询 HDFS 中的分组文件:
样例代码中将分组结果保存到 HDFS 中,安装客户端、kinit 认证后,可通过下述命令进行统计:
# 查看所有的文件,及其大小: [root@10.10.10.11 spark-on-es]# hdfs dfs -du -s -h /user/spark-on-es/result/* 0 0 /user/spark-on-es/result/_SUCCESS 709 2.1 K /user/spark-on-es/result/part-00000 3.5 K 10.4 K /user/spark-on-es/result/part-00001 2.1 K 6.4 K /user/spark-on-es/result/part-00002 3.1 K 9.4 K /user/spark-on-es/result/part-00003 ...... # 统计结果集的个数: [root@10.10.10.11 spark-on-es]# hdfs dfs -cat /user/spark-on-es/result/* | wc -l 2000000 # 查看某个文件中的内容: [root@10.10.10.11 spark-on-es]# hdfs dfs -cat /user/spark-on-es/result/part-00000 (573267,99) (1929095,98)
4 - 性能优化方法
此样例代码的性能瓶颈:Spark 读取 Elasticsearch 中全量数据的过程,耗时最久,优化思路有:
1)增加索引的分片个数:elasticsearch-spark 工具读取 Elasticsearch 中的数据时,任务的并行度默认是索引的分片个数,因此分片个数越多,并行度越高;
Elasticsearch 中索引的分片个数不宜太大,此时可通过 es.input.max.docs.per.partition 参数规划 Spark 读取 Elasticsearch 中数据的 Partition 个数,也可提升并行度。(详见示例代码)
2)增大 scroll.size 的值:elasticsearch-spark 工具通过 Scroll 滚动读取数据,其大小默认是50,可以提高至10000,建议不高于50000,否则容易产生 OOM;
3)合理使用 Yarn 资源:参考 《产品文档-业务操作指南-Yarn-性能调优-节点配置调优》 中的说明,合理设置 Yarn 的资源,以 cluster 模式运行任务时,应适当修改 spark-submit 的参数:
--driver-memory 6g ## Driver的内存大小 --executor-cores 5 ## 每个Executor可用的 CPU 核数 --num-executors 150 ## Executor的个数,num-executors * executor-cores 不能超过 Yarn的VCores数 --executor-memory 5g ## 每个Executor的内存大小,num-executors * executor-memory 不能超过Yarn的最大内存
5 - 参考资料
Elasticsearch for Apache Hadoop 7.6 » Configuration
6 - 其他代码参考
通过 Scroll 滚动查询大批量数据的逻辑: /** * Query data from ES by ES rest client */ private static void queryDataByRestHighLevelClient(JavaSparkContext jsc) { LOG.info("=====> Query data from ES by Rest High Level Client beginning..."); LOG.info("=====> Query string: {}", esQueryJsonString); long begin = System.currentTimeMillis(); List
MapReduce服务 spark Elasticsearch
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。