How to use Spark's .newAPIHadoopRDD() from Java

I am trying to port an example written in Scala (from the Apache Spark project) to Java and am running into some problems.

The code

val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
  classOf[CqlPagingInputFormat],
  classOf[java.util.Map[String,ByteBuffer]],
  classOf[java.util.Map[String,ByteBuffer]])

builds and runs fine in the original Scala example, but

JavaPairRDD rdd = jsc.newAPIHadoopRDD(job.getConfiguration(),
  CqlPagingInputFormat.class,
  java.util.Map<String, ByteBuffer>.class,
  java.util.Map<String, ByteBuffer>.class);

is not allowed in Java ("cannot select from a parameterized type").
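In plain Java, the usual way to get such a Class token at all is an unchecked cast through a wildcard. A minimal sketch with illustrative names, assuming nothing beyond the JDK:

import java.nio.ByteBuffer;
import java.util.Map;

// Illustrative helper, not from the original code.
public class ClassTokenSketch {
    // Map<String, ByteBuffer>.class does not compile, but a Class token with
    // that static type can be produced by casting through a wildcard; the
    // cast is unchecked, which is harmless here because type arguments are
    // erased at runtime anyway.
    @SuppressWarnings("unchecked")
    static Class<Map<String, ByteBuffer>> mapOfStringToByteBufferClass() {
        return (Class<Map<String, ByteBuffer>>) (Class<?>) Map.class;
    }
}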

Changing

java.util.Map<String, ByteBuffer>.class

to

Class.forName("java.util.Map<String, ByteBuffer>")

produces a new error:

Error:(42, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<capture#1 of ?>,java.lang.Class<capture#2 of ?>
reason: inferred type does not conform to declared bound(s)
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
bound(s): org.apache.hadoop.mapreduce.InputFormat<capture#1 of ?,capture#2 of ?>

Changing it to a plain java.util.Map.class produces a similar error:

Error:(44, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<java.util.Map>,java.lang.Class<java.util.Map>
reason: inferred type does not conform to declared bound(s)
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
bound(s): org.apache.hadoop.mapreduce.InputFormat<java.util.Map,java.util.Map>

So what is the correct translation? It is worth noting that newAPIHadoopRDD() has separate implementations for Scala and for Java. The documentation for these methods can be found here for Scala and, for Java, here: http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class,java.lang.Class,java.lang.Class)

The declaration of CqlPagingInputFormat looks like this:

public class CqlPagingInputFormat extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat<java.util.Map<java.lang.String,java.nio.ByteBuffer>,java.util.Map<java.lang.String,java.nio.ByteBuffer>> {

Best answer: I finally got this solved after a lot of struggling.

The problem is that newAPIHadoopRDD requires a class that extends org.apache.hadoop.mapreduce.InputFormat, and org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat does not extend InputFormat directly; instead it extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat, which in turn extends InputFormat.

Eclipse uses the Groovy compiler, which is smart enough to resolve this, but Java's default compiler cannot. The Groovy compiler also resolves the K and V type parameters correctly, whereas the Java compiler finds the values incompatible.
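As an aside, the same inference can also be spelled out for plain javac by giving it key/value Class tokens of the parameterized type that CqlPagingInputFormat declares. A rough sketch with illustrative names, assuming spark-core and the Cassandra Hadoop classes are on the classpath:

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Illustrative helper class, not from the original code.
public class CassandraRddSketch {

    @SuppressWarnings("unchecked")
    static JavaPairRDD<Map<String, ByteBuffer>, Map<String, ByteBuffer>> cassandraRdd(
            JavaSparkContext jsc, Configuration conf) {
        // With kClass/vClass typed as Class<Map<String, ByteBuffer>>, javac
        // infers K = V = Map<String, ByteBuffer>, so CqlPagingInputFormat
        // satisfies the F extends InputFormat<K, V> bound from the errors above.
        Class<Map<String, ByteBuffer>> kvClass =
                (Class<Map<String, ByteBuffer>>) (Class<?>) Map.class;
        return jsc.newAPIHadoopRDD(conf, CqlPagingInputFormat.class, kvClass, kvClass);
    }
}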

To use the Groovy compiler, you need to make the following changes to your pom.xml file:

<properties>
    <!-- jdk-version is referenced by the compiler plugin below; value assumed, set it to the JDK you build with -->
    <jdk-version>1.7</jdk-version>
    <groovy-version>1.8.6</groovy-version>
    <maven-compiler-plugin-version>2.5.1</maven-compiler-plugin-version>
    <groovy-eclipse-compiler-version>2.7.0-01</groovy-eclipse-compiler-version>
    <maven-clover2-plugin-version>3.1.7</maven-clover2-plugin-version>
    <groovy-eclipse-batch-version>1.8.6-01</groovy-eclipse-batch-version>
</properties>

> Add Groovy as a dependency

<dependencies>
    <dependency>
        <groupId>org.codehaus.groovy</groupId>
        <artifactId>groovy-all</artifactId>
        <version>${groovy-version}</version>
    </dependency>
</dependencies>

> Add the Groovy plugin under build so that it is used as the compiler for our code

<build>
    <pluginManagement>
        <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>${maven-compiler-plugin-version}</version>
            <configuration>
                <!-- Bind Groovy Eclipse Compiler -->
                <compilerId>groovy-eclipse-compiler</compilerId>
                <source>${jdk-version}</source>
                <target>${jdk-version}</target>
            </configuration>
            <dependencies>
                <!-- Define which Groovy version will be used for build (default is 
                    2.0) -->
                <dependency>
                    <groupId>org.codehaus.groovy</groupId>
                    <artifactId>groovy-eclipse-batch</artifactId>
                    <version>${groovy-eclipse-batch-version}</version>
                </dependency>
                <!-- Define dependency to Groovy Eclipse Compiler (as it's referred 
                    in compilerId) -->
                <dependency>
                    <groupId>org.codehaus.groovy</groupId>
                    <artifactId>groovy-eclipse-compiler</artifactId>
                    <version>${groovy-eclipse-compiler-version}</version>
                </dependency>
            </dependencies>
        </plugin>
        <!-- Define Groovy Eclipse Compiler again and set extensions=true. Thanks 
            to this, plugin will -->
        <!-- enhance default build life cycle with an extra phase which adds 
            additional Groovy source folders -->
        <!-- It works fine under Maven 3.x, but we've encountered problems with 
            Maven 2.x -->
        <plugin>
            <groupId>org.codehaus.groovy</groupId>
            <artifactId>groovy-eclipse-compiler</artifactId>
            <version>${groovy-eclipse-compiler-version}</version>
            <extensions>true</extensions>
        </plugin>
        <!-- Configure Clover for Maven plug-in. Please note that it's not bound 
            to any execution phase, --> 
        <!-- so you'll have to call Clover goals from command line. -->
        <plugin>
            <groupId>com.atlassian.maven.plugins</groupId>
            <artifactId>maven-clover2-plugin</artifactId>
            <version>${maven-clover2-plugin-version}</version>
            <configuration>
                <generateHtml>true</generateHtml>
                <historyDir>.cloverhistory</historyDir>
            </configuration>
        </plugin>
        </plugins>
    </pluginManagement>
</build>

This should solve it.
