Flink-Table-SQL source

source作为Table&SQL API的数据源,同时也是程序的入口。当前Flink的Table&SQL API整体而言支持三种source:Table source、DataSet以及DataStream,它们都通过特定的API注册到Table环境对象。

我们先来看Table source,它直接以表对象作为source。这里的表对象可细分为:

Flink以Table类定义的关系表对象,通过TableEnvironment的registerTable方法注册;
外部source经过桥接而成的表对象,基础抽象为TableSource,通过具体环境对象的registerTableSource;
下图展示了,Table source被注册时,对应的内部转化图(虚线表示对应关系):

《Flink-Table-SQL source》

由上图可见,不管是直接注册Table对象还是注册外部source,在内部都直接对应了特定的XXXTable对象。

TableSource trait针对Streaming和Batch分部扩展有两个trait,它们是StreamTableSource和BatchTableSource,它们各自都提供了从数据源转换为核心对象(DataStream跟DataSource)的方法。

除了这三个基本的trait之外,还有一些特定对source的需求以独立的trait提供以方便实现者自行组合,比如ProjectableTableSource这一trait,它支持将Projection下推(push-down)到TableSource。Flink内置实现的CsvTableSource就继承了这一trait。

当前Flink所支持的TableSource大致上分为两类:

  • CsvTableSouce:同时可用于Batch跟Streaming模式;
  • kafka系列TableSource:包含Kafka的各个版本(0.8,0.9,0.10)以及各种不同的格式(Json、Avro),基本上它们只支持Streaming模式,它们都依赖于各种kafka的connector;
    使用方式如下:
// specify JSON field names and types
val typeInfo = Types.ROW(
  Array("id", "name", "score"),
  Array(Types.INT, Types.STRING, Types.DOUBLE)
)

val kafkaTableSource = new Kafka08JsonTableSource(
    kafkaTopic,
    kafkaProperties,
    typeInfo)
tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);

CsvTableSource的构建方式如下:

val csvTableSource = CsvTableSource
    .builder
    .path("/path/to/your/file.csv")
    .field("name", Types.STRING)
    .field("id", Types.INT)
    .field("score", Types.DOUBLE)
    .field("comments", Types.STRING)
    .fieldDelimiter("#")
    .lineDelimiter("$")
    .ignoreFirstLine
    .ignoreParseErrors
    .commentPrefix("%")

除了以TableSource作为Table&SQL的source,还支持通过特定的环境对象直接注册DataStream、DataSet。注册DataStream的示例如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val cust = env.fromElements(...)
val ord = env.fromElements(...)

// register the DataStream cust as table "Customers" with fields derived from the datastream
tableEnv.registerDataStream("Customers", cust)

// register the DataStream ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)

注册DataSet的示例如下:

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val cust = env.fromElements(...)
val ord = env.fromElements(...)

// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)

// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)

以上,通过调用环境对象的registerXXX方法是一种显式注册的方式,除此之外,还有隐式注册方式。隐式注册方式,通过对DataStream跟DataSet对象增加的toTable方法来实现,使用方式示例如下:

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataSet from an external source
val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...)

val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sql(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

我们知道DataStream跟DataSet原先是没有toTable API的,如何为它们增加该API的呢?答案是利用了Scala的包对象(package object),该特性主要用于兼容旧版本的库或对某些类型的API进行增强。具体而言,toTable API其实是实现在DataSetConversions和DataStreamConversions两个类中,然后在包对象中对他们进行实例化。而定位到toTable的实现时,会看到它们其实是间接调用了特定环境对象的fromDataStream/fromDataSet方法并将当前的DataStream跟DataSet传递给这两个方法并通过方法返回得到Table对象。fromDataStream/fromDataSet方法对在实现时会调用跟registerDataStream/registerDataSet方法对相同的内部注册方法。
TableEnvironment内部会以一个SchemaPlus类型的数据结构,它是Calcite中的数据结构,用来存储被注册的表、函数等在内的一系列对象(这些对象统称为Calcite中的Schema)。由此可见它无法直接接受Flink自定义的类似于TableSouce这样的对象,那么这里存在一个问题就是两个框架的衔接问题。这就是Flink定义那么多内部XXXTable类型的原因之一,我们来梳理一下它们之间的关系如下:

《Flink-Table-SQL source》

上图中的XXXTable对象同时以括号标明了在注册时它是由什么对象转化而来。

原文:
https://blog.csdn.net/yanghua_kobe/article/details/73143367

    原文作者:丹之
    原文地址: https://www.jianshu.com/p/6ed368272916
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞