我们先来看看Hadoop源码中对DFS Client的注释说明:
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
* perform basic file tasks. It uses the ClientProtocol
* to communicate with a NameNode daemon, and connects
* directly to DataNodes to read/write block data.
*
* Hadoop DFS users should obtain an instance of
* DistributedFileSystem, which uses DFSClient to handle
* filesystem tasks.
********************************************************/
显然,DFSClient在DistributedFileSystem和NameNode和之间起到了桥梁的作用。hdfs用户需要获取一个DistributedFileSystem实例对象来处理文件系统的一些任务。而DistributedFileSystem实例对象又需要使用DFSClient来完成用户提出的任务需求。而DFSClient则又通过ClientProtocol来连接Namenode,以及直接连接Datanode来读取block数据。这就是它们三者的协作关系。
接下来,我们来看看它们具体的实现过程。接上一篇,ls方法里的第二个步骤,需要根据Path对象来获取FileSystem对象:
FileSystem srcFs = srcPath.getFileSystem(this.getConf());
srcPath变量是Path对象,接着我们来看看Path对象里的getFileSystem这个方法:
public FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(this.toUri(), conf);
}
马上,又获取到Path对象的uri格式,接着调用FileSystem对象的get方法。
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
在FileSystem对象的get方法中会先解析uri的scheme和authority。scheme确定文件系统的类型,authority确定访问地址(ip)和端口(port)。如果scheme和authority都不为null,则正常获取FileSystem对象;如果scheme不为null,而authority为null,且scheme和默认的匹配,则根据默认的scheme和authority来创建FileSystem对象;若scheme和authority同为null,则也根据默认的scheme和authority来创建FileSystem对象。
在我们之前的例子里“-ls /test”,是scheme和authority同为null的情况,所以需要默认的scheme和authority来创建FileSystem对象。那么,我们来看看默认获取scheme和authority的过程。
跳转到FileSystem对象的get(Conf)方法:
public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}
public static URI getDefaultUri(Configuration conf) {
return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, "file:///")));
}
其中,FS_DEFAULT_NAME_KEY=”fs.default.name“。conf.get(FS_DEFAULT_NAME_KEY, “file:///”)从配置文件里获取属性fs.default.name的值(这和我们对配置文件的配置有关,我们配置的是”hdfs://localhost“),如果获取为空,则设为”file:///”。所以我们可以预见,默认的uri的scheme是hdfs,默认的uri的authority是localhost。获取到默认的uri后,然后再返回上面的get方法重新解析uri。解析获取scheme和authority都不为空,于是继续执行。
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
继续执行过程有一步是说,看看配置中有没有配置fs.hdfs.impl.disable.cache这个参数,如果设置里这个参数为true,则不在缓存中创建文件系统。如果设置了false或者没有设置,则在缓存中创建文件系统。
因为,我们是没有设置这个参数的,所以接下来会在缓存中创建文件系统。缓存的实现具体实现我先不看,但是个值得关注的点。需要注意一点的是,Cache是FileSystem的内部静态类。
我们继续看到Cache里的get方法:
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
FileSystem fs = null;
synchronized (this) {
fs = map.get(key);
}
if (fs != null) {
return fs;
}
需要指出的是,这里的map是Cache类的一个成员变量,map的key是封装了uri和conf的一个Key对象(是Cache类的一个内部静态类),value是FileSystem类型。
首先,get方法将uri和conf封装成一个key值,然后企图从map中根据key值(以多线程并发)来获取相应的FileSystem。如果获取到了,则直接返回从map中获取的FileSystem,否则创建新的FileSystem:
fs = createFileSystem(uri, conf); // 创建文件系统
private static FileSystem createFileSystem(URI uri, Configuration conf
) throws IOException {
Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
LOG.debug("Creating filesystem for " + uri);
if (clazz == null) {
throw new IOException("No FileSystem for scheme: " + uri.getScheme());
}
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
return fs;
}
我们可以看到,在FileSystem类的createFileSystem方法中,先用scheme在conf中找到对应的FileSystem的类类型(例如,scheme:hdfs对应了DistributedFileSystem),然后利用Java反射机制来实例化该类类型,即创建一个具体的FileSystem对象。
接下来,还有非常重要的一步,那就是DistributedFileSystem.initialize(uri, conf)。
这里,要提两点。第二点非常重要!
一、我自己目前看到的,hadoop的源码中很多对类的构造就分两步,先是构造函数初始化一些内部变量(其实,DistributedFileSystem类的构造函数已经不做任何事了),然后再用initialize方法初始化一些变量。这肯定有它的设计目的在,暂不深究,先知道有这么个意思就好。
二、这里的initialize方法是DistributedFileSystem类的方法,不是FileSystem类的。之前提到过,FileSystem是一个抽象类,DistributedFileSystem继承了它。所以,这里fs是父抽象类对子类的一个引用。如果子类没有重写父类的方法,则调用父类方法;如果子类重写了父类的方法,则默认调用子类重写的方法。另外加一句,如果是子类新添加的方法,那么这种方法对该引用是不可见的。
而这里的initialize方法就属于子类重写了父方法的那种。我一开始没发现(java基础薄弱),然后在FileSystem类里的initialize方法里打转转。
好,言归正传,继续看DistributedFileSystem类的initialize方法做了些什么。
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
String host = uri.getHost();
if (host == null) {
throw new IOException("Incomplete HDFS URI, no host: "+ uri);
}
InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
this.dfs = new DFSClient(namenode, conf, statistics);
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory();
}
首先,它调用了super.initialize(uri, conf)方法,其实就是FileSystem.initialize(uri, conf),也就是我掉过的坑,现在知道为什么掉入了那个坑就出不来了吧。
然后,设置了配置setConf(conf)。
接着,创建了一个InetSocketAddress对象namenode(一个由hostname、IP、port构成的对象,并实现了序列化),用来socket通信,还没深入了解。
以及,初始化uri、workingDir,以及我们今天的另一个主角DFSClient。
我们接下来就来看看DFSClient创建的过程(这是DistributedFileSystem类的initialize过程的一部分,我们不要忘了我们的目的,以及我们开篇提到过的,DistributedFileSystem需要使用DFSCclient类来完成在文件系统上的任务。)
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
this(nameNodeAddr, null, conf, stats);
}
提一句,FileSystem.Statistics对象记录了整个文件系统的一些信息,包括读/写字节数、读/写次数等等。
/**
* Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.
* Exactly one of nameNodeAddr or rpcNamenode must be null.
* 创建一个DFSClient用来连接到给定的nameNodeAddr或者rpcNameNode, 两者必有一个为null(为什么呢?)
*/
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
this.conf = conf;
this.stats = stats;
this.nnAddress = nameNodeAddr;
this.socketTimeout = conf.getInt("dfs.socket.timeout",
HdfsConstants.READ_TIMEOUT);
this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
HdfsConstants.WRITE_TIMEOUT);
this.timeoutValue = this.socketTimeout;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
this.hdfsTimeout = Client.getTimeout(conf);
ugi = UserGroupInformation.getCurrentUser();
this.authority = nameNodeAddr == null? "null":
nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
this.clientName = "DFSClient_" + taskId + "_" +
r.nextInt() + "_" + Thread.currentThread().getId();
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt("dfs.replication", 3);```
DFSClient初始化了很多变量,有配置信息conf、文件系统的统计信息stats、Namenode的socket地址、socket连接超时上限、datanode写超时上限、socket工厂、写数据包的大小、block获取重复次数、默认块大小、默认块备份个数等等。
**这里我先留一个问题,hadoop源码的DFSClient类的注释里说明:创建一个DFSClient用来连接到给定的nameNodeAddr或者rpcNameNode, 两者必有一个为null。为什么两者必有一个是null?这个我不明白。这应该和hadoop ipc机制有关。也是我接下来需要关注的重点。**
接下来,DFSClient的创建来到一个**非常重要的步骤**:
if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
this.namenode = createNamenode(this.rpcNamenode, conf);
} else if (nameNodeAddr == null && rpcNamenode != null) {
//This case is used for testing.
this.namenode = this.rpcNamenode = rpcNamenode;
} else {
throw new IllegalArgumentException(
“Expecting exactly one of nameNodeAddr and rpcNamenode being null: “
+ “nameNodeAddr=” + nameNodeAddr + “, rpcNamenode=” + rpcNamenode);
}
在nameNodeAddr不为null,而rpcNamenode为null的情况下(例如,我在调试代码事,nameNodeAddr=localhost/127.0.0.1:8020,rpcNamenode为null),会创建RPCNamenode对象(createRPCNamenode)和Namenode对象(createNamenode),用来建立到Namenode节点的IPC连接。这里就涉及hadoop ipc机制的核心,我现在还不理解,需要等学习之后再来讲。
// read directly from the block file if configured.
this.shortCircuitLocalReads = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(“Short circuit read is ” + shortCircuitLocalReads);
}
this.connectToDnViaHostname = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(“Connect to datanode via hostname is ” + connectToDnViaHostname);
}
String localInterfaces[] =
conf.getStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
if (null == localInterfaces) {
localInterfaces = new String[0];
}
this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
LOG.debug(“Using local interfaces [” +
StringUtils.join(“,”,localInterfaces)+ “] with addresses [” +
StringUtils.join(“,”,localInterfaceAddrs) + “]”);
}
接着,DFSClient又设置了一些配置参数。好吧,这些参数意思也不是很明确。shortCircuitLocalReads的意思是说是否支持本地读取;connectToDnViaHostname是说是否支持用hostname来连接DataNode。
至此,DFSClient就创建好了(其实我们还没讲它的核心T^T)。那么,DistributedFileSystem的initialize过程也Ok了。接下来要回到哪儿了呢?
就要返回到FsShell里ls第二个步骤(获取FileSystem)完了之后的那个步骤(在FileSystem上获取文件信息)。
第三个步骤,我们简略的讲,但我们需要明确一点,看下面。
获取文件信息这个过程会从FsShell类通过方法调用到FileSystem类,再到DistributedFileSystem类,再到DFSClient类里的getFileInfo方法。这个getFileInfo方法又调用了namenode.getFileInfo方法。**注意,namenode是ClientProtocol类型的,是一个接口,没有任何方法的实现**
// DFSClient类里的getFileInfo
public HdfsFileStatus getFileInfo(String src) throws IOException {
checkOpen(); // 检查路径是否为空或null
try {
System.out.println(“getFileInfo’s src ” + src);
return namenode.getFileInfo(src);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class);
}
}
那么,namenode的getFileInfo方法又怎能获取到文件信息呢?
我们想到一点,这个namenode就是之前我们创建DFSClient是初始化的namenode变量,它建立了DFSClient和Namenode节点之间的连接。那么,会不会因为通信的关系,其实是调用了Namenode里的getFileInfo方法呢?(其实这样想还有另外一个原因,那就是这个变量的名字)
然后我们看到,在Namenode类里,果然有getFileInfo方法,而且有具体的实现:
public HdfsFileStatus getFileInfo(String src) throws IOException {
System.out.println(“call this function NameNode->getFileInfo()”);
myMetrics.incrNumFileInfoOps();
return namesystem.getFileInfo(src);
}
而且,我们通过跟踪调用,确实证明了调用Namenode的getFileInfo这个方法。我们通过这个方法继续跟踪下去,就能看到获取文件信息的具体操作:
/**
- Create FileStatus by file INode
*/
private static HdfsFileStatus createFileStatus(byte[] path, INode node) {
// length is zero for directories
return new HdfsFileStatus(
node.isDirectory() ? 0 : ((INodeFile)node).computeContentSummary().getLength(),
node.isDirectory(),
node.isDirectory() ? 0 : ((INodeFile)node).getReplication(),
node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
node.getModificationTime(),
node.getAccessTime(),
node.getFsPermission(),
node.getUserName(),
node.getGroupName(),
path);
}
获取的文件信息最终以FileStatus对象的形式返回给FsShell里的ls方法,ls再从FileStatus对象中获取具体的文件信息,将其打印出来:
上面所说,我们需要明确的一点就是,**DFSClient会把方法的调用发给Namenode节点来执行,且执行的是Namenode自己的方法。**提一点,我们发现这里的方法接口是一样的,因为Namnode也实现了ClientProtocol接口。
###总结一下###
* DFSClient在DistributedFileSystem和NameNode和之间起到了桥梁的作用
* 创建DistributedFileSystem类用到了Java反射机制
* DistributedFileSystem.initialize方法初始化了uri、workingDir变量,以及非常重要的创建DFSClient对象
* 创建DFSClient对象除了初始化一些变量外,还建立了和Namenode节点的连接