HBase快速入门系列(8) | 一文教你HBase与Hive如何集成
794
2022-05-30
过滤器在get和scan的基础上,进行进一步的过滤,如列名、具体值等。Hbase提供了很多自带的实现类,也可以自定义filter。
谓词下推(predicate push down),所有的过滤器都在服务端生效,所以过滤掉的数据不会传到客户端。使用者的自己的代码实现也尽量不要做客户端的过滤。
过滤器每region/scan一个实例
通用接口为org.apache.hadoop.hbase.filter.Filter,已有的接口实现中:
大部分实体过滤器类继承自org.apache.hadoop.hbase.filter.FilterBase
还有一组继承自org.apache.hadoop.hbase.filter.CompareFilter,比FilterBase多一个compare()方法
其他的接口实现可以参考Filter接口的API说明
CompareFilter需要两个参数,一个是CompareFilter.CompareOp,即比较运算符;一个是WritableByteArrayComparable,即比较器。
语义上,比较过滤器是返回成功匹配的值,和hbase过滤器原有的目的(筛掉无用信息)不同
枚举类型
WritableByteArrayComparable类,实现了org.apache.hadoop.io.Writable和Comparable
接口。Hbase自带了几个已实现的子类:
基于行健过滤数据,比较过程中,是按照字典顺序排序的,比如筛选小于“row2”的行,会返回row1、row11、row100等,比较常见的避免这种语义上的差别的方法,就是存的时候补位数据。
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");
Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row9"));
Filter f1 = new RowFilter(CompareOp.LESS, new BinaryComparator(Bytes.toBytes("row2")));
scan.setFilter(f1);
ResultScanner rs = table.getScanner(scan);
for (Result r : rs) {
System.out.println(r);
}
rs.close();
System.out.println("===");
Filter f2 = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("row[1,3]"));
scan.setFilter(f2);
ResultScanner rs2 = table.getScanner(scan);
for (Result r : rs2) {
System.out.println(r);
}
rs2.close();
System.out.println("===");
Filter f3 = new RowFilter(CompareOp.EQUAL, new SubstringComparator("ro"));
scan.setFilter(f3);
ResultScanner rs3 = table.getScanner(scan);
for (Result r : rs3) {
System.out.println(r);
}
rs3.close();
table.close();
与行过滤器使用方式类似,只不过用来比较列族
new FamilyFilter(CompareOp.LESS, new BinaryComparator(Bytes.toBytes("f2")));
过滤特定列
new QualifierFilter(CompareOp.LESS, new BinaryComparator(Bytes.toBytes("c2")));
常用的方式是与RegexStringComparator或SubstringComparator配合使用
new ValueFilter(CompareOp.EQUAL,new RegexStringComparator("v*2"));
指定一个列作为基准,过滤其他列,过滤条件是基准列的时间戳。这个过滤器是基于列值进行筛选的,也就是说,可以理解成一个ValueFilter和时间戳过滤器的组合。这个过滤器与scan.setBatch不兼容,因为可能会导致取不到基准列的值。
dropDependentColumn参数可以控制是否丢弃过滤掉的数据,从实测结果来看,基准列本身不会被查出来,除非dropDependentColumn=false
new DependentColumnFilter(Bytes.toBytes("f1"), Bytes.toBytes("c1"), true);
还有有几种可选的构造函数,不同范围的过滤。
Hbase第二类过滤器是继承自FilterBase,部分过滤器只适用于scan,因为用在get上,会要么包含整行,要么都不包含
用一列的值判断本行数据是否整体过滤掉。SingleColumnValueFilter使用了比较过滤器类似的参数风格,但是注意,并没有继承关系。
new SingleColumnValueFilter(Bytes.toBytes("f1"), Bytes.toBytes("c1"), CompareOp.EQUAL, new RegexStringComparator("v*1*"));
当参考列不存在时,默认这行是包含在结果中的,可以使用setFilterIfMissing方法排除。
默认检查参考列的最新版本,可以使用setLatestVersionOnly(false)方法检查所有版本。
继承SingleColumnValueFilter,略不同的语义是参考列不被包含到结果中。
构造一个前缀,匹配前缀的行会返回客户端。也是按字典顺序查找。一般scan的时候使用。
new PrefixFilter(Bytes.toBytes("row1"));
会返回row1开头的行
指定pageSize参数后,可以对结果进行分页。其实就是过滤返回的行数,下一行的位置需要客户端来维护。一次扫描的结果可能大于分页大小,因为这个过滤器是分别作用于不同的regionserver的,并行执行不能共享他们现在的状态和边界,可能每个server上都获取分页大小的数据。所以客户端程序要处理这种情况,如果需要的话。
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");
final byte[] POSTFIX = new byte[] { 0x00 };
Filter filter = new PageFilter(2);
int totalRows = 0;
byte[] lastRow = null;
while (true) {
System.out.println("=======");
Scan scan = new Scan();
scan.setFilter(filter);
if (lastRow != null) {
// 加一个最小的增量new byte[] { 0x00 };
byte[] startRow = Bytes.add(lastRow, POSTFIX);
System.out.println("start row: " + Bytes.toStringBinary(startRow));
scan.setStartRow(startRow);
}
ResultScanner scanner = table.getScanner(scan);
int localRows = 0;
Result result;
while ((result = scanner.next()) != null) {
System.out.println(localRows++ + ": " + result);
totalRows++;
lastRow = result.getRow();
}
scanner.close();
if (localRows == 0)
break;
}
System.out.println("total rows: " + totalRows);
table.close();
针对只需要key的场景,这个过滤器可以只返回KV中的key,而把value覆写成为空。
构造函数KeyOnlyFilter(boolean lenAsVal) 可以改变覆写策略。无参构造函数默认为false,即覆写为长度为0的字节数组,而设置为true时,value会被覆写为原值长度的字节数组,这个长度可以用来做二次排序或其他场景。
这个过滤器只返回每行的第一个KV,排序是Hbase的隐式排序。
一般用在行数统计的场景,因为列式数据库中,某行存在,则这一行必定有列。因为检查完第一列的时候,过滤器框架就会通知region server结束本行的扫描,并跳到下一行,所以比全表扫描有很大的性能提升。
scan的范围是[startrow, stoprow),使用这个过滤器可以包含最后一行,同时也定义了scan的stoprow,如下面的代码是从表开始,扫描到row2,且包含row2
Filter f = new InclusiveStopFilter(Bytes.toBytes("row2"));
scan.setFilter(f);
ResultScanner rs = table.getScanner(scan);
命名是时间戳,实际上是版本的控制,如下面代码返回两个特定版本的值
FilterList ts = Arrays.asList(new long [] {1,3});
FilterFilter f = new TimestampsFilter(ts);
Filter也支持和scan的setTimeRange方法联合缩小范围。
限制每行最多取回多少列,列数达到设定的值时,过滤器会停止整个扫描,所以一般不和scan配合使用,更适合get。列数可以直接在构造函数中设置
new ColumnCountGetFilter(2);
与PageFilter类似的功能,不过是在列上实现数目的限制返回。
ColumnPaginationFilter(int limit, int offset)
构造函数有两个参数,就是返回偏移量在[offset, limit]的列。
与PrefixFilter类似,只不过作用在列上,返回所有前缀匹配的列
结果包含的行是随机的。构造函数RandomRowFilter(float chance) 会传入一个chance,取值范围在0~1,内部是用了Java的Random.nextFloat()方法和chance的比较结果,来决定一行是否过滤掉,所以,如果chance<0则查询结果全部过滤掉,而chance>1则会包含所有结果。
所以这个过滤器一般可以用于采样,参数chance其实就是采样比,数值越大,留下的数据越多。
这类过滤器采用装饰者模式,可以装饰在其他过滤器上使用。
包装一个过滤器F,如果过滤器F检查任何一个KV不满足条件的时候,包装成SkipFilter就会把这个KV的整行过滤掉。被包装的过滤器必须实现filterKeyValue()方法,因为SkipFilter是判断这个方法的结果来决定如何处理这一行的,所以和部分Filter不兼容。后面会有总结。
如下面代码只会返回所有列值都大于value1的行:
Filter f1 = new ValueFilter(CompareOp.GREATER,new BinaryComparator(Bytes.toBytes("value1")));
Filter f = new SkipFilter(f1);
这个包装后,一旦发现不符合包装过滤器F的条件,就终止scan,这之前的结果回返回客户端。下面的代码,如果不加这个过滤器,会返回row2之外的所有行,加上之后,扫描到row2就停止了,所以只会扫描row2之前的行。
Filter f1 = new RowFilter(CompareOp.NOT_EQUAL,new BinaryComparator(Bytes.toBytes("row2")));
Filter f = new WhileMatchFilter(f1);
自定义Filter一般继承FilterBase类,也可以继承Filter接口,前者把后者所有的方法提供了默认实现,按需覆写即可。
Filter接口中有个枚举Filter.ReturnCode,被filterKeyValue()方法用于通知执行框架,决定如何执行下一步。
Filter接口定义了若干方法,在客户端的检索操作的不同阶段调用,按下面顺序执行:
1. filterRowKey(byte[],int,int):返回true,则丢弃此行。
2. filterKeyValue(KeyValue):上面没有被过滤掉,检查KeyValue按照 Filter.ReturnCode处理当前值
3. filterRow(List
kvs): 让用户可以访问上两个方法筛选后的KV实例。DependentColumnFilter过滤器用这个方法来过滤与基准列不匹配的数据。
4. filterRow():最后一道判断是否过滤掉行。PageFilter使用当前方法检查一次迭代分页中返回的行数是否达到预期分页大小,如果达到返回true。默认返回false,即结果包含当前行。
5. reset() :迭代扫描中,为每个新行重置过滤器。服务端读一行数据后,此方法被隐式调用。
6. filterAllRemaining():返回true,则整个scan结束。返回false继续执行,主要用户提前结束的优化场景
注意,使用filterRow(List
kvs)或filterRow(),必须重载hasRowFilter()方法,并返回true。框架用这个标志保证过滤器和scan操作的各个参数的兼容。当扫描使用batch时,之前方法不会在每次batch操作时调用,而是在当前行数据结束时被调用。
自定义Filter编译成jar包后,上传到region server上,并在hbase-env.sh的HBASE_CLASSPATH配置上jar包的路径。重启hbase生效。
代码样例:
public class CustomFilter extends FilterBase {
private byte[] value = null;
private boolean filterRow = true;
public CustomFilter(byte[] value) {
// 设置要比较的值
this.value = value;
}
@Override
public void reset() {
// 每个新行都重置
this.filterRow = true;
}
@Override
public ReturnCode filterKeyValue(KeyValue kv) {
if (Bytes.compareTo(value, kv.getValue()) == 0) {
// 策略是先包含进来,在filterRow判断是否过滤掉行
filterRow = false;
}
return ReturnCode.INCLUDE;
}
@Override
public boolean filterRow() {
return filterRow;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
// 设定值写入DataOutput中,服务端实例化Filter时可以读到要比较的这个value
Bytes.writeByteArray(dataOutput,this.value);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
// 服务端用这个方法初始化Filter实例,比较值设定进来
this.value = Bytes.readByteArray(dataInput);
}
public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "t1");
Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row9"));
Filter filter = new CustomFilter(Bytes.toBytes("value1"));
scan.setFilter(filter);
ResultScanner rs = table.getScanner(scan);
for (Result r : rs) {
System.out.println(r);
}
table.close();
}
}
运行时报错//TODO
2015-09-10 11:30:08,588 WARN org.apache.hadoop.ipc.HBaseServer: Unable to read call parameters for client 11.13.1.30
java.io.IOException: Error in readFields
FilterList也实现了Filter接口,所以使用方式相同。但是FilterList提供的是一种多个过滤器组合的方式使用。有几种构造函数
FilterList(Filter... rowFilters)
FilterList(FilterList.Operator operator)
FilterList(FilterList.Operator operator, Filter... rowFilters)
FilterList(FilterList.Operator operator, List
rowFilters)
FilterList(List
rowFilters)
核心的参数其实就是两个,一个是组合逻辑FilterList.Operator,一个是需要组合的filter集合。FilterList.Operator是个枚举类型,默认是FilterList.Operator.MUST_PASS_ALL,即所有过滤器都要通过才保留结果。可以改为FilterList.Operator.MUST_PASS_ONE。
可以控制List中的Filter添加顺序去保证过滤器的执行顺序,如使用ArrayList就可以精准的控制过滤器执行顺序是添加顺序。
[a] Filter supports Scan.setBatch(), i.e., the scanner batch mode.
[b] Filter can be used with the decorating SkipFilter class.
[c] Filter can be used with the decorating WhileMatchFilter class.
[d] Filter can be used with the combining FilterList class.
[e] Filter has optimizations to stop a scan early, once there are no more matchingrows ahead.
[f] Filter can be usefully applied to Get instances.
[g] Filter can be usefully applied to Scan instances.
[h] Depends on the included filters.
转载请注明出处:华为云博客 https://portal.hwclouds.com/blogs
hbase
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。