文档出现乱码怎么回事?(文档为什么突然乱码了)
704
2022-05-30
终于开始了这个很感兴趣但是一直觉得困难重重的源码解析工作,也算是一个好的开端。既然开始了,就认真耐心的看下去吧。废话不多说,开始!
HDFS源码解析之客户端写数据(一)
hdfs源码解析之客户端写数据(二)
在我们客户端写数据的代码大致如下:
Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://172.16.40.119:8020"); String a = "This is my first hdfs file!"; //① 得到DistributedFileSystem FileSystem filesytem = FileSystem.get(conf); //② 得到输出流FSDataOutputStream FSDataOutputStream fs = filesytem.create(new Path("/a.txt"),true); //③ 开始写数据 fs.write(a.getBytes()); fs.flush();
最重要的三步已经在上面标注,通过源码分析每一步所发生的细节是什么?
FileSystem filesytem = FileSystem.get(conf);
其中conf是一个Configuration对象。执行这行代码后就进入到FileSystem.get(Configuration conf)方法中,可以看到,在这个方法中先通过getDefaultUri()方法获取文件系统对应的的URI,该URI保存了与文件系统对应的协议和授权信息,如:hdfs://localhost:9000。这个URI又是如何得到的呢?是在CLASSPATH中的配置文件中取得的,看getDefaultUri()方法中有conf.get(FS_DEFAULT_NAME_KEY, "file:///") 这么一个实参,在笔者项目的CLASSPATH中的core-site.xml文件中有这么一个配置:
而常量FS_DEFAULT_NAME_KEY对应的值是fs.default.name,所以conf.get(FS_DEFAULT_NAME_KEY, "file:///")得到的值是hdfs://localhost:9000。
URI创建完成之后就进入到FileSystem.get(URI uri, Configuration conf)方法。在这个方法中,先执行一些检查,检查URI的协议和授权信息是否为空,然后再直接或简介调用该方法,最重要的是执行
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme); if (conf.getBoolean(disableCacheName, false)) {//是否使用被Cache的文件系统 return createFileSystem(uri, conf); } return CACHE.get(uri, conf);
常量CACHE用于缓存已经打开的、可共享的文件系统,它是FileSystem类的静态内部类FileSystem.Cache的对象,在其内部使用一个Map存储文件系统
private final Map
这个键值对映射的键是FileSystem.Cache.Key类型,它有三个成员变量:
/**URI模式**/
final String scheme;
/**URI的授权部分**/
final String authority;
/**保存了打开具体文件系统的本地用户信息,不同本地用户打开的具体文件系统也是不能共享的**/
final UserGroupInformation ugi;
由于FileSystem.Cache表示可共享的文件系统,所以这个Key就用于区别不同的文件系统对象,如一个一个文件系统对象可共享,那么FileSystem.Cache.Key的三个成员变量相等,在这个类中重写了hashCode()方法和equals()方法,就是用于判断这三个变量是否相等。根据《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》这本书的介绍,在Hadoop1.0版本中FileSystem.Cache.Key类还有一个unique字段,这个字段表示,如果其他3个字段相等的情况,下如果用户不想共享这个文件系统,就设置这个值(默认为0),但是不知道现在为什么去除了,还没搞清楚,有哪位同学知道的话麻烦告知,谢谢。
回到FileSystem.get(final URI uri, final Configuration conf)方法的最后一行语句return CACHE.get(uri, conf),调用了FileSystem.Cahce.get()方法获取具体的文件系统对象,该方法代码如下:
FileSystem get(URI uri, Configuration conf) throws IOException{ Key key = new Key(uri, conf); FileSystem fs = null; synchronized (this) { fs = map.get(key); } if (fs != null) { return fs; } fs = createFileSystem(uri, conf); synchronized (this) { // refetch the lock again FileSystem oldfs = map.get(key); if (oldfs != null) { // a file system is created while lock is releasing fs.close(); // close the new file system return oldfs; // return the old file system } // now insert the new file system into the map if (map.isEmpty() && !clientFinalizer.isAlive()) { Runtime.getRuntime().addShutdownHook(clientFinalizer); } fs.key = key; map.put(key, fs); return fs; } }
在这个方法中先查看已经map中是否已经缓存了要获取的文件系统对象,如果已经有了,直接从集合中去除,如果没有才进行创建,由于FileSystem.CACHE为static类型,所以在同一时刻可能有多个线程在访问,所以需要在Cache类的方法中使用同步的操作来取值和设置值。这个方法比较简单,最核心的就是
fs = createFileSystem(uri, conf);
这行语句,它执行了具体的文件系统对象的创建的功能。createFileSystem()方法是FileSystem的一个私有方法,其代码如下:
private static FileSystem createFileSystem(URI uri, Configuration conf ) throws IOException { Class> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null); LOG.debug("Creating filesystem for " + uri); if (clazz == null) { throw new IOException("No FileSystem for scheme: " + uri.getScheme()); } FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); fs.initialize(uri, conf); return fs; }
其实现就是先从配置文件取得URI对应的类,如在core-default.xml文件中属性(键)fs.hdfs.impl对应的值是org.apache.hadoop.hdfs.DistributedFileSystem,相应的XML代码如下:
所以若uri对应fs.hdfs.impl,那么createFileSystem中的clazz就是org.apache.hadoop.hdfs.DistributedFileSystem的Class对象。然后再利用反射,创建org.apache.hadoop.hdfs.DistributedFileSystem的对象fs。然后执行fs.initialize(uri, conf);初始化fs对象。DistributedFileSystem是Hadoop分布式文件系统的实现类,实现了Hadoop文件系统的界面,提供了处理HDFS文件和目录的相关事务。
这行代码
FSDataOutputStream fs = filesytem.create(new Path("/a.txt"),true);
主要做了两件事:
①通过rpc调用在namenode命名空间创建文件条目;
②创建该文件对应的输出流。
filesytem.create()最终调用的是DistributedFileSystem的create方法
@Override //返回HdfsDataOutputStream对象,继承FSDataOutputStream public FSDataOutputStream create(final Path f, final FsPermission permission, final EnumSet
在上面代码中首先构建DFSOutputStream,然后传给dfs.createWrappedOutputStream构建HdfsDataOutputStream,看下dfs.create(getPathName(p), permission,cflags, replication, blockSize, progress, bufferSize, checksumOpt)是如何构建输出流DFSOutputStream的。
public DFSOutputStream create(String src, FsPermission permission, EnumSet
再进到DFSOutputStream.newStreamForCreate方法中
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet
在newStreamForCreate方法中,先定义一个文件状态变量stat,然后不停的尝试通过namenode创建文件条目,创建成功后再创建改文件的输出流,然后通过out.start()启动DataQueue线程开始发送数据。我们重点看一下namenode是怎么创建文件条目的。打开dfsClient.namenode.create方法,dfsClient.namenode是在dfsClient中声明的ClientProtocol对象。ClientProtocol是客户端协议接口,namenode端需要实现该接口的create方法,通过动态代理的方式把结果返回给客户端,即是rpc远程调用。那么看下namenode端是怎么实现这个create方法的,打开这个方法的实现类我们发现了NameNodeRpcServer这个类,这个类是实现namenode rpc机制的核心类,继承了各种协议接口并实现。
打开NameNodeRpcServer的create方法:
@Override // ClientProtocol public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable
打开namesystem.startFile,namesystem是NameNodeRpcServer中声明的FSNamesystem对象:
/** * Create a new file entry in the namespace. * 在命名空间创建一个文件条目 * * For description of parameters and exceptions thrown see * {@link ClientProtocol#create}, except it returns valid file status upon * success */ HdfsFileStatus startFile(String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet * For description of parameters and exceptions thrown see * {@link ClientProtocol#create} */ private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, String src, PermissionStatus permissions, String holder, String clientMachine, boolean create, boolean overwrite, boolean createParent, short replication, long blockSize, boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version, EncryptedKeyVersion edek, boolean logRetryEntry) throws FileAlreadyExistsException, AccessControlException, UnresolvedLinkException, FileNotFoundException, ParentNotDirectoryException, RetryStartFileException, IOException { //检查当前线程是否有写锁,没有退出 assert hasWriteLock(); // Verify that the destination does not exist as a directory already. //判断文件是否已经作为目录存在 //INodesInPath:包含从给定路径解析的INode信息。 //获取给定文件或目录的inode信息 final INodesInPath iip = dir.getINodesInPath4Write(src); final INode inode = iip.getLastINode(); if (inode != null && inode.isDirectory()) { throw new FileAlreadyExistsException(src + " already exists as a directory"); } //FileEncryptionInfo封装加密文件的所有加密相关信息 FileEncryptionInfo feInfo = null; if (dir.isInAnEZ(iip)) { // The path is now within an EZ, but we're missing encryption parameters if (suite == null || edek == null) { throw new RetryStartFileException(); } // Path is within an EZ and we have provided encryption parameters. // Make sure that the generated EDEK matches the settings of the EZ. String ezKeyName = dir.getKeyName(iip); if (!ezKeyName.equals(edek.getEncryptionKeyName())) { throw new RetryStartFileException(); } feInfo = new FileEncryptionInfo(suite, version, edek.getEncryptedKeyVersion().getMaterial(), edek.getEncryptedKeyIv(), ezKeyName, edek.getEncryptionKeyVersionName()); Preconditions.checkNotNull(feInfo); } final INodeFile myFile = INodeFile.valueOf(inode, src, true); if (isPermissionEnabled) { if (overwrite && myFile != null) { checkPathAccess(pc, src, FsAction.WRITE); } /* * To overwrite existing file, need to check 'w' permission * of parent (equals to ancestor in this case) */ checkAncestorAccess(pc, src, FsAction.WRITE); } if (!createParent) { verifyParentDir(src); } try { BlocksMapUpdateInfo toRemoveBlocks = null; if (myFile == null) { if (!create) { throw new FileNotFoundException("Can't overwrite non-existent " + src + " for client " + clientMachine); } } else { if (overwrite) { toRemoveBlocks = new BlocksMapUpdateInfo(); List 这个方法就是生成文件条目的核心方法,首先判断检查当前线程是否有写锁,没有退出。FSDirectory dir是一个命名空间的内存树。 hdfs源码解析之客户端写数据(二) EI企业智能 可信智能计算服务 TICS 数据湖治理中心 DGC 智能数据
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
* * Once the file is create the client then allocates a new block with the next * call using {@link ClientProtocol#addBlock}. *