Table API和SQL通过join API集成在一起,这个join API的核心概念是Table,Table可以作为查询的输入和输出。这篇文档展示了使用Table API和SQL查询的程序的通用结构,如何注册一个Table,如何查询一个Table以及如何将数据发给Table。
Table API和SQL查询程序的结构
所有批处理和流处理的Table API、SQL程序都有如下相同的模式,下面例子的代码展示了Table API和SQL程序的通用结构:
// 对于批处理程序来说使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建一个TableEnvironment
// 对于批处理程序来说使用 BatchTableEnvironment 替换 StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 注册一个 Table
tableEnv.registerTable("table1", ...) // 或者
tableEnv.registerTableSource("table2", ...); // 或者
tableEnv.registerExternalCatalog("extCat", ...);
// 从Table API的查询中创建一个Table
Table tapiResult = tableEnv.scan("table1").select(...);
// 从SQL查询中创建一个Table
Table sqlResult = tableEnv.sql("SELECT ... FROM table2 ... ");
// 将Table API 种的结果 Table 发射到TableSink中 , SQL查询也是一样的
tapiResult.writeToSink(...);
// 执行
env.execute();
// 对于批处理程序来说使用 ExecutionEnvironment 来替换 StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注册一个 Table
tableEnv.registerTable("table1", ...) // 或者
tableEnv.registerTableSource("table2", ...) // 或者
tableEnv.registerExternalCatalog("extCat", ...)
// 从Table API的查询中创建一个Table
val tapiResult = tableEnv.scan("table1").select(...)
// 从SQL查询中创建一个Table
val sqlResult = tableEnv.sql("SELECT ... FROM table2 ...")
// 将Table API 种的结果 Table 发射到TableSink中 , SQL查询也是一样的
tapiResult.writeToSink(...)
// 执行
env.execute()
注意:Table API 和 SQL查询可以轻易地进行集成并嵌入到DataStream或者DataSet程序中,请参考Integration With DataStream and DataSet API部分来了解DataStream和DataSet如何转换成Table及Table如何转换成DataStream和DataSet。
创建一个TableEnvironment
TableEnvironment是Table API和SQL集成的核心概念,它主要负责:
1、在内部目录中注册一个Table
2、注册一个外部目录
3、执行SQL查询
4、注册一个用户自定义函数(标量、表及聚合)
5、将DataStream或者DataSet转换成Table
6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一个Table总是会绑定到一个指定的TableEnvironment中,相同的查询不同的TableEnvironment是无法通过join、union合并在一起。
TableEnvironment可以通过调用带有参数StreamExecutionEnvironment或者ExecutionEnvironment和一个可选参数TableConfig的静态方法TableEnvironment.getTableEnvironment()
来创建。TableConf可以用来配置TableEnvironment或者自定义查询优化器和翻译过程(参考查询优化器)
// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 为streaming查询创建一个 TableEnvironment
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);
// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// 为批查询创建一个 TableEnvironment
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 为流查询创建一个 TableEnvironment
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// 为批查询创建一个 TableEnvironment
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
在Catalog(目录)中注册一个Table
TableEnvironment有一个在内部通过表名组织起来的表目录,Table API或者SQL查询可以访问注册在目录中的表,并通过名称来引用它们。
TableEnvironment允许通过各种源来注册一个表:
1、一个已存在的Table对象,通常是Table API或者SQL查询的结果
2、TableSource,可以访问外部数据如文件、数据库或者消息系统
3、DataStream或者DataSet程序中的DataStream或者DataSet
将DataStream或者DataSet注册为一个表将在Integration With DataStream and DataSet API中讨论。
注册一个Table
一个Table可以在TableEnvironment中按照下面程序注册:
// 获取一个 StreamTableEnvironment, BatchTableEnvironment也是同样的方法
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table 是简单的投影查询的结果
Table projTable = tableEnv.scan("X").project(...);
// 将 Table projTable 注册为表 "projectedX"
tableEnv.registerTable("projectedTable", projTable);
// 获取一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table 是简单的投影查询的结果
val projTable: Table = tableEnv.scan("X").project(...)
// 将 Table projTable 注册为表 "projectedX"
tableEnv.registerTable("projectedTable", projTable)
注意:一个注册的Table被当做是与关系型数据库中的视图类似,即定义Table的查询不会被优化,但是当其他查询引用到已注册的Table时会被内联。如果多个查询引用同一个已注册的Table,这个Table会跟每个查询内联并进行多次执行,即:已注册的Table的结果不会共享。
注册一个TableSource
TableSource可以访问保存在外部存储系统如数据库系统(MySQL、HBase…),指定编码格式的文件(CSV, Apache [Parquet, Avro, ORC],…)或者消息系统(Apache Kafka,RabbitMQ,…)中的数据。
Flink的目标是为通用的数据格式和存储系统提供TableSource,请参考Table Sources和Sinks页来了解Flink所支持的TableSource列表及如何自定义一个TableSource。
一个TableSource可以在TableEnvironment中按如下方式来定义:
// 获取一个StreamTableEnvironment, 同样适用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 创建一个 TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);
// 将TableSource注册为表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 创建一个TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
// 将 TableSource 注册为表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
注册一个外部Catalog(目录)
一个外部目录提供了关于外部数据库和表的信息如:它们的名称、模式、统计及如何访问保存在外部数据库、表和文件中的数据。
一个外部目录可以通过实现ExternalCatalog接口来创建并在TableEnvironment中注册,如下:
// 获取一个 StreamTableEnvironment, 同样适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 创建一个外部catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();
// 注册 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 创建一个 catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog
// 注册 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)
一旦在TableEnvironment中注册之后,所有定义在ExternalCatalog中的表都可以通过指定全路径如:catalog.database.table
在Table API或者SQL查询来访问。
目前,Flink提供InMemoryExternalCatalog来做demo或者测试。然而,ExternalCatalog接口还可以被用来连接HCatalog或者Metastore到Table API。
查询一个Table
Table API
Table API是一个Scala和Java的语言集成查询API,与SQL相反,查询并不指定为字符串而是根据主机语言一步一步的构建。
Table API是基于Table类来的,Table类代表了一个流或者批表,并且提供方法来使用关系型操作。这些方法返回一个新的Table对象,这个Table对象代表着输入的Table应用关系型操作后的结果。一些关系型操作是由多个方法调用组成的如:table.groupBy(...).select()
, 其中groupBy(...)
指定了table的分组,而select(...)
则是table分组的映射。
Table API文档描述了streaming和batch表所支持的所有Table API操作。
下面的例子展示了一个简单的Table API聚合查询:
// 获取一个 StreamTableEnvironment, 同样适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 注册一个名叫 Orders 的表
// 扫描注册的 Orders 表
Table orders = tableEnv.scan("Orders");
// 计算所有来自法国的客户的收入
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
// 发射或者转换一个 Table
// 执行查询
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注册一个名叫 Orders 的表
// 扫描已注册的 Orders 表
Table orders = tableEnv.scan("Orders")
// 计算所有来自法国偶的客户的收入
Table revenue = orders
.filter('cCountry === "FRANCE")
.groupBy('cID, 'cName)
.select('cID, 'cName, 'revenue.sum AS 'revSum)
// 发射或者转换一个Table
// 执行查询
注意:Scala Table API使用Scala的符号在引用表属性时,以’`’开始,Table API使用Scala的隐式转换,为了使用Scala的隐式转换,请确保导入org.apache.flink.api.scala._
和org.apache.flink.table.api.scala._
。
SQL
Flink的SQL集成是基于Apache Calcite的,Apache Calcite实现了标准的SQL,SQL查询被指定为常规字符串。
SQL文档描述了Flink对流和批表的SQL支持。
下面的例子展示了如何指定一个查询并返回一个Table结果;
// 获取一个 StreamTableEnvironment, 同样适用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 注册一个名叫Orders 的表
// 计算所有来自法国的客户的收入
Table revenue = tableEnv.sql(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// 发射或者转换一个Table
// 执行查询
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
//注册一个名叫 Orders的表
// 计算所有来自法国的客户的收入
Table revenue = tableEnv.sql("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// 发射或者转换 Table
// 执行查询
混合使用Table API和SQL
Table API和SQL查询可以很容易地合并因为它们都返回Table对象:
1、Table API查询可以基于SQL查询结果的Table来进行
2、SQL查询可以基于Table API查询的结果来定义
发射一个Table
为了发射一个Table,可以将其写入一个TableSink中,TableSink 是支持各种文件格式(如:CSV, Apache Parquet, Apache Avro)、存储系统(如:JDBC, Apache HBase, Apache Cassandra, Elasticsearch)或者消息系统(如:Apache Kafka,RabbitMQ)的通用接口。
一个批Table只能写入BatchTableSink中,而流Table需要一个AppendStreamTableSink
、RetractStreamTableSink
或者UpsertStreamTableSink
请参考Table Sources & Sinks文档来了解更多可用sink的信息和如何实现一个自定义的TableSink。
// 获取一个StreamTableEnvironment, 同样适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 使用Table API和/或SQL查询获取一个 Table
Table result = ...
// 创建一个TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
// 将结果Table写入TableSink中
result.writeToSink(sink);
// 执行程序
// 获取一个TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 使用Table API和/或SQL查询获取一个 Table
val result: Table = ...
//创建一个 TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
// 将结果 Table写入TableSink中
result.writeToSink(sink)
// 执行程序
翻译和执行一个查询
Table API和SQL查询根据输入是流还是批翻译成DataStream或者DataSet,查询内部表示为一个逻辑查询计划,并分两个阶段进行翻译:
1、优化逻辑计划
2、翻译成一个DataStream或者DataSet程序
Table API或者SQL查询会在下面情况下触发:
当调用Table.writeToSink()
时,Table会发射到TableSink中
Table转换DataStream或者DataSet时(参考与DataStream和DataSet API集成)
一旦翻译,Table API或者SQL查询就会像常规DataStream或DataSet处理一样,并且当StreamExecutionEnvironment.execute()
或者ExecutionEnvironment.execute()
调用时执行。
与DataStream和DataSet API集成
Table API和SQL查询可以很容易地进行集成并嵌入到DataStream和DataSet程序中。例如:我们可以查询一个外部表(如:来自关系型数据库的表)、做一些预处理,如过滤、映射、聚合或者与元数据关联,然后使用DataStream或者DataSet API(及其他基于这些API的库,如CEP或Gelly)进行进一步处理。同样,Table API或者SQL查询也可以应用于DataStream或者DataSet程序的结果中。
这种交互可以通过将DataStream或者DataSet转换成一个Table及将Table转换成DataStream或者DataSet来实现。在本节,我们将描述这些转换是如何完成的。
Scala 隐式转换
Scala Table API为DataSet、DataStream和Table类提供了隐式转换功能。这些转换可以通过导入Scala DataStream API中的org.apache.flink.table.api.scala._
和org.apache.flink.api.scala._
包来启用。
注册一个DataStream或者DataSet为Table
一个DataStream或者DataSet可以在TableEnvironment中注册为Table,表的结果模式根据注册的DataStream或者DataSet的数据类型来定。请参考数据类型映射到表模式来了解更详细的信息。
// 获取 StreamTableEnvironment
// 注册一个DataSet 到BatchTableEnvironment也是等效的
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// 注册DataStream 为表 "myTable" ,并有两个字段 "f0", "f1"
tableEnv.registerDataStream("myTable", stream);
// 注册 DataStream 为表 "myTable2" 并有两个字段 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
// 获取 TableEnvironment
// 注册一个 DataSet 是等价的
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// 注册 DataStream 为表 "myTable" 并有两个字段 "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
// 注册 DataStream 为 "myTable2" 并有两个字段 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
将Table转换为DataStream或者DataSet
Table可以转换为DataStream或者DataSet,这样的话,自定义的DataStream或者DataSet程序就可以基于Table API或者SQL查询的结果来执行了。
当将一个Table转换为DataStream或者DataSet时,你需要指定生成的DataStream或者DataSet的数据类型,即需要转换表的行的数据类型,通常最方便的转换类型是Row,下面列表概述了不同选项的功能:
1、Row:字段通过位置映射、可以是任意数量字段,支持空值,非类型安全访问
2、POJO:字段通过名称(POJO字段作为Table字段时,必须命名)映射,可以是任意数量字段,支持空值,类型安全访问
3、Case Class:字段通过位置映射,不支持空值,类型安全访问
4、Tuple:字段通过位置映射,不得多于22(Scala)或者25(Java)个字段,不支持空值,类型安全访问
5、Atomic Type:Table必须有一个字段,不支持空值,类型安全访问。
将Table转换为DataStream
流式查询的结果Table会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的DataStream需要对表的更新进行编码。
有两种模式来将Table转换为DataStream:
1、Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时,即仅附加,之前发射的结果不会被更新。
2、Retract Mode:始终都可以使用此模式,它使用一个boolean标识来编码INSERT和DELETE更改。
// 获取一个 StreamTableEnvironment.
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 有两个字段(String name, Integer age)的Table
Table table = ...
// 通过指定类将Table转换为Row的Append DataStream
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 通过一个TypeInformation将Table转换为Tuple2<String, Integer> 类型的Append DataStream
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// 将Table转换为Row的react形式的DataStream
// 一个reactDataStream的类型X为 DataStream<Tuple2<Boolean, X>>.
// boolean字段指定了更改的类型.
// True 是 INSERT, false 是 DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
注意:有关动态表及其属性的详细讨论在Streaming Queries文档中给出。
将Table转换为DataSet
Table可以按照如下方式转换为DataSet:
// 获取 BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 有两个字段(String name, Integer age)的Table
Table table = ...
// 通过指定类将Table转换为Row类型的DataSet
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// 通过TypeInformation 将Table转换为Tuple2<String, Integer>类型的DataSet
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// 获取 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 有两个字段(String name, Integer age)的Table
val table: Table = ...
// 将Table转换为Row类型的DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// 将Table转换为Tuple2[String, Int]类型的DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
数据类型映射到表模式
Flink的DataStream和DataSet API支持多种数据类型,如Tuple,POJO, case class及原始数据类型。接下来我们描述Table API如何将这些类型转换为内部行表示及展示将DataStream转换为Table的例子。
原子类型
Flink将原生类型(如:Integer, Double, String)或者通用类型(不能再被分析或者分解的类型)视为原子类型,一个原子类型的DataStream或者DataSet可以转换为只有一个属性的Table,属性的类型根据原子类型推算,并且必须得指定属性的名称。
// 获取一个 StreamTableEnvironment, 同样原理适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Long> stream = ...
// 将 DataStream转换为具有属性"myLong"的Table
Table table = tableEnv.fromDataStream(stream, "myLong");
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[Long] = ...
// 将 DataStream 转换为具有属性'myLong的Table
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Tuple(Java和Scala都支持)和Case Class(仅Scala支持)
Flink支持Scala内置的Tuple和Flink为Java提供的Tuple,DataStream和DataSet类型的Tuple都可以被转换为表。字段可以通过为所有字段(通过位置来映射)提供的名称来重命名,如果没有为字段指定名称的话,就会采用默认的字段名。
// 获取一个 StreamTableEnvironment, 同样适用于BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// 将 DataStream为具有字段名为"myLong", "myString"的Table
Table table1 = tableEnv.fromDataStream(stream, "myLong, myString");
// 将 DataStream 转换为具有默认字段名 "f0", "f1"的 Table
Table table2 = tableEnv.fromDataStream(stream);
//获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// 将 DataStream 转换为具有字段名 'myLong, 'myString' 的Table
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
// 将 DataStream 转换为具有默认字段名 '_1, '_2的Table
val table2: Table = tableEnv.fromDataStream(stream)
// 定义一个 case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// 将 DataStream 转换为具有默认字段名 'name, 'age'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC)
// 将 DataStream 转换为具有字段名 'myName, 'myAge'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
POJO(Java 和 Scala)
Flink支持使用POJO作为复合类型,决定POJO规则的文档请参考这里。
当将一个POJO类型的DataStream或者DataSet转换为Table而不指定字段名称时,Table的字段名称将采用JOPO原生的字段名称作为字段名称。重命名原始的POJO字段需要关键字AS,因为POJO没有固定的顺序,名称映射需要原始名称并且不能通过位置来完成。
//获取一个 StreamTableEnvironment, 同样原理适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Person 是一个有两个字段"name" and "age" 的POJO
DataStream<Person> stream = ...
// 将 DataStream 转换为有字段 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);
// 将 DataStream 转换为有字段 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Person 是一个有字段 "name" and "age" 的POJO
val stream: DataStream[Person] = ...
// 将 DataStream 转换为具有字段 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)
// 将 DataStream 转换为具有字段 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
Row
Row数据类型支持任意数量的字段,并且字段可以是null
值,字段名称可以通过RowTypeInformation来指定或者将一个Row DataStream或者DataSet转换为Table时(根据位置)指定。
// 获取一个 StreamTableEnvironment, 同样原理适用于 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 在`RowTypeInfo`中指定字段"name" and "age"的Row类型DataStream
DataStream<Row> stream = ...
// 将 DataStream 转换为具有字段 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);
// 将 DataStream 转换为具有字段 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "myName, myAge");
// 获取一个 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 在`RowTypeInfo`中指定字段"name" and "age"的Row类型DataStream
val stream: DataStream[Row] = ...
// 将 DataStream 转换为具有字段 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)
// 将 DataStream 转换为具有字段 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
查询优化
Apache Flink使用Apache Calcite来优化和翻译查询,当前的查询优化包括投影、过滤下推、子查询去相关及各种形式的查询重写。Flink不去优化join的顺序,但是会根据它们的顺序去执行(FROM子句中表的顺序或者WHERE子句中连接谓词的顺序)。
可以通过提供一个CalciteConfig对象来调整在不同阶段应用的优化规则集,这个可以通过调用CalciteConfig.createBuilder())
获得的builder来创建,并且可以通过调用tableEnv.getConfig.setCalciteConfig(calciteConfig)
来提供给TableEnvironment。
解析一个Table
Table API为计算一个Table提供了一个机制来解析逻辑和优化查询计划,这个可以通过调用TableEnvironment.explain(table)
方法来完成,它会返回描述三个计划的字符串:
1、关系查询语法树,即未优化的查询计划
2、优化后的逻辑查询计划
3、物理执行计划
以下代码显示了一个示例和相应的输出:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
.where('word.like("F%"))
.unionAll(table2)
val explanation: String = tEnv.explain(table)
println(explanation)
输出如下:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, 'F%')])
LogicalTableScan(table=[[_DataStreamTable_0]])
LogicalTableScan(table=[[_DataStreamTable_1]])
== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
DataStreamScan(table=[[_DataStreamTable_0]])
DataStreamScan(table=[[_DataStreamTable_1]])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, 'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE