Hive RuntimeFilter

网友投稿 629 2022-05-30

1      介绍

select *

from store join store_sales on (store.id   = store_sales.store_id)

where store.s_store_name = 'My Store'

在这个语句中,事实表store_sales与维度表store通过store_id进行jion,维度表store通过s_store_name进行过滤。Store表经过过滤后得到的记录条数可能非常少,store_id的值也非常少。所以可以将store_id的最大最小值,及bloomFilter作为store_sales表的过滤条件,过滤条件甚至可以下推至存储层,减少数据的读取IO。

Hive Runtime Filter的优化是基于动态分区剪裁优化,推荐先阅读《Hive 动态分区剪裁原理》之后再阅读本文。

2      Runtime filter使用

针对Hive3.1.0版本,有如下的参数与Runtime Filter有关。需要注意的是,只有开启动态分区剪裁才能开启runtime filter。

参数名

默认值

描述

hive.tez.dynamic.partition.pruning

true

动态分区剪裁

hive.tez.dynamic.semijoin.reduction

true

Hive RuntimeFilter

是否开启runtime   filter

hive.tez.min.bloom.filter.entries

1000000L

Bloom过滤器最小条数

hive.tez.max.bloom.filter.entries

100000000L

Bloom过滤器最大条数

hive.tez.bloom.filter.factor

1.0

hive.tez.bigtable.minsize.semijoin.reduction

100000000L

大表的最小值

hive.tez.dynamic.semijoin.reduction.threshold

0.5

计算的cost大于0.5才会执行runtime filter

3      原理

3.1      物理优化

Runtime filter的逻辑优化与动态分区剪裁的逻辑优化相同,都是先合成动态filter。这里不再赘述。Runtime filter的物理优化发生在DynamicPartitionPruningOptimization类中,当动态过滤条件的列是分区列时,执行动态分区剪裁优化,如果只是普通的列,就会执行runtime filter优化。

如上图所示,当判断列为非分区字段时,执行runtime filter优化。针对每一个dynamic value filter,hive执行以下操作:

1.         取出当前filter的predicate表达式中的所有列,遍历所有列,如果是非分区列执行以下runtimefilter优化,否则执行动态分区剪裁优化。

2.         给动态值表达式指向的RS的父节点增加runtime filter分支。分支的内容包括:SelectOperator、GroupByOperator、ReduceSinkOperator。其中GroupByOperator会通过聚合函数计算列的最大值、最小值、和bloom filter。Reduce Operator会将最大值、最小值和bloom filter序列化发送给TableScanOperator。图中用虚线表示runtime filter分支与TableScanOperator的关系,实际上在Operator树中,这两者并没有父子关系。在tez生成task时才会将两者连接起来。

3.         替换TableScanOperator的predicate为min,max和bloomfilter的表达式。min,max和bloomfilter的值在编译阶段都是不可知的,hive使用ExprNodeDynamicValueDesc对象表示动态的值,在运行时可通过该对象得到真实的值。

4.         FilterOperator的predicate设置为true,后续优化将会把predicate的filter移除。

3.2      运行时

Runtime filter在运行时包含两个部分:

1.         Runtime filter分支会生成一个reducer,该reducer运行GroupByOperator,通过定义的min、max、bloomfilter三个聚合函数生成三个值。随后的ReducerSinkOperator会将这三个值序列化。

2.         Mapper在运行TableScanOperator之前会执行初始化操作,初始化操作中会等待runtime filter的输入,并且将runtime filter的输入反序列化为min,max和bloomfilter对象。在构建reader时,会将min,max下推到存储层。而bloom filter作为TableScanOperator的过滤条件。

这里有两个问题需要解决:

l  Mapper如何知道其需要等待哪个runtime filter的输入?

在tez生成task阶段会在Mapper task中注入runtime filter的信息,mapper初始化时拿到这个信息就可以知道需要等待的runtime filter了。

l  TableScanOperator的predicate使用了ExprNodeDynamicValueDesc对象来表示动态的值,那么在运行时如何知道真实的值?

每一个runtime filter分支都由唯一key标识,runtime filter的三个输出由key_min, key_max, key_bloomfilter标识。Mapper在初始化时,会将获取的runtime filter三个值存储到cache中,而ExprNodeDynamicValueDesc对象中也保存了一个key,在计算ExprNodeDynamicValueDesc对象的值时,起始是通过key从cache中读取相应的值。

4      缺陷

目前hive runtime filter和dynamic partition pruning都只是两张表直接通过join生成的,如果多张表通过多个jion间接关联,是否也可以生成runtime filter和动态分区剪裁呢?这可能是hive优化的一个方向。

5      参考文档

1.     https://issues.apache.org/jira/browse/HIVE-15269

大数据 EI企业智能

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:TCL中关于Pins的一些使用方法?
下一篇:2021年,财务软件的未来更具想象力
相关文章