文章地址:http://www.haha174.top/article/details/255779
1.简述
Spark Sql 支持使用jdbc 从关系型数据库读取数据(比如mysql ),读取的数据依然用DataSet 表示,很方便地使用Spark core 提供的各种算子进行处理。
这里有一个经验之谈,实际上用Spark Sql 处理JDBC 的数据是非常有用的。比如说你的mysql业务数据库中有大量的数据比如1000万,然后你现在需要编写一个程序处理线上脏数据某中复杂的业务逻辑,甚至复杂到可能涉及到用sparkSql 反复查询hive 中的数据,来进行关联处理。
那么此时用spark Sql 来通过jdbc 数据源,加载MySQL 中的数据,然后通过各种算子进行处理,是比较好的选择。因为spark 是分布式计算框架,对于1000万的数据,肯定要用分布式处理的。而如果你自己手动编写一个java 程序,那么估计要计算很久。
2.Demo 依旧是查询成绩大于80分的学生
public class JdbcDataSource {
public static void main(String[] args) {
SparkConf conf=new SparkConf().setAppName("JdbcDataSource");
JavaSparkContext sc=new JavaSparkContext(conf);
SQLContext sqlContext=new SQLContext(sc);
//在两张表中分别取出 转换为 Dataset
Map<String,String> options=new HashMap<String,String>();
options.put("url","jdbc:mysql://haha174:3306/test");
options.put("dbtable","students_infos");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user","root");
options.put("password","root");
Dataset studentsDS=sqlContext.read().format("jdbc").options(options).load();
options.clear();
options.put("url","jdbc:mysql://haha174:3306/test");
options.put("dbtable","students_scores");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user","root");
options.put("password","root");
Dataset scoreDS =sqlContext.read().format("jdbc").options(options).load();
//将两个DataSet 转换为JavaRDD
JavaPairRDD<String,Tuple2<Integer,Integer>> studentRDD=studentsDS.javaRDD().mapToPair(new PairFunction<Row ,String,Integer>() {
private static final long serialVersionUID=1L;
@Override
public Tuple2<String,Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(row.getString(1),row.getInt(2));
}
}).join(scoreDS.javaRDD().mapToPair(new PairFunction<Row ,String,Integer>() {
private static final long serialVersionUID=1L;
public Tuple2<String,Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(row.getString(1),row.getInt(2));
}
}));
//将JavaRDD 转换为JavaRDD<Row>
JavaRDD<Row> StudentRowRDD=studentRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
@Override
public Row call(Tuple2<String, Tuple2<Integer, Integer>> stringTuple2Tuple2) throws Exception {
return RowFactory.create(stringTuple2Tuple2._1,stringTuple2Tuple2._2._1,stringTuple2Tuple2._2._2);
}
});
JavaRDD<Row> StudentRowRDDS= StudentRowRDD.filter(new Function<Row, Boolean>() {
@Override
public Boolean call(Row row) throws Exception {
if(row.getInt(2)>80)
return true;
return false;
}
});
List<StructField> structFieldList=new ArrayList<StructField>();
structFieldList.add(DataTypes.createStructField("name",DataTypes.StringType,true));
structFieldList.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));
structFieldList.add(DataTypes.createStructField("score",DataTypes.IntegerType,true));
StructType structType=DataTypes.createStructType(structFieldList);
Dataset studentRe=sqlContext.createDataFrame(StudentRowRDDS,structType);
options.clear();
options.put("url","jdbc:mysql://haha174:3306/test");
options.put("dbtable","good_students_infos");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user","root");
options.put("password","root");
studentRe.write().format("jdbc").options(options).save();
}
}
欢迎关注,更多福利
这里写图片描述