Hive RuntimeFilter

网友投稿 614 2022-05-30

1      介绍

select *

from store join store_sales on (store.id   = store_sales.store_id)

where store.s_store_name = 'My Store'

在这个语句中,事实表store_sales与维度表store通过store_id进行jion,维度表store通过s_store_name进行过滤。Store表经过过滤后得到的记录条数可能非常少,store_id的值也非常少。所以可以将store_id的最大最小值,及bloomFilter作为store_sales表的过滤条件,过滤条件甚至可以下推至存储层,减少数据的读取IO。

Hive Runtime Filter的优化是基于动态分区剪裁优化,推荐先阅读《Hive 动态分区剪裁原理》之后再阅读本文。

2      Runtime filter使用

针对Hive3.1.0版本,有如下的参数与Runtime Filter有关。需要注意的是,只有开启动态分区剪裁才能开启runtime filter。

参数名

默认值

描述

hive.tez.dynamic.partition.pruning

true

动态分区剪裁

hive.tez.dynamic.semijoin.reduction

true

是否开启runtime   filter

hive.tez.min.bloom.filter.entries

1000000L

Bloom过滤器最小条数

hive.tez.max.bloom.filter.entries

100000000L

Bloom过滤器最大条数

hive.tez.bloom.filter.factor

1.0

hive.tez.bigtable.minsize.semijoin.reduction

100000000L

大表的最小值

hive.tez.dynamic.semijoin.reduction.threshold

0.5

计算的cost大于0.5才会执行runtime filter

3      原理

3.1      物理优化

Runtime filter的逻辑优化与动态分区剪裁的逻辑优化相同,都是先合成动态filter。这里不再赘述。Runtime filter的物理优化发生在DynamicPartitionPruningOptimization类中,当动态过滤条件的列是分区列时,执行动态分区剪裁优化,如果只是普通的列,就会执行runtime filter优化。

如上图所示,当判断列为非分区字段时,执行runtime filter优化。针对每一个dynamic value filter,hive执行以下操作:

1.         取出当前filter的predicate表达式中的所有列,遍历所有列,如果是非分区列执行以下runtimefilter优化,否则执行动态分区剪裁优化。

2.         给动态值表达式指向的RS的父节点增加runtime filter分支。分支的内容包括:SelectOperator、GroupByOperator、ReduceSinkOperator。其中GroupByOperator会通过聚合函数计算列的最大值、最小值、和bloom filter。Reduce Operator会将最大值、最小值和bloom filter序列化发送给TableScanOperator。图中用虚线表示runtime filter分支与TableScanOperator的关系,实际上在Operator树中,这两者并没有父子关系。在tez生成task时才会将两者连接起来。

3.         替换TableScanOperator的predicate为min,max和bloomfilter的表达式。min,max和bloomfilter的值在编译阶段都是不可知的,hive使用ExprNodeDynamicValueDesc对象表示动态的值,在运行时可通过该对象得到真实的值。

4.         FilterOperator的predicate设置为true,后续优化将会把predicate的filter移除。

3.2      运行时

Runtime filter在运行时包含两个部分:

1.         Runtime filter分支会生成一个reducer,该reducer运行GroupByOperator,通过定义的min、max、bloomfilter三个聚合函数生成三个值。随后的ReducerSinkOperator会将这三个值序列化。

2.         Mapper在运行TableScanOperator之前会执行初始化操作,初始化操作中会等待runtime filter的输入,并且将runtime filter的输入反序列化为min,max和bloomfilter对象。在构建reader时,会将min,max下推到存储层。而bloom filter作为TableScanOperator的过滤条件。

这里有两个问题需要解决:

l  Mapper如何知道其需要等待哪个runtime filter的输入?

在tez生成task阶段会在Mapper task中注入runtime filter的信息,mapper初始化时拿到这个信息就可以知道需要等待的runtime filter了。

l  TableScanOperator的predicate使用了ExprNodeDynamicValueDesc对象来表示动态的值,那么在运行时如何知道真实的值?

每一个runtime filter分支都由唯一key标识,runtime filter的三个输出由key_min, key_max, key_bloomfilter标识。Mapper在初始化时,会将获取的runtime filter三个值存储到cache中,而ExprNodeDynamicValueDesc对象中也保存了一个key,在计算ExprNodeDynamicValueDesc对象的值时,起始是通过key从cache中读取相应的值。

4      缺陷

目前hive runtime filter和dynamic partition pruning都只是两张表直接通过join生成的,如果多张表通过多个jion间接关联,是否也可以生成runtime filter和动态分区剪裁呢?这可能是hive优化的一个方向。

5      参考文档

Hive RuntimeFilter

1.     https://issues.apache.org/jira/browse/HIVE-15269

大数据 EI企业智能

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:TCL中关于Pins的一些使用方法?
下一篇:2021年,财务软件的未来更具想象力
相关文章