Flink Table API&SQL的概念和通用API

Table API和SQL通过join API集成在一起,这个join API的核心概念是Table,Table可以作为查询的输入和输出。这篇文档展示了使用Table API和SQL查询的程序的通用结构,如何注册一个Table,如何查询一个Table以及如何将数据发给Table。

Table API和SQL查询程序的结构

所有批处理和流处理的Table API、SQL程序都有如下相同的模式,下面例子的代码展示了Table API和SQL程序的通用结构:

// 对于批处理程序来说使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 创建一个TableEnvironment
// 对于批处理程序来说使用 BatchTableEnvironment 替换 StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 注册一个 Table
tableEnv.registerTable("table1", ...)            // 或者
tableEnv.registerTableSource("table2", ...);     // 或者
tableEnv.registerExternalCatalog("extCat", ...);

// 从Table API的查询中创建一个Table
Table tapiResult = tableEnv.scan("table1").select(...);
// 从SQL查询中创建一个Table
Table sqlResult  = tableEnv.sql("SELECT ... FROM table2 ... ");

// 将Table API 种的结果 Table 发射到TableSink中 , SQL查询也是一样的
tapiResult.writeToSink(...);

// 执行
env.execute();
// 对于批处理程序来说使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注册一个 Table
tableEnv.registerTable("table1", ...)           // 或者
tableEnv.registerTableSource("table2", ...)     // 或者
tableEnv.registerExternalCatalog("extCat", ...) 

// 从Table API的查询中创建一个Table
val tapiResult = tableEnv.scan("table1").select(...)
// 从SQL查询中创建一个Table
val sqlResult  = tableEnv.sql("SELECT ... FROM table2 ...")

// 将Table API 种的结果 Table 发射到TableSink中 , SQL查询也是一样的
tapiResult.writeToSink(...)

// 执行
env.execute()

注意:Table API 和 SQL查询可以轻易地进行集成并嵌入到DataStream或者DataSet程序中,请参考Integration With DataStream and DataSet API部分来了解DataStream和DataSet如何转换成Table及Table如何转换成DataStream和DataSet。

创建一个TableEnvironment

TableEnvironment是Table API和SQL集成的核心概念,它主要负责:
  1、在内部目录中注册一个Table
  2、注册一个外部目录
  3、执行SQL查询
  4、注册一个用户自定义函数(标量、表及聚合)
  5、将DataStream或者DataSet转换成Table
  6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一个Table总是会绑定到一个指定的TableEnvironment中,相同的查询不同的TableEnvironment是无法通过join、union合并在一起。

TableEnvironment可以通过调用带有参数StreamExecutionEnvironment或者ExecutionEnvironment和一个可选参数TableConfig的静态方法TableEnvironment.getTableEnvironment()来创建。TableConf可以用来配置TableEnvironment或者自定义查询优化器和翻译过程(参考查询优化器)

// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 为streaming查询创建一个 TableEnvironment
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// 为批查询创建一个 TableEnvironment
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 为流查询创建一个 TableEnvironment
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// 为批查询创建一个 TableEnvironment
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

在Catalog(目录)中注册一个Table

TableEnvironment有一个在内部通过表名组织起来的表目录,Table API或者SQL查询可以访问注册在目录中的表,并通过名称来引用它们。
TableEnvironment允许通过各种源来注册一个表:
  1、一个已存在的Table对象,通常是Table API或者SQL查询的结果
  2、TableSource,可以访问外部数据如文件、数据库或者消息系统
  3、DataStream或者DataSet程序中的DataStream或者DataSet

将DataStream或者DataSet注册为一个表将在Integration With DataStream and DataSet API中讨论。

注册一个Table

一个Table可以在TableEnvironment中按照下面程序注册:

// 获取一个 StreamTableEnvironment,  BatchTableEnvironment也是同样的方法
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table 是简单的投影查询的结果 
Table projTable = tableEnv.scan("X").project(...);

// 将 Table projTable 注册为表 "projectedX"
tableEnv.registerTable("projectedTable", projTable);
// 获取一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table 是简单的投影查询的结果 
val projTable: Table = tableEnv.scan("X").project(...)

// 将 Table projTable 注册为表 "projectedX"
tableEnv.registerTable("projectedTable", projTable)

注意:一个注册的Table被当做是与关系型数据库中的视图类似,即定义Table的查询不会被优化,但是当其他查询引用到已注册的Table时会被内联。如果多个查询引用同一个已注册的Table,这个Table会跟每个查询内联并进行多次执行,即:已注册的Table的结果不会共享。

注册一个TableSource

TableSource可以访问保存在外部存储系统如数据库系统(MySQL、HBase…),指定编码格式的文件(CSV, Apache [Parquet, Avro, ORC],…)或者消息系统(Apache Kafka,RabbitMQ,…)中的数据。

Flink的目标是为通用的数据格式和存储系统提供TableSource,请参考Table Sources和Sinks页来了解Flink所支持的TableSource列表及如何自定义一个TableSource。

一个TableSource可以在TableEnvironment中按如下方式来定义:

// 获取一个StreamTableEnvironment, 同样适用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 创建一个 TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);

// 将TableSource注册为表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 创建一个TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// 将 TableSource 注册为表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

注册一个外部Catalog(目录)

一个外部目录提供了关于外部数据库和表的信息如:它们的名称、模式、统计及如何访问保存在外部数据库、表和文件中的数据。

一个外部目录可以通过实现ExternalCatalog接口来创建并在TableEnvironment中注册,如下:

// 获取一个 StreamTableEnvironment, 同样适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 创建一个外部catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();

// 注册 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 创建一个 catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// 注册 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

一旦在TableEnvironment中注册之后,所有定义在ExternalCatalog中的表都可以通过指定全路径如:catalog.database.table 在Table API或者SQL查询来访问。

目前,Flink提供InMemoryExternalCatalog来做demo或者测试。然而,ExternalCatalog接口还可以被用来连接HCatalog或者Metastore到Table API。

查询一个Table

Table API

Table API是一个Scala和Java的语言集成查询API,与SQL相反,查询并不指定为字符串而是根据主机语言一步一步的构建。

Table API是基于Table类来的,Table类代表了一个流或者批表,并且提供方法来使用关系型操作。这些方法返回一个新的Table对象,这个Table对象代表着输入的Table应用关系型操作后的结果。一些关系型操作是由多个方法调用组成的如:table.groupBy(...).select(), 其中groupBy(...)指定了table的分组,而select(...)则是table分组的映射。

Table API文档描述了streaming和batch表所支持的所有Table API操作。

下面的例子展示了一个简单的Table API聚合查询:

// 获取一个 StreamTableEnvironment, 同样适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 注册一个名叫 Orders 的表

// 扫描注册的 Orders 表
Table orders = tableEnv.scan("Orders");
// 计算所有来自法国的客户的收入
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// 发射或者转换一个 Table
// 执行查询
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注册一个名叫 Orders 的表

// 扫描已注册的 Orders 表
Table orders = tableEnv.scan("Orders")
// 计算所有来自法国偶的客户的收入
Table revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

// 发射或者转换一个Table
// 执行查询

注意:Scala Table API使用Scala的符号在引用表属性时,以’`’开始,Table API使用Scala的隐式转换,为了使用Scala的隐式转换,请确保导入org.apache.flink.api.scala._org.apache.flink.table.api.scala._

SQL

Flink的SQL集成是基于Apache Calcite的,Apache Calcite实现了标准的SQL,SQL查询被指定为常规字符串。

SQL文档描述了Flink对流和批表的SQL支持。
下面的例子展示了如何指定一个查询并返回一个Table结果;

// 获取一个 StreamTableEnvironment, 同样适用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 注册一个名叫Orders 的表

// 计算所有来自法国的客户的收入
Table revenue = tableEnv.sql(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// 发射或者转换一个Table
// 执行查询
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

//注册一个名叫 Orders的表

// 计算所有来自法国的客户的收入
Table revenue = tableEnv.sql(""" 
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// 发射或者转换 Table
// 执行查询

混合使用Table API和SQL

Table API和SQL查询可以很容易地合并因为它们都返回Table对象:
  1、Table API查询可以基于SQL查询结果的Table来进行
  2、SQL查询可以基于Table API查询的结果来定义

发射一个Table

为了发射一个Table,可以将其写入一个TableSink中,TableSink 是支持各种文件格式(如:CSV, Apache Parquet, Apache Avro)、存储系统(如:JDBC, Apache HBase, Apache Cassandra, Elasticsearch)或者消息系统(如:Apache Kafka,RabbitMQ)的通用接口。

一个批Table只能写入BatchTableSink中,而流Table需要一个AppendStreamTableSinkRetractStreamTableSink或者UpsertStreamTableSink

请参考Table Sources & Sinks文档来了解更多可用sink的信息和如何实现一个自定义的TableSink。

// 获取一个StreamTableEnvironment, 同样适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 使用Table API和/或SQL查询获取一个 Table
Table result = ...

// 创建一个TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// 将结果Table写入TableSink中
result.writeToSink(sink);

// 执行程序
// 获取一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 使用Table API和/或SQL查询获取一个 Table
val result: Table = ...

//创建一个 TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// 将结果 Table写入TableSink中
result.writeToSink(sink)

// 执行程序

翻译和执行一个查询

Table API和SQL查询根据输入是流还是批翻译成DataStream或者DataSet,查询内部表示为一个逻辑查询计划,并分两个阶段进行翻译:
  1、优化逻辑计划
  2、翻译成一个DataStream或者DataSet程序

Table API或者SQL查询会在下面情况下触发:
  当调用Table.writeToSink()时,Table会发射到TableSink中
  Table转换DataStream或者DataSet时(参考与DataStream和DataSet API集成)
一旦翻译,Table API或者SQL查询就会像常规DataStream或DataSet处理一样,并且当StreamExecutionEnvironment.execute()或者ExecutionEnvironment.execute()调用时执行。

与DataStream和DataSet API集成

Table API和SQL查询可以很容易地进行集成并嵌入到DataStreamDataSet程序中。例如:我们可以查询一个外部表(如:来自关系型数据库的表)、做一些预处理,如过滤、映射、聚合或者与元数据关联,然后使用DataStream或者DataSet API(及其他基于这些API的库,如CEP或Gelly)进行进一步处理。同样,Table API或者SQL查询也可以应用于DataStream或者DataSet程序的结果中。

这种交互可以通过将DataStream或者DataSet转换成一个Table及将Table转换成DataStream或者DataSet来实现。在本节,我们将描述这些转换是如何完成的。

Scala 隐式转换

Scala Table API为DataSet、DataStream和Table类提供了隐式转换功能。这些转换可以通过导入Scala DataStream API中的org.apache.flink.table.api.scala._org.apache.flink.api.scala._包来启用。

注册一个DataStream或者DataSet为Table

一个DataStream或者DataSet可以在TableEnvironment中注册为Table,表的结果模式根据注册的DataStream或者DataSet的数据类型来定。请参考数据类型映射到表模式来了解更详细的信息。

// 获取 StreamTableEnvironment
// 注册一个DataSet 到BatchTableEnvironment也是等效的
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// 注册DataStream 为表  "myTable" ,并有两个字段 "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// 注册 DataStream 为表 "myTable2" 并有两个字段 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
// 获取 TableEnvironment 
// 注册一个 DataSet 是等价的
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// 注册 DataStream 为表 "myTable" 并有两个字段 "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// 注册 DataStream 为 "myTable2" 并有两个字段 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

将Table转换为DataStream或者DataSet

Table可以转换为DataStream或者DataSet,这样的话,自定义的DataStream或者DataSet程序就可以基于Table API或者SQL查询的结果来执行了。

当将一个Table转换为DataStream或者DataSet时,你需要指定生成的DataStream或者DataSet的数据类型,即需要转换表的行的数据类型,通常最方便的转换类型是Row,下面列表概述了不同选项的功能:
  1、Row:字段通过位置映射、可以是任意数量字段,支持空值,非类型安全访问
  2、POJO:字段通过名称(POJO字段作为Table字段时,必须命名)映射,可以是任意数量字段,支持空值,类型安全访问
  3、Case Class:字段通过位置映射,不支持空值,类型安全访问
  4、Tuple:字段通过位置映射,不得多于22(Scala)或者25(Java)个字段,不支持空值,类型安全访问
  5、Atomic Type:Table必须有一个字段,不支持空值,类型安全访问。

将Table转换为DataStream

流式查询的结果Table会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的DataStream需要对表的更新进行编码。

有两种模式来将Table转换为DataStream:
  1、Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时,即仅附加,之前发射的结果不会被更新。
  2、Retract Mode:始终都可以使用此模式,它使用一个boolean标识来编码INSERT和DELETE更改。

// 获取一个 StreamTableEnvironment. 
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

//  有两个字段(String name, Integer age)的Table
Table table = ...

// 通过指定类将Table转换为Row的Append DataStream
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// 通过一个TypeInformation将Table转换为Tuple2<String, Integer> 类型的Append DataStream  
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);

// 将Table转换为Row的react形式的DataStream
//   一个reactDataStream的类型X为 DataStream<Tuple2<Boolean, X>>. 
//   boolean字段指定了更改的类型. 
//   True 是 INSERT, false 是 DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);
// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

注意:有关动态表及其属性的详细讨论在Streaming Queries文档中给出。

将Table转换为DataSet

Table可以按照如下方式转换为DataSet:

// 获取 BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 有两个字段(String name, Integer age)的Table
Table table = ...

// 通过指定类将Table转换为Row类型的DataSet
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// 通过TypeInformation 将Table转换为Tuple2<String, Integer>类型的DataSet
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);
// 获取 TableEnvironment 
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 有两个字段(String name, Integer age)的Table
val table: Table = ...

// 将Table转换为Row类型的DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// 将Table转换为Tuple2[String, Int]类型的DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

数据类型映射到表模式

Flink的DataStream和DataSet API支持多种数据类型,如Tuple,POJO, case class及原始数据类型。接下来我们描述Table API如何将这些类型转换为内部行表示及展示将DataStream转换为Table的例子。

原子类型

Flink将原生类型(如:Integer, Double, String)或者通用类型(不能再被分析或者分解的类型)视为原子类型,一个原子类型的DataStream或者DataSet可以转换为只有一个属性的Table,属性的类型根据原子类型推算,并且必须得指定属性的名称。

// 获取一个 StreamTableEnvironment, 同样原理适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Long> stream = ...
// 将 DataStream转换为具有属性"myLong"的Table
Table table = tableEnv.fromDataStream(stream, "myLong");
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[Long] = ...
// 将 DataStream 转换为具有属性'myLong的Table
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Tuple(Java和Scala都支持)和Case Class(仅Scala支持)

Flink支持Scala内置的Tuple和Flink为Java提供的Tuple,DataStream和DataSet类型的Tuple都可以被转换为表。字段可以通过为所有字段(通过位置来映射)提供的名称来重命名,如果没有为字段指定名称的话,就会采用默认的字段名。

// 获取一个 StreamTableEnvironment, 同样适用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// 将 DataStream为具有字段名为"myLong", "myString"的Table
Table table1 = tableEnv.fromDataStream(stream, "myLong, myString");

// 将 DataStream 转换为具有默认字段名 "f0", "f1"的 Table
Table table2 = tableEnv.fromDataStream(stream);
//获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// 将 DataStream 转换为具有字段名 'myLong, 'myString' 的Table
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// 将 DataStream 转换为具有默认字段名 '_1, '_2的Table
val table2: Table = tableEnv.fromDataStream(stream)

// 定义一个 case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// 将 DataStream 转换为具有默认字段名 'name, 'age'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC)

// 将 DataStream 转换为具有字段名 'myName, 'myAge'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
POJO(Java 和 Scala)

Flink支持使用POJO作为复合类型,决定POJO规则的文档请参考这里

当将一个POJO类型的DataStream或者DataSet转换为Table而不指定字段名称时,Table的字段名称将采用JOPO原生的字段名称作为字段名称。重命名原始的POJO字段需要关键字AS,因为POJO没有固定的顺序,名称映射需要原始名称并且不能通过位置来完成。

//获取一个 StreamTableEnvironment, 同样原理适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Person 是一个有两个字段"name" and "age" 的POJO
DataStream<Person> stream = ...

// 将 DataStream 转换为有字段 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);

// 将 DataStream 转换为有字段 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person 是一个有字段 "name" and "age" 的POJO
val stream: DataStream[Person] = ...

// 将 DataStream 转换为具有字段 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)

// 将 DataStream 转换为具有字段 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
Row

Row数据类型支持任意数量的字段,并且字段可以是null值,字段名称可以通过RowTypeInformation来指定或者将一个Row DataStream或者DataSet转换为Table时(根据位置)指定。

// 获取一个 StreamTableEnvironment, 同样原理适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 在`RowTypeInfo`中指定字段"name" and "age"的Row类型DataStream
DataStream<Row> stream = ...

// 将 DataStream 转换为具有字段 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);

// 将 DataStream 转换为具有字段 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "myName, myAge");
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 在`RowTypeInfo`中指定字段"name" and "age"的Row类型DataStream
val stream: DataStream[Row] = ...

// 将 DataStream 转换为具有字段 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)

// 将 DataStream 转换为具有字段 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

查询优化

Apache Flink使用Apache Calcite来优化和翻译查询,当前的查询优化包括投影、过滤下推、子查询去相关及各种形式的查询重写。Flink不去优化join的顺序,但是会根据它们的顺序去执行(FROM子句中表的顺序或者WHERE子句中连接谓词的顺序)。

可以通过提供一个CalciteConfig对象来调整在不同阶段应用的优化规则集,这个可以通过调用CalciteConfig.createBuilder())获得的builder来创建,并且可以通过调用tableEnv.getConfig.setCalciteConfig(calciteConfig)来提供给TableEnvironment。

解析一个Table

Table API为计算一个Table提供了一个机制来解析逻辑和优化查询计划,这个可以通过调用TableEnvironment.explain(table)方法来完成,它会返回描述三个计划的字符串:
  1、关系查询语法树,即未优化的查询计划
  2、优化后的逻辑查询计划
  3、物理执行计划

以下代码显示了一个示例和相应的输出:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);

String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
  .where('word.like("F%"))
  .unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)

输出如下:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, 'F%')])
    LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalTableScan(table=[[_DataStreamTable_1]])

== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
    DataStreamScan(table=[[_DataStreamTable_0]])
  DataStreamScan(table=[[_DataStreamTable_1]])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat

Stage 2 : Data Source
  content : collect elements with CollectionInputFormat

  Stage 3 : Operator
    content : from: (count, word)
    ship_strategy : REBALANCE

    Stage 4 : Operator
      content : where: (LIKE(word, 'F%')), select: (count, word)
      ship_strategy : FORWARD

      Stage 5 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE
    原文作者:写Bug的张小天
    原文地址: https://www.jianshu.com/p/6bc5b4e6f163
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞