I'm trying to load data from a SQL Server database using Spark's JdbcRDD. I'm using version 4.0 of the Microsoft JDBC driver. Here is the code:
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;
import scala.reflect.ClassManifestFactory$;

// static so that it can be called from main()
public static JdbcRDD<Object[]> load() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("myapp");
    JavaSparkContext context = new JavaSparkContext(conf);
    DbConnection connection = new DbConnection("com.microsoft.sqlserver.jdbc.SQLServerDriver", "my-connection-string", "test", "test");
    JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<Object[]>(context.sc(), connection, "select * from <table>", 1, 1000, 1, new JobMapper(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));
    return jdbcRDD;
}

public static void main(String[] args) {
    JdbcRDD<Object[]> jdbcRDD = load();
    JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class));
    // Pull the first column of every row back to the driver
    List<String> ids = javaRDD.map(new Function<Object[], String>() {
        public String call(final Object[] record) {
            return (String) record[0];
        }
    }).collect();
    System.out.println(ids);
}
I get the following exception:
java.lang.AbstractMethodError: com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.isClosed()Z
at org.apache.spark.rdd.JdbcRDD$$anon$1.close(JdbcRDD.scala:109)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:74)
at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:74)
at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Here is the definition of JobMapper:
public class JobMapper extends AbstractFunction1<ResultSet, Object[]> implements Serializable {
    private static final Logger logger = Logger.getLogger(JobMapper.class);

    public Object[] apply(ResultSet row) {
        return JdbcRDD.resultSetToObjectArray(row);
    }
}
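Best answer: I figured out what I was doing wrong. There were a couple of things:
1. It doesn't seem to work with version 4.0 of the driver, so I switched to version 3.0.
2. The JdbcRDD documentation states that the SQL string must contain two `?` placeholders that mark the range of the query, so I had to change the query.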
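JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<Object[]>(context.sc(), connection, "SELECT * FROM <table> where Id >= ? and Id <= ?", 1, 20, 1, new JobMapper(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));
The parameters 1 and 20 give the lower and upper bounds of the query range; the 1 that follows them is the number of partitions.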
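To make the role of the two bounds more concrete, here is a small standalone sketch of how an inclusive range can be cut into per-partition sub-ranges, each of which would be bound to the two `?` placeholders. As far as I understand, JdbcRDD does something along these lines internally, but this is only an illustration and not Spark's code: the class name `JdbcRangeSketch` and the method `split` are made up for this example, and it assumes the range fits in a `long`.

```java
// Hypothetical helper, not part of Spark: illustrates how an inclusive
// [lowerBound, upperBound] range can be cut into numPartitions sub-ranges.
class JdbcRangeSketch {

    // Returns one {start, end} pair per partition; both ends are inclusive.
    // Assumption: upperBound - lowerBound + 1 fits in a long (no overflow handling).
    static long[][] split(long lowerBound, long upperBound, int numPartitions) {
        long length = upperBound - lowerBound + 1; // bounds are inclusive
        long[][] ranges = new long[numPartitions][];
        for (int i = 0; i < numPartitions; i++) {
            long start = lowerBound + (i * length) / numPartitions;
            long end = lowerBound + ((i + 1) * length) / numPartitions - 1;
            ranges[i] = new long[] {start, end};
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Each pair would fill "Id >= ? and Id <= ?" for one partition.
        for (long[] r : split(1, 20, 2)) {
            System.out.println("Id >= " + r[0] + " and Id <= " + r[1]);
        }
        // prints:
        // Id >= 1 and Id <= 10
        // Id >= 11 and Id <= 20
    }
}
```

With a single partition, as in the query above, the whole range 1 to 20 goes into one query. Note that rows whose Id falls outside the bounds are never read, so the bounds need to cover the full key range you want.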