1. Introduction to MRUnit
Official site: https://mrunit.apache.org/
Apache MRUnit™ is a Java library that helps developers unit test Apache Hadoop MapReduce jobs.
2. Code Example
Using a Maven project as the example, this section shows how to unit test a MapReduce job with MRUnit.
For a detailed walkthrough of this example, see: https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+Tutorial
The key additions to the project's pom.xml (listed in full at the end of this section) are the mrunit, mockito-all, and junit libraries: MRUnit builds on Mockito and JUnit to run MapReduce code against mocked contexts.
The MR unit test class:
package mrunit;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import mrunit.SMSCDRMapper.CDRCounter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;
/**
 * Test data format: CDRID;CDRType;Phone1;Phone2;SMS Status Code
 * 655209;1;796764372490213;804422938115889;6
 * 353415;0;356857119806206;287572231184798;4
 * 835699;1;252280313968413;889717902341635;0
 */
public class SMSCDRMapperReducerTest {
Configuration conf = new Configuration();
MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;
@Before
public void setUp() {
// the Mapper/Reducer under test and their MRUnit drivers
SMSCDRMapper mapper = new SMSCDRMapper();
SMSCDRReducer reducer = new SMSCDRReducer();
mapDriver = MapDriver.newMapDriver(mapper);
reduceDriver = ReduceDriver.newReduceDriver(reducer);
mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
// configuration parameters visible to the code under test
// (conf is shared by reference, so values set below are seen by the driver)
mapDriver.setConfiguration(conf);
conf.set("myParameter1", "20");
conf.set("myParameter2", "23");
}
@Test
public void testMapper() throws IOException {
mapDriver.withInput(new LongWritable(), new Text(
"655209;1;796764372490213;804422938115889;6"));
mapDriver.withOutput(new Text("6"), new IntWritable(1));
mapDriver.runTest();
}
@Test
public void testReducer() throws IOException {
List<IntWritable> values = new ArrayList<IntWritable>();
values.add(new IntWritable(1));
values.add(new IntWritable(1));
reduceDriver.withInput(new Text("6"), values);
reduceDriver.withOutput(new Text("6"), new IntWritable(2));
reduceDriver.runTest();
}
@Test
public void testMapperReducer() throws IOException {
mapReduceDriver.withInput(new LongWritable(), new Text(
"655209;1;796764372490213;804422938115889;6"));
mapReduceDriver.withOutput(new Text("6"), new IntWritable(1));
mapReduceDriver.runTest();
}
@Test
public void testMapperCount() throws IOException {
mapDriver.withInput(new LongWritable(), new Text(
"655209;0;796764372490213;804422938115889;6"));
// no output is expected: CDRType 0 is not an SMS record
mapDriver.runTest();
assertEquals("Expected 1 counter increment", 1, mapDriver.getCounters()
.findCounter(CDRCounter.NonSMSCDR).getValue());
}
}
The Mapper class:
package mrunit;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SMSCDRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text status = new Text();
private final static IntWritable addOne = new IntWritable(1);
static enum CDRCounter {
NonSMSCDR
}
/**
 * Returns the SMS status code and its count.
 */
@Override
protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {
//655209;1;796764372490213;804422938115889;6 is the Sample record format
String[] line = value.toString().split(";");
// If record is of SMS CDR
if (Integer.parseInt(line[1]) == 1) {
status.set(line[4]);
context.write(status, addOne);
} else {
// CDR record is not of type SMS so increment the counter
context.getCounter(CDRCounter.NonSMSCDR).increment(1);
}
}
}
The Reducer class:
package mrunit;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SMSCDRReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws java.io.IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
The full project pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cdh</groupId>
<artifactId>cdh-test</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>cdh-test</name>
<url>http://maven.apache.org</url>
<properties>
<hadoop.version>2.0.0-mr1-cdh4.4.0</hadoop.version>
<hbase.version>0.94.6-cdh4.4.0</hbase.version>
<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
<maven.compiler.encoding>utf-8</maven.compiler.encoding>
</properties>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<encoding>utf-8</encoding>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.9</version>
<configuration>
<buildOutputDirectory>eclipse-classes</buildOutputDirectory>
<downloadSources>true</downloadSources>
<downloadJavadocs>false</downloadJavadocs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.6</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>mockito-all</artifactId>
<groupId>org.mockito</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.0.0-cdh4.4.0</version>
<exclusions>
<exclusion>
<artifactId>jersey-test-framework-grizzly2</artifactId>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
</exclusion>
<exclusion>
<artifactId>netty</artifactId>
<groupId>org.jboss.netty</groupId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>${hbase.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.hadoop.gplcompression</groupId>
<artifactId>hadoop-lzo-cdh4</artifactId>
<version>0.4.15-gplextras</version>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>2.2.9</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.5.1</version>
</dependency>
<!-- test dependencies: MRUnit (hadoop2 build), Mockito, JUnit -->
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.1.0</version>
<classifier>hadoop2</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>