flink实战开发----flinkSQL入门大全

flinkSQL概念介绍

Table API & SQL

Apache Flink具有两个关系API – 表API和SQL – 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。

TableEnvironment

职责

TableEnvironment是Table API和SQL集成的核心概念。它负责:

  • 在内部目录中注册一个表
  • 注册外部目录
  • 执行SQL查询
  • 注册用户定义的(标量,表或聚合)函数
  • 把一个DataStreamDataSet转换为一个表Table
  • 持有对ExecutionEnvironmentStreamExecutionEnvironment的引用

创建一个TableEnvironment

一个TableEnvironment是通过调用静态创建TableEnvironment.getTableEnvironment()用的方法StreamExecutionEnvironmentExecutionEnvironment与可选的TableConfig。该TableConfig可用于配置TableEnvironment或定制查询优化和翻译过程

代码:

//获取table
val tableEnv = TableEnvironment.getTableEnvironment(env)

TableEnvironment中注册表

TableEnvironment维护一个表的目录,这些表是按名称注册的。有两种类型的表、输入表和输出表。输入表可以在表API和SQL查询中引用,并提供输入数据。输出表可以用来将表API或SQL查询的结果发送到外部系统。可以从各种来源注册输入表:

  • 现有Table对象,通常是Table API或SQL查询的结果。
  • TableSource,访问外部数据,例如文件,数据库或消息传递系统
  •  DataStreamDataSet来自DataStream或DataSet程序。注册一个DataStreamDataSet

一个输出表可以被注册使用TableSink

代码:

val tableEnv = TableEnvironment.getTableEnvironment(env)

val projTable: Table = tableEnv.scan("X").select(...)
//注册表
tableEnv.registerTable("projectedTable", projTable)

注册一个TableSink

一个已注册的表可以用来将表API或SQL查询的结果发送到外部存储系统,比如数据库、键值存储、消息队列或文件系统(在不同的编码中,例如CSV、Apache Parquet、Avro……)。

说白了就是:table sink的作用就是如何将flink sql查询的数据保存到外部系统,如hdfs或者本地文件,数据库,hbase等。

val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

SQL 语句

FlinkSQL,它实现了SQL标准。SQL查询被指定为常规字符串。

SQL文档描述了Flink对流式和批处理表的SQL支持。

主要包括:sqlQuery和sqlUpdate

sqlQuery:主要用于sql查询

sqlUpdate:用于删除,更新等操作

案例一:如何指定一个查询并将结果作为一张表返回

val tableEnv = TableEnvironment.getTableEnvironment(env)

val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

案例二:指定一个更新查询,该查询将其结果插入到已注册表中

val tableEnv = TableEnvironment.getTableEnvironment(env)

tableEnv.sqlUpdate("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

flinkSQL执行计划

表API和SQL查询将转换为DataStreamDataSet程序,具体取决于它们的输入是流式还是批量输入。查询在内部表示为逻辑查询计划,并分为两个阶段:

  1. 优化逻辑计划,
  2. 转换为DataStream或DataSet程序。

table与DataStream和DataSet API集成

表API和SQL查询可以轻松集成并嵌入到DataStreamDataSet程序中。例如,可以查询外部表(例如来自RDBMS),进行一些预处理,例如过滤,预测,聚合或加入元数据,然后使用DataStream或进一步处理数据。相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。这种相互作用可以通过将一个DataStream或DataSet转换为一个Table来实现,反之亦然。

Scala的隐式转换

scala表API功能的隐式转换DataSetDataStream以及Table类。org.apache.flink.table.api.scala._除了org.apache.flink.api.scala._   Scala DataStream API 之外,还可以通过导入包来启用这些转换

注:flink编程必须导入import org.apache.flink.api.scala._,flinkSQL编程必须导入import org.apache.flink.table.api._

将DataStream或DataSet转换为表

我们可以通过TableEnvironment将获得数据源的DataStream或DataSet转化成Table,在使用flinkSQL的时候这样将会十分便捷。

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
Table table1 = tableEnv.fromDataStream(stream);
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

将Table转换为DataStream或DataSet

一个Table可以转换为DataStreamDataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义DataStream或DataSet程序。

当转换一个TableDataStreamDataSet,需要指定将所得的数据类型DataStreamDataSet,即,数据类型到其中的行Table是要被转换。通常最方便的转换类型是Row。以下列表概述了不同选项的功能:

  • :字段按位置,任意数量的字段映射,支持null值,无类型安全访问。
  • POJO:字段按名称映射(POJO字段必须命名为Table字段),任意数量的字段,支持null值,类型安全访问。
  • 样例Case Class:字段按位置映射,不支持null值,类型安全访问。
  • 元组:字段按位置映射,限制为22(Scala)或25(Java)字段,不支持null值,类型安全访问。
  • 原子类型Table必须具有单个字段,不支持null值,类型安全访问。

将表转换为DataStream

一个Table是流媒体查询的结果将动态更新,即它正在改变,因为新记录的查询的输入流到达。因此,DataStream转换这种动态查询需要对表的更新进行编码。

一个Table转换为一个DataStream有两种模式:

  1. 追加模式:只有在动态Table仅通过INSERT更改修改时才能使用此模式,即它仅附加并且以前发出的结果永远不会更新。
  2. 缩进模式:始终可以使用此模式。它用标志编码INSERTDELETE改变boolean
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

将表转换为DataSet

val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

flinkSQL语法

SQL查询是使用sqlQuery()方法指定的TableEnvironment。该方法返回SQL查询的结果为Table。A Table可以在后续的SQL和Table API查询中使用,可以转换为DataSet或DataStream,也可以写入TableSink)。要访问SQL查询中的表,必须在TableEnvironment中注册它。可以从TableSourceTableDataStream或DataSet 注册表。或者,用户还可以在TableEnvironment中注册外部目录以指定数据源的位置。

注意: Flink的SQL支持尚未完成。包含不受支持的SQL功能的查询会导致a TableException。以下部分列出了批处理和流表上SQL的受支持功能。

支持的语法

支持标准的ANSI SQL。Flink不支持DDL语句。

入门案例

需求:

使用flinkSQL,获取文本中的用户的姓名

导入依赖

table API和SQL捆绑在flink-tableMaven工件中。必须将以下依赖项添加到项目中才能使用Table API和SQL:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.7.0</version>
</dependency>

此外,您需要为Flink的Scala批处理或流API添加依赖项。对于批处理查询,您需要添加:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.7.0</version>
</dependency>
对于流式查询,您需要添加: 
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.7.0</version>
</dependency>

数据准备

创建一个person.txt,内容如下:

kebe men
wede men
baby wemen
james men

代码

package flinkSQL
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import scala.language.postfixOps
/**
  * Created by  ${WangZhiHua} on 2018/11/12
  */
object sql_test {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //获取table
      val tableEnv = TableEnvironment.getTableEnvironment(env)
    //读取数据源
    val source1 = env.readTextFile("C:/flink_data/person.txt")
    val source2: DataStream[Person1] = source1.map(x=>{
      val split = x.split(" ")
      ( Person1(split(0),split(1)))
    })
    //将DataStream转化成Table
    val table1 = tableEnv.fromDataStream(source2)
    //注册表,表名为:person
    tableEnv.registerTable("person",table1)
    //获取表中所有信息
    val rs: Table = tableEnv.sqlQuery("select *  from person ")
    val stream: DataStream[String] = rs
    //过滤获取name这一列的数据
      .select("name")
      //将表转化成DataStream
      .toAppendStream[String]
     stream.print()
    env.execute("flinkSQL")
  }
}

/**
  * 定义样例类封装数据
  */
case class  Person1(name:String ,score:String)

案例二:双流join

数据准备

数据一

2016-07-28 13:00:01.820,000001,10.2
2016-07-28 13:00:01.260,000001,10.2
2016-07-28 13:00:02.980,000001,10.1
2016-07-28 13:00:04.330,000001,10.0
2016-07-28 13:00:05.570,000001,10.0
2016-07-28 13:00:05.990,000001,10.0
2016-07-28 13:00:14.000,000001,10.1
2016-07-28 13:00:20.000,000001,10.2

数据二

2016-07-28 13:00:01.000,000001,10.2
2016-07-28 13:00:04.000,000001,10.1
2016-07-28 13:00:07.000,000001,10.0
2016-07-28 13:00:16.000,000001,10.1

代码

package flinkSQL

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._

/**
  * Created by  ${WangZhiHua} on 2018/11/13
  */


object JoinDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //获取接口传送的数据
    val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")
    val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
   //使用样例类StockTransaction封装获取的数据
    val dataStreamMap1 = dataStream1.map(f => {
      val tokens1 = f.split(",")
      StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
    })
      .assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)
    //使用样例类StockSnapshot封装获取的数据
    val dataStreamMap2 = dataStream2.map(f => {
      val tokens2 = f.split(",")
      StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
    })
      .assignAscendingTimestamps(f => format.parse(f.md_time).getTime)

    /**
      * 进行双流join
      * 限定范围是:3秒钟的Event time时间窗口
      */

    val joinStream = dataStreamMap1.coGroup(dataStreamMap2)
      .where(_.tx_code)
      .equalTo(_.md_code)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

      val innerJoinStream = joinStream.apply(new InnerJoinFunction)
     innerJoinStream.name("innerJoin").print()
    print("===================== end =========================")
    env.execute("join demo")
  }

}
case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)
case class StockSnapshot(md_time:String, md_code:String,md_value:Double)
class InnerJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)]{
  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double,String)]): Unit = {

    /**
      * 将Java中的Iterable对象转换为Scala的Iterable
      * scala的集合操作效率高,简洁
      */
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList

    /**
      * Inner Join要比较的是同一个key下,同一个时间窗口内的数据
      */
    if(scalaT1.nonEmpty && scalaT2.nonEmpty){
      for(transaction <- scalaT1){
        for(snapshot <- scalaT2){
          out.collect(transaction.tx_code,transaction.tx_time, snapshot.md_time,transaction.tx_value,snapshot.md_value,"Inner Join Test")
        }
      }
    }
  }
}

class LeftJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
    /**
      * 将Java中的Iterable对象转换为Scala的Iterable
      * scala的集合操作效率高,简洁
      */
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList

    /**
      * Left Join要比较的是同一个key下,同一个时间窗口内的数据
      */
    if(scalaT1.nonEmpty && scalaT2.isEmpty){
      for(transaction <- scalaT1){
        out.collect(transaction.tx_code,transaction.tx_time, "",transaction.tx_value,0,"Left Join Test")
      }
    }
  }
}
class RightJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
    /**
      * 将Java中的Iterable对象转换为Scala的Iterable
      * scala的集合操作效率高,简洁
      */
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList

    /**
      * Right Join要比较的是同一个key下,同一个时间窗口内的数据
      */
    if(scalaT1.isEmpty && scalaT2.nonEmpty){
      for(snapshot <- scalaT2){
        out.collect(snapshot.md_code, "",snapshot.md_time,0,snapshot.md_value,"Right Join Test")
      }
    }
  }
}

运行结果

《flink实战开发----flinkSQL入门大全》

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

《flink实战开发----flinkSQL入门大全》

 

 

 

 

 

 

    原文作者:阿华田512
    原文地址: https://blog.csdn.net/aA518189/article/details/83992129
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞