Hadoop开发

HDFS

HDFS提供一套Java API来操作HDFS,包括文件的建立、修改、删除、权限管理等,下面对几个常用的API进行介绍,详细的API接口请参见API文档,可以在${HADOOP_HOME}/share/doc/api/index.html中找到。

IOUtils类

Hadoop IO的基础类,提供一组静态方法来控制HadoopIO。通过IOUtils类,可以使用java.net.URL类来访问HDFS,同时也可以在标准输入流和输出流之间复制数据。需要注意的是,为了是java.net.URL能够识别HDFS的URL方案(hdfs://namenode:port/)需要在使用前设置URL的流处理工厂类为org.apache.hadoop.fs. FsUrlStreamHandlerFactory,详见2.3代码示例。

FileSystem类

FileSystem类是一个抽象文件系统API,在使用该API前需要先确定目标文件系统对应的实现。通过FileSystem的HDFS实现类可以实现对HDFS的一般文件操作,包括读、写、删、追加等。下面对FileSystem类中比较重要的几个方法进行介绍。
get方法
FileSystem类是一个抽象类,实际使用时需要通过静态get方法来获得目标文件系统对应的FileSystem实现。FileSystem类提供了多种get方法,如下所示:
static FileSystem get(Configuration conf) 
static FileSystem get(URI uri, Configuration conf) 
static FileSystem get(URI uri, Configuration conf, String user)
第一个方法获取配置文件中设置的文件系统(通过fs.defaultFS设置)的FileSystem实例,默认为本地文件系统;第二个方法根据URI方案获取相应的FileSystem实例;第三个方法则根据URI方案和配置文件获取相应的FileSystem实例,该实例具有给定用户的权限。
open方法
FileSystem实例通过open方法获取HDFS文件的输入流,该方法返回FSDataInputStream对象,该对象支持随机访问。FileSystem类提供了两种open方法,如下所示:
FSDataInputStream open(Path f) 
abstract FSDataInputStream open(Path f, int bufferSize)
第一个方法使用默认的缓冲区大小(4K),这个值有些偏少,可以通过第二个方法指定合理的缓冲区大小。
可以通过FSDataInputStream来读取文件,FSDataInputStream继承自java.io.DataInputStream,使用FSDataInputStream读取HDFS文件和Java读取本地文件流的方式没有区别。FSDataInputStream还提供seek方法实现文件的随机读写,seek方法是一个开销较大的操作,在批处理应用中应尽可能避免使用。
create方法
FileSystem使用create方法创建文件,该方法返回FSDataOutputStream对象,通过该对象可实现对HDFS文件的写入操作。FileSystem提供多种create方法,如下所示:
static FSDataOutputStream create(FileSystem fs, Path file, FsPermission permission)
FSDataOutputStream create(Path f) 
FSDataOutputStream create(Path f, boolean overwrite) 
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) 
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, 
short replication, long blockSize) 
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
long blockSize, Progressable progress) 
abstract FSDataOutputStream create(Path f, FsPermission permission, 
boolean overwrite, int bufferSize, 
short replication, long blockSize,
Progressable progress) 
FSDataOutputStream create(Path f, Progressable progress) 
FSDataOutputStream create(Path f, short replication) 
FSDataOutputStream create(Path f, short replication, Progressable progress)
如果指定的路径不存在,create方法会自动为指定文件创建父目录。
FSDataOutputStream对象继承自java.io.DataOutputStream,可以像写本地文件流一样向HDFS文件写入数据。使用方法参见2.3.2 FileSystem类示例。 
mkdirs方法
FileSystem的mkdirs方法可以一次性建立全部不存在的目录。FileSystem提供多种mkdirs方法,如下所示:
static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission) 
abstract boolean mkdirs(Path f, FsPermission permission) 
boolean mkdirs(Path f)
exists方法
FileSystem使用exists方法判断文件或目录是否存在,该方法声明如下:
boolean exists(Path f)
listStatus方法
FileSystem的listStatus方法返回指定目录或文件的元数据列表,文件或目录的元数据由FileStatus类封装,包含了文件长度、块大小、备份数、修改时间、所有者及其权限等信息。FileSystem提供多种listStatus方法,如下所示:
abstract FileStatus[] listStatus(Path f) 
FileStatus[] listStatus(Path[] files) 
FileStatus[] listStatus(Path[] files, PathFilter filter) 
FileStatus[] listStatus(Path f, PathFilter filter)

IOUtils类示例

下面示例给出了IOUtils类的一个示例,该例通过IOUtils的copyBytes方法将读取到的HDFS文件通过标准输出打印。

public class IOUtilsDemo 

static 

// JVM只能调用一次setURLStreamHandlerFactory方法,所以在只能在静态方法中使用了, 
// 若工程中其他类库之前以调用了该方法,那将无法再使用该方法从 hadoop中得到数据 
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); 

public static void main(String[] args) throws Exception 

InputStream in = null
try
in = new URL(arg[0]).openStream(); 
IOUtils.copyBytes(in, System.out, 4091, false); 
}finally
IOUtils.closeStream(in); 


}

FileSystem类示例

下面示例给出了FileSystem的例子,该例子显示了如何使用FileSystem类来与HDFS交互。

 

public class HDFSFileOperator { 

long count = 0; 

public byte[] getBytesFromURI(Configuration conf, String uri) throws IOException { 

byte[] data = null
InputStream in = null
ByteArrayOutputStream out = new ByteArrayOutputStream(); 

FileSystem fs = FileSystem.get(URI.create(uri), conf); 

try

in = fs.open(new Path(uri)); 
IOUtils.copyBytes(in, out, 4096, false); 
data = out.toByteArray(); 
}finally { 

out.close(); 
IOUtils.closeStream(in); 

return data; 

public void printFileOnHDFS(Configuration conf, String uri) throws IOException { 

FileSystem fs = FileSystem.get(URI.create(uri), conf); 

if(fs.exists(new Path(uri)) == falsereturn

InputStream in = null

try

in = fs.open(new Path(uri)); 
IOUtils.copyBytes(in, System.out, 4096, false); 

}finally { 

IOUtils.closeStream(in); 

public String seekFileOnHDFS(Configuration conf, String uri, long startPos) throws IOException { 

FSDataInputStream in = null
ByteArrayOutputStream out = new ByteArrayOutputStream(); 

FileSystem fs = FileSystem.get(URI.create(uri), conf); 
fs.setVerifyChecksum(false); 

if(fs.exists(new Path(uri)) == falsereturn “”; 

try
in = fs.open(new Path(uri)); 
in.seek(startPos); 

IOUtils.copyBytes(in, out, 4096, false); 

return out.toString(); 

}finally { 

IOUtils.closeStream(in); 

public void writeToHDFS(Configuration conf, String uri, String text) throws IOException { 

FileSystem fs = FileSystem.get(conf); 
OutputStream out; 
if(fs.exists(new Path(uri))){ 

out = fs.append(new Path(uri)); 
}else { 

out = fs.create(new Path(uri), false, 4096); 

out.write(text.getBytes()); 
out.close(); 

public void copyToHDFS(Configuration conf, String localPath, String HDFSPath, boolean showProgress) throws IOException{ 

FileSystem fs = FileSystem.get(URI.create(HDFSPath), conf); 
InputStream in = new BufferedInputStream(new FileInputStream(localPath)); 
OutputStream out = null

if(showProgress) { 
out = fs.create(new Path(HDFSPath) 
true 
, 4096 
new Progressable() { 
public void progress() { 
System.out.print(“.”); 

} ); 
}else 

out = fs.create(new Path(HDFSPath) 
true 
, 4096); 

IOUtils.copyBytes(in, out, 4096, true); 

public void deleteFileOnHDFS(Configuration conf, String uri) throws IOException { 

FileSystem fs = FileSystem.get(URI.create(uri), conf); 
if(fs.exists(new Path(uri)) == falsereturn
fs.delete(new Path(uri), false); 

}

    

MapReduce

MapReduce是一个分布式计算框架,计算模型如图4-1所示。在该计算模型中,一个作业由两类任务组成:Map和Reduce。Map任务负责将原始数据转换成Key/Value形式的键值对集合,Reduce任务将Map转换的数据进行处理,抽取需要的数据输出。
《Hadoop开发》
图4-1 MapReduce计算模型
在设计MapReduce任务时,首先,要对业务需求进行抽象,将业务分解成一个或多个MapReduce作业。如果业务流程可以分解成一个线性的作业链的应用场合,可以使用MapReduce客户端来控制作业集的有序执行。对于复杂的业务逻辑,可能涉及到作业有向无环图(DAG)的转换,这种转换通常是十分费力的。Hive、Pig和Cascading可以使用更高级的语言来实现这种转换,开发复杂的MapReduce应用,可以考虑是使用这些工具进行。Apache Oozie是一个以服务器形式运行的工作流服务,可以控制多个MapReduce作业按设计流程运行。在Oozie中,工作流是一个Action节点和控制流节点组成的DAG。CDH v1.0不包含Hive、Pig、Cascading和Oozie组件,但是兼容Hive0.9.0、Pig0.10.0、Oozie3.2.0及以上版本。
注意:

    • 虽然MRv2使用全新的YARN框架,可支持多种分布式计算框架,但是MRv2在对MapReduce框架的支持上兼容MRv1的接口,因此,MRv1的应用程序代码无需修改就可以在MRv2上运行,只需使用MRv2包进行重新编译。
    • 关于MapReduce的详细介绍,请参阅:http://hadoop.apache.org/docs/stable/mapred_tutorial.html

如4.2节所述,一个完整的MapReduce作业首先需要将原始数据切片,然后将切片交给map任务处理,之后将map任务处理的结果传给Reduce处理,最后,将Reduce处理结果输出。这个过程中一共分四部分:切片、map、reduce和输出,与之对应的类分别为InputFormat、OutputFormat、Mapper、Reducer。一个MapReduce作业由Job类进行抽象,客户端配置Job类后调用该类的submit方法将作业提交给MapReduce集群执行。

4.3.1 InputFormat类

InputFormat描述了一个MapReduce任务的输入切片信息,MapReduce作业通过该类实现如下功能:

  1. 检查作业的输入设置是否有效
  2. 将作业切片,每个切片用InputSplit类封装,提交给Mapper类处理;
  3. 通过实现RecordReader类来读取切片中的Key/Value对,该Key/Value对是Mapper类处理的最小单位

MapReduce框架预置了很多有用的InputFormat实现,如表4-1所示:
表4-1 InputFormat类族(主要)

说明

AutoInputFormat

一个可以自动检测文件类型的FileInputFormat类,该类目前支持文本文件和HDFS序列文件。

CombineFileInputFormat

一个用于处理大量小文件的FileInputFormat类,CombineFileInputFormat将多个文件打包到一个输入切片中,每个Mapper类可能得到来自多个文件的数据。

CompositeInputFormat

一个可以实现多数据源Join的InputFormat.

DBInputFormat

一个面向关系型数据库的InputFormat实现,该类使用JDBC从关系型数据库中读取数据。需要注意的是,MapReduce的高并发性可能给源数据库造成巨大压力,甚至导致数据库崩溃,因此,最好在少量数据集中使用。

FileInputFormat

FileInputFormat是所有文件类型输入数据的InputFormat基类。

KeyValueTextInputFormat

读取文本文件的InputFormat,如果行被分隔符分隔为两部分,则第一部分作为Key,第二部分作为Value,如果没有分隔符,则整行作为Key,Value为空。

NLineInputFormat

该InputFormat将将文件数据按指定的行数进行切片。

SequenceFileAsBinaryInputFormat

读取HDFS序列文件的FileInputFormat,该类将序列文件中的键值对都按BytesWritable类型进行解析。

SequenceFileAsTextInputFormat

读取HDFS序列文件的FileInputFormat,该类将序列文件中的键值对都按Text类型进行解析。

SequenceFileInputFormat

读取HDFS序列文件的FileInputFormat

TeraInputFormat

该类将每行的前10个字符作为Key,其余作为Value。该类属于FileInputFormat,按文件进行切片。

TextInputFormat

该类是MapReduce默认的InputFormat,该类将每个文件作为一个切片输入给Mapper,每个切片按行划分,Key为该行第一个字符在文件中的偏移量。

4.3.2 OutputFormat类

OutputFormat类描述了一个MapReduce作业如何输出结果数据。MapReduce通过该类实现下述功能:

  1. 验证输出设置的有效性,如设置的输出目录是否存在;
  2. 实现RecordWriter类,通过RecordWriter来指定数据输出格式。

MapReduce框架同样预置了许多有用的OutputFormat实现,如表4-2所示:

说明

DBOutputFormat

将输出数据写入关系型数据的OutputFormat

FileOutputFormat

将输出数据写入文件的OutputFormat基类

MapFileOutputFormat

将输出结果写入MapFile格式的文件中的FileOutputFormat实现

MultipleOutputFormat

一个将输出结果写入多个文件的FileOutputFormat实现

MultipleSequenceFileOutputFormat

一个将输出结果写入多个文件HDFS序列文件的FileOutputFormat实现

MultipleTextOutputFormat

一个将输出结果写入多个文件文本文件的FileOutputFormat实现

NullOutputFormat

一个特殊的OutputFormat,不输出任何东西

SequenceFileAsBinaryOutputFormat

将输出结果写入HDFS序列文件格式的文件中的FileOutputFormat实现,key和value都设置为BytesWritable格式

SequenceFileOutputFormat

将输出结果写入HDFS序列文件格式的文件中的FileOutputFormat实现

TeraOutputFormat

将输出数据输出到文本文件,每行以”\r\n”结尾

TextOutputFormat

该类是MapReduce默认的OutputFormat,按文本类型解析所有的Key和Value。

4.3.3 Mapper类

Mapper类将输入的一组Key/Value键值对数据映射为中间Key/Value键值对,输出的键值对与输入的键值对数据类型可以不一样,输出键值对应与Reducer输入一致。Mapper的输出会被存储在本地磁盘中,按Reducer进行分组,哪些数据被分给那个Reducer由Partitioner类控制。Mapper类中的map方法实现如下:

/** 
* Called once for each key/value pair in the input split. Most applications 
* should override this, but the default is the identity function. 
*/ 
@SuppressWarnings(“unchecked”) 
protected void map(KEYIN key, VALUEIN value, 
Context context) throws IOException, InterruptedException { 
context.write((KEYOUT) key, (VALUEOUT) value); 
}

KEYIN、VALUEIN为输入切片的Key/Value类型,Context为MapReduce作业上下文,可以通过该类获得作业的配置信息,设置计数器、输出中间结果等操作。一般情况下,开发者只需要继承Mapper类,在该类的map方法中实现自己的业务逻辑即可。如果需要定制map任务流程,如初始化一些环境等,可以重载Mapper类的setup、cleanup、run方法来创建一个完全自定义的map任务。

4.3.4 Reducer类

Reducer类将map输出的属于它的中间数据拉取到Reducer节点进行处理,处理结果根据作业配置的OutputFormat进行输出。Reducer的个数由可由Job类的setNumReduceTasks方法设定。Reducer类主要完成三个过程:Shuffle、Sort和Reduce。Shuffle过程中,Reducer将存储在map端属于该Reducer的中间数据通过http协议拉取到本地节点。Sort过程按Key对拉取的数据进行归并排序,相同key的数据按value排序。Reduce阶段会调用reduce方法处理拉取的数据,并通过Context将数据输出到指定目标中。通常开发者只需重载Reducer类的reduce方法来实现自己的业务逻辑。Reducer类的reduce方法实现如下:

/** 
* This method is called once for each key. Most applications will define 
* their reduce class by overriding this method. The default implementation 
* is an identity function. 
*/ 
@SuppressWarnings(“unchecked”) 
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context 
throws IOException, InterruptedException { 
for(VALUEIN value: values) { 
context.write((KEYOUT) key, (VALUEOUT) value); 

}

类似的,通过重载Reducer的setup、cleanup和run方法,可以创建一个完全自定义的reduce任务。

4.3.5 Job类

Job类是从客户端角度对MapReduce作业进行抽象的类,该类包括了作业定义接口和控制接口,通过该类可以定义、提交和监控MapReduce作业。Job类提供的主要方法如下所示:
void killJob()
void setCombinerClass(Class<? extends Reducer> cls) 
void setInputFormatClass(Class<? extends InputFormat> cls)
void setMapOutputKeyClass(Class<?> theClass) 
void setMapOutputValueClass(Class<?> theClass) 
void setMapperClass(Class<? extends Mapper> cls)
void setNumReduceTasks(int tasks) 
void setOutputFormatClass(Class<? extends OutputFormat> cls) 
void setOutputKeyClass(Class<?> theClass) 
void setOutputValueClass(Class<?> theClass) 
void setPartitionerClass(Class<? extends Partitioner> cls) 
void setReducerClass(Class<? extends Reducer> cls) 
void submit() 
boolean waitForCompletion(boolean verbose)
通常,一个完整的MapReduce作业需要设置如下几个参数:

  • InputFormat类及其参数
  • OutputFormat类及其参数
  • Mapper类
  • Reducer类

复杂一些的MapReduce作业可能需要设定Partioner类和Combiner类。设定Combiner类可以是MapReduce框架在Mapper节点处理存储在本地的中间数据,从而减少Reducer的Shuffle过程需要拉取的数据量,一般Combiner与Reducer设置为同一个类。

 

    原文作者:MapReduce
    原文地址: https://www.cnblogs.com/warmingsun/p/6694146.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞