Dataset groupBy/agg example:
Dataset<Row> resultDs = dsParsed
        .groupBy("enodeb_id", "ecell_id")
        .agg(
                functions.first("scan_start_time").alias("scan_start_time1"),
                functions.first("insert_time").alias("insert_time1"),
                functions.first("mr_type").alias("mr_type1"),
                functions.first("mr_ltescphr").alias("mr_ltescphr1"),
                functions.first("mr_ltescpuschprbnum").alias("mr_ltescpuschprbnum1"),
                functions.count("enodeb_id").alias("rows1"))
        .selectExpr(
                "ecell_id",
                "enodeb_id",
                "scan_start_time1 as scan_start_time",
                "insert_time1 as insert_time",
                "mr_type1 as mr_type",
                "mr_ltescphr1 as mr_ltescphr",
                "mr_ltescpuschprbnum1 as mr_ltescpuschprbnum",
                "rows1 as rows");
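If the "*1" names are only a workaround for renaming, the selectExpr step can be dropped by aliasing to the final names directly inside agg. A minimal sketch, assuming the same dsParsed as above (the grouping keys simply come first in the output column order):

// Equivalent aggregation without the intermediate "*1" aliases (sketch;
// assumes dsParsed has the columns used in the example above).
Dataset<Row> resultDs2 = dsParsed
        .groupBy("enodeb_id", "ecell_id")
        .agg(
                functions.first("scan_start_time").alias("scan_start_time"),
                functions.first("insert_time").alias("insert_time"),
                functions.first("mr_type").alias("mr_type"),
                functions.first("mr_ltescphr").alias("mr_ltescphr"),
                functions.first("mr_ltescpuschprbnum").alias("mr_ltescpuschprbnum"),
                functions.count("enodeb_id").alias("rows"));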
Dataset join example:
Dataset<Row> ncRes = sparkSession.read().option("delimiter", "|").option("header", true).csv("/user/csv");
Dataset<Row> mro = sparkSession.sql("...");
Dataset<Row> ncJoinMro = ncRes
        .join(mro,
                mro.col("id").equalTo(ncRes.col("id"))
                        .and(mro.col("calid").equalTo(ncRes.col("calid"))),
                "left_outer")
        .select(ncRes.col("id").as("int_id"),
                mro.col("vendor_id"),
                ...
        );
Another way to express the join condition:
leftDfWithWatermark.join(
  rightDfWithWatermark,
  expr("""
    leftDfId = rightDfId AND
    leftDfTime >= rightDfTime AND
    leftDfTime <= rightDfTime + interval 1 hour"""),
  joinType = "leftOuter")
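The snippet above uses Scala's named-argument syntax; in Java the same expression-based join condition can be passed through functions.expr. A minimal sketch, where leftDf and rightDf are hypothetical stand-ins for the two DataFrames:

import static org.apache.spark.sql.functions.expr;

// Java version of the expression-based join condition (sketch; leftDf, rightDf
// and their columns are hypothetical placeholders).
Dataset<Row> joined = leftDf.join(
        rightDf,
        expr("leftDfId = rightDfId"
                + " AND leftDfTime >= rightDfTime"
                + " AND leftDfTime <= rightDfTime + interval 1 hour"),
        "left_outer");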
BroadcastHashJoin example:
package com.dx.testbroadcast;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

import java.io.*;

public class Test {
    public static void main(String[] args) {
        String personPath = "E:\\person.csv";
        String personOrderPath = "E:\\personOrder.csv";
        //writeToPersion(personPath);
        //writeToPersionOrder(personOrderPath);

        SparkConf conf = new SparkConf();
        SparkSession sparkSession = SparkSession.builder()
                .config(conf)
                .appName("test-broadcast-app")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> person = sparkSession.read()
                .option("header", "true")
                .option("inferSchema", "true") // whether to infer column types automatically
                .option("delimiter", ",")
                .csv(personPath)
                .as("person");
        person.printSchema();

        Dataset<Row> personOrder = sparkSession.read()
                .option("header", "true")
                .option("inferSchema", "true") // whether to infer column types automatically
                .option("delimiter", ",")
                .csv(personOrderPath)
                .as("personOrder");
        personOrder.printSchema();

        // Join type defaults to `inner`. Must be one of: `inner`, `cross`, `outer`, `full`, `full_outer`,
        // `left`, `left_outer`, `right`, `right_outer`, `left_semi`, `left_anti`.
        Dataset<Row> resultDs = personOrder.join(
                functions.broadcast(person),
                personOrder.col("personid").equalTo(person.col("id")),
                "left");
        resultDs.explain();
        resultDs.show(10);
    }

    private static void writeToPersion(String personPath) {
        BufferedWriter personWriter = null;
        try {
            personWriter = new BufferedWriter(new FileWriter(personPath));
            personWriter.write("id,name,age,address\r\n");
            for (int i = 0; i < 10000; i++) {
                personWriter.write("" + i + ",person-" + i + "," + i
                        + ",address-address-address-address-address-address-address" + i + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (personWriter != null) {
                try {
                    personWriter.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static void writeToPersionOrder(String personOrderPath) {
        BufferedWriter personWriter = null;
        try {
            personWriter = new BufferedWriter(new FileWriter(personOrderPath));
            personWriter.write("personid,name,age,address\r\n");
            for (int i = 0; i < 1000000; i++) {
                personWriter.write("" + i + ",person-" + i + "," + i
                        + ",address-address-address-address-address-address-address" + i + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (personWriter != null) {
                try {
                    personWriter.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Output:
== Physical Plan ==
*(2) BroadcastHashJoin [personid#28], [id#10], LeftOuter, BuildRight
:- *(2) FileScan csv [personid#28,name#29,age#30,address#31] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/personOrder.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<personid:int,name:string,age:int,address:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
   +- *(1) Project [id#10, name#11, age#12, address#13]
      +- *(1) Filter isnotnull(id#10)
         +- *(1) FileScan csv [id#10,name#11,age#12,address#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/person.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string,age:int,address:string>

+--------+--------+---+--------------------+---+--------+---+--------------------+
|personid|    name|age|             address| id|    name|age|             address|
+--------+--------+---+--------------------+---+--------+---+--------------------+
|       0|person-0|  0|address-address-a...|  0|person-0|  0|address-address-a...|
|       1|person-1|  1|address-address-a...|  1|person-1|  1|address-address-a...|
|       2|person-2|  2|address-address-a...|  2|person-2|  2|address-address-a...|
|       3|person-3|  3|address-address-a...|  3|person-3|  3|address-address-a...|
|       4|person-4|  4|address-address-a...|  4|person-4|  4|address-address-a...|
|       5|person-5|  5|address-address-a...|  5|person-5|  5|address-address-a...|
|       6|person-6|  6|address-address-a...|  6|person-6|  6|address-address-a...|
|       7|person-7|  7|address-address-a...|  7|person-7|  7|address-address-a...|
|       8|person-8|  8|address-address-a...|  8|person-8|  8|address-address-a...|
|       9|person-9|  9|address-address-a...|  9|person-9|  9|address-address-a...|
+--------+--------+---+--------------------+---+--------+---+--------------------+
only showing top 10 rows
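Besides the explicit functions.broadcast() call, Spark also picks BroadcastHashJoin on its own when one side's estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default in Spark 2.x; -1 disables automatic broadcasting). A minimal sketch of tuning the threshold, continuing the person/personOrder example above:

// Raise the automatic broadcast threshold to ~100 MB so that small dimension
// tables are broadcast even without an explicit functions.broadcast() call.
sparkSession.conf().set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024);

// The explicit hint still forces a broadcast regardless of the threshold.
Dataset<Row> hinted = personOrder.join(
        functions.broadcast(person),
        personOrder.col("personid").equalTo(person.col("id")),
        "left");
hinted.explain();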
SparkSQL BroadcastHashJoin example (using the BROADCAST hint):
person.createOrReplaceTempView("temp_person");
personOrder.createOrReplaceTempView("temp_person_order");

Dataset<Row> sqlResult = sparkSession.sql(
        " SELECT /*+ BROADCAST (t11) */" +
        " t11.id,t11.name,t11.age,t11.address," +
        " t10.personid as person_id,t10.name as persion_order_name" +
        " FROM temp_person_order as t10 " +
        " inner join temp_person as t11" +
        " on t11.id = t10.personid ");
sqlResult.show(10);
sqlResult.explain();
Output log:
+---+--------+---+--------------------+---------+------------------+
| id|    name|age|             address|person_id|persion_order_name|
+---+--------+---+--------------------+---------+------------------+
|  0|person-0|  0|address-address-a...|        0|          person-0|
|  1|person-1|  1|address-address-a...|        1|          person-1|
|  2|person-2|  2|address-address-a...|        2|          person-2|
|  3|person-3|  3|address-address-a...|        3|          person-3|
|  4|person-4|  4|address-address-a...|        4|          person-4|
|  5|person-5|  5|address-address-a...|        5|          person-5|
|  6|person-6|  6|address-address-a...|        6|          person-6|
|  7|person-7|  7|address-address-a...|        7|          person-7|
|  8|person-8|  8|address-address-a...|        8|          person-8|
|  9|person-9|  9|address-address-a...|        9|          person-9|
+---+--------+---+--------------------+---------+------------------+
only showing top 10 rows

19/06/24 09:35:50 INFO FileSourceStrategy: Pruning directories with:
19/06/24 09:35:50 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(personid#28)
19/06/24 09:35:50 INFO FileSourceStrategy: Output Data Schema: struct<personid: int, name: string>
19/06/24 09:35:50 INFO FileSourceScanExec: Pushed Filters: IsNotNull(personid)
19/06/24 09:35:50 INFO FileSourceStrategy: Pruning directories with:
19/06/24 09:35:50 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(id#10)
19/06/24 09:35:50 INFO FileSourceStrategy: Output Data Schema: struct<id: int, name: string, age: int, address: string ... 2 more fields>
19/06/24 09:35:50 INFO FileSourceScanExec: Pushed Filters: IsNotNull(id)

== Physical Plan ==
*(2) Project [id#10, name#11, age#12, address#13, personid#28 AS person_id#94, name#29 AS persion_order_name#95]
+- *(2) BroadcastHashJoin [personid#28], [id#10], Inner, BuildRight
   :- *(2) Project [personid#28, name#29]
   :  +- *(2) Filter isnotnull(personid#28)
   :     +- *(2) FileScan csv [personid#28,name#29] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/personOrder.csv], PartitionFilters: [], PushedFilters: [IsNotNull(personid)], ReadSchema: struct<personid:int,name:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *(1) Project [id#10, name#11, age#12, address#13]
         +- *(1) Filter isnotnull(id#10)
            +- *(1) FileScan csv [id#10,name#11,age#12,address#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/person.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string,age:int,address:string>
19/06/24 09:35:50 INFO SparkContext: Invoking stop() from shutdown hook
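For reference, in Spark 2.2+ the BROADCAST hint also accepts the aliases BROADCASTJOIN and MAPJOIN. A minimal sketch against the same temp views registered above:

// Same broadcast behavior using the MAPJOIN alias of the BROADCAST hint
// (sketch; reuses the temp_person / temp_person_order views from above).
Dataset<Row> aliasHinted = sparkSession.sql(
        " SELECT /*+ MAPJOIN(t11) */ t11.id, t10.personid as person_id" +
        " FROM temp_person_order as t10" +
        " inner join temp_person as t11 on t11.id = t10.personid ");
aliasHinted.explain();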