HBase 读写过程中的线程池模型原理实现

网友投稿 1130 2022-05-30

【简介】

随着大数据的发展,HBase已经成为大数据高性能读写场景不可或缺的组件,HBase也也提供了丰富的API,应用开发也相对简单,如何在开发时准确使用API就显得比较重要了,如果使用不准确,往往会与设计者背到而驰,并且造成应用访问抖动、资源泄露等问题,等等……

HBase socket连接池原理实现

首先我们来看下HBase的API文档当中对Connection的定义是:A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper.,这句话的理解就是和regionserver(包括HMaster)以及zookeeper直间的连接,总结起来就是如下图:

HBase 读写过程中的线程池模型原理实现

所以对一个connection实列来讲,如果保持单例模式,和regionserver之间的socket连接最大就是五个,并且每次socket连接就不会再次创建,除非这个链接长时间(默认两分钟)和regionserver之间不进行交互,那么client端会自动关闭这个链接,应用开发时一般使用如下方法创建connection

connection = ConnectionFactory.createConnection(conf);

除了此种方式创建连接外,hbase还可通过ConnectionManager获取连接,不过此种方式就要注意,一定要保持conf对象单例,否则也就是不断创建新的连接

conn = HConnectionManager.getConnection(conf);

static ClusterConnection getConnectionInternal(final Configuration conf) throws IOException { HConnectionKey connectionKey = new HConnectionKey(conf); synchronized (CONNECTION_INSTANCES) { HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey); if (connection == null) { connection = (HConnectionImplementation)createConnection(conf, true); CONNECTION_INSTANCES.put(connectionKey, connection); } else if (connection.isClosed()) { ConnectionManager.deleteConnection(connectionKey, true); connection = (HConnectionImplementation)createConnection(conf, true); CONNECTION_INSTANCES.put(connectionKey, connection); } connection.incCount(); return connection; } } HConnectionKey(Configuration conf) { Map m = new HashMap(); if (conf != null) { for (String property : CONNECTION_PROPERTIES) { String value = conf.get(property); if (value != null) { m.put(property, value); } } } this.properties = Collections.unmodifiableMap(m); try { UserProvider provider = UserProvider.instantiate(conf); User currentUser = provider.getCurrent(); if (currentUser != null) { username = currentUser.getName(); } } catch (IOException ioe) { ConnectionManager.LOG.warn( "Error obtaining current user, skipping username in HConnectionKey", ioe); } }

此处这两段代码逻辑是根据传入的conf构建HConnectionKey,然后以HConnectionKey实例为key到连接池Map对象CONNECTION_INSTANCES中去查找connection,如果找到就返回connection,如果找不到就新建,如果找到但已被关闭,就删除再新建;接收conf构造HConnectionKey实例时,其实是将conf配置文件中的属性赋值给HConnectionKey自身的属性,所以使用此种方式时,要保证conf对象不变,new出来的HConnectionKey实例的属性才相同。

HBase 表操作线程接池原理实现

应用端读写hbase时,目前实现都是通过将每次读写操作封装成为一个个task,然后提交到应用端的表操作池中去执行,默认情况下,如果保证connection单例,那么应用将有256个线程、对应每个regionserver5个(由参数hbase.client.ipc.pool.size决定)socket链接,有的人可能会认为,这样会不会造成性能瓶颈,这里256线程是针对所有操作,也就是针对的是整个集群,而5个socket链接是针对单个regionserver,并且和regionserver的socket交互耗时是非常短的,所以一般情况下这个默认配置已经完全够用

相关代码实现原理如下:

public HTableInterface getTable(TableName tableName) throws IOException { return getTable(tableName, getBatchPool()); } private ExecutorService getBatchPool() { if (batchPool == null) { synchronized (this) { if (batchPool == null) { this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256), conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null); this.cleanupPool = true; } } } return this.batchPool; }

这里要注意,对于meta表操作来讲 ,默认是10个核心线程,并且线程名也不一样

private ExecutorService getMetaLookupPool() { if (this.metaLookupPool == null) { synchronized (this) { if (this.metaLookupPool == null) { //Some of the threads would be used for meta replicas //To start with, threads.max.core threads can hit the meta (including replicas). //After that, requests will get queued up in the passed queue, and only after //the queue is full, a new thread will be started this.metaLookupPool = getThreadPool( conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128), conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10), "-metaLookup-shared-", new LinkedBlockingQueue()); } } } return this.metaLookupPool; }

HBase 线程池引发的直接内存泄露

JDK对每个线程进行NIO交互时,都会持有一个缓存对象(属于堆外),此对象不限制大小空间,并且随着线程消亡才会释放,线程每次申请直接内存空间时,都是在已有的对象上新增加空间,意味着,此空间可能会无限增大,而hbase的表操作线程池,线程不会消亡,所以此直接内存会不断增大,直到触发了FULL GC,才有可能释放,否则应用就会报直接内存溢出错误。相关源码分析如下:

// Per-thread cache of temporary direct buffers private static ThreadLocal bufferCache = new ThreadLocal() { @Override protected BufferCache initialValue() { return new BufferCache(); } }; /** * Returns the max size allowed for a cached temp buffers, in * bytes. It defaults to Long.MAX_VALUE. It can be set with the * jdk.nio.maxCachedBufferSize property. Even though * ByteBuffer.capacity() returns an int, we're using a long here * for potential future-proofing. */ private static long getMaxCachedBufferSize() { String s = java.security.AccessController.doPrivileged( new PrivilegedAction() { @Override public String run() { return System.getProperty("jdk.nio.maxCachedBufferSize"); } }); if (s != null) { try { long m = Long.parseLong(s); if (m >= 0) { return m; } else { // if it's negative, ignore the system property } } catch (NumberFormatException e) { // if the string is not well formed, ignore the system property } } return Long.MAX_VALUE; } /** * Returns a temporary buffer of at least the given size */ public static ByteBuffer getTemporaryDirectBuffer(int size) { // If a buffer of this size is too large for the cache, there // should not be a buffer in the cache that is at least as // large. So we'll just create a new one. Also, we don't have // to remove the buffer from the cache (as this method does // below) given that we won't put the new buffer in the cache. if (isBufferTooLarge(size)) { return ByteBuffer.allocateDirect(size); } BufferCache cache = bufferCache.get(); ByteBuffer buf = cache.get(size); if (buf != null) { return buf; } else { // No suitable buffer in the cache so we need to allocate a new // one. To avoid the cache growing then we remove the first // buffer from the cache and free it. if (!cache.isEmpty()) { buf = cache.removeFirst(); free(buf); } return ByteBuffer.allocateDirect(size); } }

HBase开源社区针对此问题的解决方式是设置-Djdk.nio.maxCachedBufferSize,保证线程级别的buffercache不会持续增大,开源单号如下:https://issues.apache.org/jira/browse/HBASE-19320

值得注意的是HBase2.x以后使用netty框架实现rpc交互,所以就不存在这个问题了,另外JDK上述修复是 jdk8u102 and jdk9版本及以上才能配置上述参数解决

JDK bug介绍如下:https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo

EI企业智能 FusionInsight HBase

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:ByteBuf--Netty的数据容器
下一篇:【收藏】MyBatis 常用语法汇总
相关文章