Flink程序分为三大部分:
Source:读取数据源
Transformation:处理数据,对数据做转换
Sink:将处理结果输出到一个目的地
flink提供了sum(),map(),flatMap(),keyBy(),timeWindow()等函数接口,我们通过调用并实现这些函数即可对数据加工,通常一个或多个函数组成一个算子。下面对部分函数做简要介绍,后面学习过程中详细了解:
flatMap:对输入进行处理,生成0到多个输出,输出数据类型可与输如数据类型不一致。可类比Java Stream中的flatMap。本例中输入为一个字符串,输出为一个二元组,二元组记录单词的一次出现。
keyBy:根据某个key对数据重新分组。keyBy(0)表示按照二元组中的第一个元素分组,本例中按照单词分组。
timeWindoew:指定窗口。
sum:求和函数。分组后,同一个单词的数据在一组,sum(1)表示对二元组中的第二个元素求和,sum后就可以得到每个单词出现的次数。
环境
IDEA+JAVA8+maven
依赖
在pom.xml中添加以下依赖:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
<!--<scope>provided</scope>-->
</dependency>
流处理代码
流处理代码如下:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamTest {
public static void main(String[] args) {
try {
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.获取数据流源,方式很多,以后慢慢了解,DataStream API用于流处理
DataStream<String> stringDataStream = env.fromElements("flink", "flink", "world,world,world", "count");
//3.指定、实现操作数据算子,处理加工数据
SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator =
stringDataStream.flatMap(new MyFlatMapper())
.keyBy(0)//按二元组中的第一个元素分组
.sum(1);//分组后的数据流,对二元组中的第二个元素求和
//4.输出,这里使用标准IO打印,输出方式很多,以后慢慢了解
streamOperator.print();
//5.出发执行程序
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
private static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] strings = value.split(",");
for (String s : strings) {
collector.collect(new Tuple2<String, Integer>(s, 1));
}
}
}
}
批处理代码如下:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchTest {
public static void main(String[] args) {
try {
//1.获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.获取数据流源,方式很多,以后慢慢了解,DataSet API用于批处理
DataSource<String> dataSource = env.fromElements("flink", "flink", "world,world,world", "count");
//3.指定、实现操作数据算子,处理加工数据
DataSet<Tuple2<String, Integer>> result = dataSource.flatMap(new MyFlatMapper())
.groupBy(0)
.sum(1);
//4.输出,这里使用标准IO打印,输出方式很多,以后慢慢了解
result.print();
} catch (Exception e) {
e.printStackTrace();
}
}
private static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] strings = value.split(",");
for (String s : strings) {
collector.collect(new Tuple2<String, Integer>(s, 1));
}
}
}
}