Spark中RDD和DataFrame互相转换的原因及方法

本文讲解Spark中的RDD和DataFrame之间的互相转换,主要内容包括以下几点:

1、RDD转DataFrame原因及方式

2、DataFrame转RDD原因及方式

3、DataFrame转RDD的案例

1、RDD转DataFrame原因及方式

可以将RDD转成DataFrame之后,借用sparksql和sql以及HQL语句快速方便的使用sql语句统计和查询,针对HDFS中的数据,直接就可以使用SQL进行查询。

将RDD转化为DataFrame有两种方式:

方式一:通过反射推断schema

方式二、编程指定schema

2、DataFrame转RDD原因及方式

  1. 解决一些使用sql难以处理的统计分析
  2. 将数据写入Mysql
  3. a.DataFrame的write.jdbc,仅支持四种模式:append、overwrite、ignore、default
  4. b.使用rdd的话,除了上述以外还支持insert 和 update操作,还支持数据库连接池 (自定义,第三方:c3p0 hibernate mybatis)方式,批量高效将大量数据写入 Mysql

3、DataFrame转RDD的案例

Java版本

package com.spark.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import scala.Function1;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RddToDFjava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("RddToDF");

        SparkSession spark = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        JavaRDD<String> dept =  spark.sparkContext().textFile("data/dept.txt", 1)

        JavaRDD<DeptBean> deptRDD = dept.map(line -> {
            String[] arr = line.split("\t");
            DeptBean deptBean = new DeptBean();
            int deptid;
            if (!arr[0].isEmpty()) {
                deptid = Integer.valueOf(arr[0]);
            } else {
                deptid = 0;
            }
            deptBean.setDeptid(deptid);
            deptBean.setDetname(arr[1]);
            deptBean.setAdd(arr[2]);
            return deptBean;
        });

        Dataset<Row> deptDF = spark.createDataFrame(deptRDD, DeptBean.class);

        deptDF.printSchema();
        deptDF.show();

    }
}

Scala版本

package com.spark.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StructField, StructType,StringType}

object Rdd2DataFrame {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(getClass.getSimpleName)

    // 创建SparkSession     val spark = SparkSession
      .builder()
      .config(conf)
// .enableHiveSupport()       .getOrCreate()

    // 导入隐式转换包     import spark.implicits._

    val deptLine: RDD[String] = spark.sparkContext.textFile("data/dept.txt")

    case class Dept(deptid:Int, deptname:String, addr:String)
   
    val dept: RDD[Dept] = deptLine.map(line => {
      val arr: Array[String] = line.split("\t")
      val deptid = arr(0).toInt
      val deptname = arr(1)
      val add = arr(2)
      Dept(deptid,deptname,add)
    })

    val deptDF: DataFrame = dept.toDF()
    deptDF.show()

  }

}

    原文作者:江户小宝
    原文地址: https://zhuanlan.zhihu.com/p/61772329
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞