spark从入门到放弃三十四:Spark Sql(7) JDBC

文章地址:http://www.haha174.top/article/details/255779
1.简述

Spark Sql 支持使用jdbc 从关系型数据库读取数据(比如mysql ),读取的数据依然用DataSet 表示,很方便地使用Spark core 提供的各种算子进行处理。
这里有一个经验之谈,实际上用Spark Sql 处理JDBC 的数据是非常有用的。比如说你的mysql业务数据库中有大量的数据比如1000万,然后你现在需要编写一个程序处理线上脏数据某中复杂的业务逻辑,甚至复杂到可能涉及到用sparkSql 反复查询hive 中的数据,来进行关联处理。
那么此时用spark Sql 来通过jdbc 数据源,加载MySQL 中的数据,然后通过各种算子进行处理,是比较好的选择。因为spark 是分布式计算框架,对于1000万的数据,肯定要用分布式处理的。而如果你自己手动编写一个java 程序,那么估计要计算很久。
2.Demo 依旧是查询成绩大于80分的学生

public class JdbcDataSource {
    public static void main(String[] args) {
        SparkConf conf=new SparkConf().setAppName("JdbcDataSource");
        JavaSparkContext sc=new JavaSparkContext(conf);
        SQLContext sqlContext=new SQLContext(sc);
        //在两张表中分别取出  转换为  Dataset
        Map<String,String> options=new HashMap<String,String>();
        options.put("url","jdbc:mysql://haha174:3306/test");
        options.put("dbtable","students_infos");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user","root");
        options.put("password","root");
        Dataset studentsDS=sqlContext.read().format("jdbc").options(options).load();
        options.clear();
        options.put("url","jdbc:mysql://haha174:3306/test");
        options.put("dbtable","students_scores");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user","root");
        options.put("password","root");
        Dataset scoreDS =sqlContext.read().format("jdbc").options(options).load();
        //将两个DataSet  转换为JavaRDD
        JavaPairRDD<String,Tuple2<Integer,Integer>> studentRDD=studentsDS.javaRDD().mapToPair(new PairFunction<Row ,String,Integer>() {
            private static final long serialVersionUID=1L;
            @Override
            public Tuple2<String,Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(row.getString(1),row.getInt(2));
            }
        }).join(scoreDS.javaRDD().mapToPair(new PairFunction<Row ,String,Integer>() {
            private static final long serialVersionUID=1L;

            public Tuple2<String,Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(row.getString(1),row.getInt(2));
            }
        }));

        //将JavaRDD  转换为JavaRDD<Row>
        JavaRDD<Row> StudentRowRDD=studentRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> stringTuple2Tuple2) throws Exception {
                return RowFactory.create(stringTuple2Tuple2._1,stringTuple2Tuple2._2._1,stringTuple2Tuple2._2._2);
            }
        });
        JavaRDD<Row> StudentRowRDDS=  StudentRowRDD.filter(new Function<Row, Boolean>() {
            @Override
            public Boolean call(Row row) throws Exception {
                if(row.getInt(2)>80)
                    return true;

                return false;
            }
        });
        List<StructField> structFieldList=new ArrayList<StructField>();
        structFieldList.add(DataTypes.createStructField("name",DataTypes.StringType,true));
        structFieldList.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));
        structFieldList.add(DataTypes.createStructField("score",DataTypes.IntegerType,true));
        StructType structType=DataTypes.createStructType(structFieldList);
        Dataset studentRe=sqlContext.createDataFrame(StudentRowRDDS,structType);
        options.clear();
        options.put("url","jdbc:mysql://haha174:3306/test");
        options.put("dbtable","good_students_infos");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user","root");
        options.put("password","root");
        studentRe.write().format("jdbc").options(options).save();

    }
}

欢迎关注,更多福利

《spark从入门到放弃三十四:Spark Sql(7) JDBC》 这里写图片描述

    原文作者:意浅离殇
    原文地址: https://www.jianshu.com/p/1582f6f93a93
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞