Development environment:
- Spark 2.3
- Kafka 1.1.1
The blacklist data is loaded from MySQL. The source data comes from Kafka, and each record is simply a name. To join the source data against the blacklist, both have to be converted into key-value pairs first.
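Before walking through the full program, the core join-then-filter idea is easier to see on two static RDDs: a left outer join keeps every source record, and the Optional on the right side of the result tells us whether the name was found in the blacklist. Here is a minimal, self-contained sketch (the sample names, class name, and local master are made up for illustration, and it uses Java 8 lambdas instead of the anonymous classes used below):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class JoinFilterSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setMaster("local[2]").setAppName("JoinFilterSketch"));
        // source records keyed by name; the payload stands in for the rest of the log line
        JavaPairRDD<String, String> source = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("tom", "log-1"),
                new Tuple2<>("jack", "log-2")));
        // blacklist keyed by name; the value true marks the name as blocked
        JavaPairRDD<String, Boolean> blacklist = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("tom", true)));
        // leftOuterJoin keeps all source records; blacklisted names carry Optional.of(true)
        List<String> valid = source.leftOuterJoin(blacklist)
                .filter(t -> !(t._2._2.isPresent() && t._2._2.get()))
                .map(t -> t._1)
                .collect();
        System.out.println(valid); // prints [jack]
        sc.close();
    }
}

leftOuterJoin (rather than an inner join) is what keeps the names that have no blacklist entry at all, which is exactly the set we want to let through.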
Java code:
package cn.spark.streaming;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
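// note: KafkaUtils and StringDecoder here belong to the Kafka 0.8-based
// spark-streaming-kafka-0-8 integration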
/**
 * Use transform to join each micro-batch against a blacklist and filter out
 * blacklisted users, based on a Kafka message queue.
 */
public class BlackListFilter {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("BlackListFilter");
        // create the streaming context with 5-second batches
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        // enable checkpointing; args[0] is the checkpoint directory
        jssc.checkpoint(args[0]);
        // Kafka consumer properties
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("bootstrap.servers", "hserver-1:9092,hserver-2:9092,hserver-3:9092");
        kafkaParams.put("group.id", "BlackListFilter");
        kafkaParams.put("auto.offset.reset", "smallest");
        // topic set; args[1] is the Kafka topic to consume
        Set<String> topics = new HashSet<String>();
        topics.add(args[1]);
        // create the direct Kafka DStream
        JavaPairInputDStream<String, String> inputPairDStream =
            KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topics
            );
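        // the direct (receiver-less) approach computes the offset range to read
        // for each batch itself, instead of going through a Kafka receiver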
        // load the blacklist from MySQL
        SparkSession spark = SparkSession
            .builder()
            .getOrCreate();
        // read the table over JDBC --> returns a Dataset
        Dataset<Row> blackList = spark
            .read()
            .format("jdbc")
            .option("url", "jdbc:mysql://hserver-1:3306/retail_db")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("dbtable", "blacklist")
            .option("user", "root")
            .option("password", "root")
            .load();
        // transform the Dataset into a JavaRDD
        JavaRDD<Row> blackListRDD = blackList.toJavaRDD();
        // transform the JavaRDD into a JavaPairRDD --> the second element type is Boolean
        final JavaPairRDD<String, Boolean> blackListPairRDD =
            blackListRDD.mapToPair(
                new PairFunction<Row, String, Boolean>() {
                    private static final long serialVersionUID = -6634120981007776151L;
                    @Override
                    public Tuple2<String, Boolean> call(Row name) throws Exception {
                        return new Tuple2<String, Boolean>(name.getString(0), true);
                    }
                });
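        // NOTE: blackListPairRDD is never cached, so each micro-batch recomputes it and
        // re-reads the table over JDBC; if the blacklist is effectively static,
        // calling blackListPairRDD.cache() here would avoid the repeated reads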
        // transform each batch of the Kafka DStream
        JavaDStream<String> validListDStream = inputPairDStream.transform(
            new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
                private static final long serialVersionUID = -7488950207291980402L;
                @Override
                public JavaRDD<String> call(JavaPairRDD<String, String> kafkaDataRDD) throws Exception {
                    // key the source data by name --> userRDD: access log
                    JavaPairRDD<String, String> userRDD =
                        kafkaDataRDD.mapToPair(
                            new PairFunction<Tuple2<String, String>, String, String>() {
                                private static final long serialVersionUID = 1L;
                                @Override
                                public Tuple2<String, String> call(Tuple2<String, String> tuple) throws Exception {
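                                    // tuple._2 is the Kafka message value (a name);
                                    // the "........" payload is just a placeholder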
                                    return new Tuple2<String, String>(tuple._2, "........");
                                }
                            });
                    // left outer join with the blacklist
                    JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joinRDD =
                        userRDD.leftOuterJoin(blackListPairRDD);
                    // do the blacklist filtering: drop records whose name joined with true
                    JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filterRDD =
                        joinRDD.filter(
                            new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, Boolean>() {
                                private static final long serialVersionUID = 791090533213057710L;
                                @Override
                                public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
                                    if (tuple._2._2.isPresent() && tuple._2._2.get()) {
                                        return false;
                                    } else {
                                        return true;
                                    }
                                }
                            });
                    // map each surviving record to a printable string
                    JavaRDD<String> resultRDD =
                        filterRDD.map(
                            new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, String>() {
                                private static final long serialVersionUID = -54290472445703194L;
                                @Override
                                public String call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                        throws Exception {
                                    return tuple._1 + "--->" + tuple._2._1;
                                }
                            });
                    return resultRDD;
                }
            });
        // print the filtered result of each batch
        validListDStream.print();
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
        spark.close();
    }
}
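To run the job, pass the checkpoint directory as args[0] and the Kafka topic as args[1]. The broker addresses, JDBC URL, table name, and credentials above are specific to my cluster; adjust them to your environment. The mysql-connector-java driver and the spark-streaming-kafka-0-8 artifact also need to be on the classpath of the driver and executors, e.g. via --jars or --packages with spark-submit.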