spark从入门到放弃五十:Spark Streaming(10)实时黑名单过滤

文章地址:http://www.haha174.top/article/details/254946
transform 操作,应用在DStream 上时,可以用于执行任意的RDD 到RDD 转换的操作。他可以用于实现,DStream api 所没有提供的操作。比如说,DStream api 中并没有提供一个DStream 中的每个batch ,与一个特定的RDD 进行join 操作。但是我们自己就可以使用transform 操作来实现该功能。
DStream.join() 只能join 其他的DStream ,在DStream 每个 batch 的RDD 计算出来之后,会去跟其他的DStream 的RDD 行join
下面给出java 示例和注释:

public class TransFormBlackList {
    public static void main(String[] args) throws InterruptedException {
        System.setProperty("HADOOP_USER_NAME", "root");
        SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("TransFormBlackList");
        JavaSparkContext sc=new JavaSparkContext(conf);
        JavaStreamingContext jssc=new JavaStreamingContext(sc, Durations.seconds(5));
        //  用户对我们的网站上面的广告可以进行点击
        //点击之后,要进行实时的计费,点一下,算一次钱
        //但是对于某些无良商家刷广告的人,那么我们就有一个黑名单
        //只要是黑名单中点击的都过滤掉
        //  日志格式  date  username
        //  先做一份模拟的黑名单
        List<Tuple2<String,Boolean>> blackListData=new ArrayList<>();
        blackListData.add(new Tuple2<>("tom",true));
       final JavaPairRDD<String,Boolean> blackListRDD=sc.parallelizePairs(blackListData);
        JavaReceiverInputDStream<String> adsClickLog=jssc.socketTextStream("www.codeguoj.cn",9999);
        //  首先需要转换一个格式(date,date username)
        //以便于后面 于黑名单RDD  进行join
        JavaPairDStream<String,String> userAdsClickDStream=adsClickLog.mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[1],s);
            }
        });
        //执行transform  操作实时进行黑名单过滤

        JavaDStream<String> validAdfClickLogDStream= userAdsClickDStream.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
           private static final  long serialVersionUID=1L;
            @Override
            public JavaRDD<String> call(JavaPairRDD<String, String> v1) throws Exception {
                //  并不是每个用户都存在给名单中所以使用做外链接

                JavaPairRDD<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>> joindRDD= v1.leftOuterJoin(blackListRDD);
                //这里得到的是用户的点击日志  和在黑名单的状态
                JavaPairRDD<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>> filterRDD=joindRDD.filter(new Function<Tuple2<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>>, Boolean>() {
                    private static final  long serialVersionUID=1L;
                    @Override

                    public Boolean call(Tuple2<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>> v1) throws Exception {
                        if(v1._2._2().isPresent()&&v1._2._2().get()){
                            return false;
                        }
                        return true;
                    }
                });
                JavaRDD<String> validAdfClickRDD=filterRDD.map(new Function<Tuple2<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>>, String>() {
                    @Override
                    public String call(Tuple2<String, Tuple2<String, org.apache.spark.api.java.Optional<Boolean>>> v1) throws Exception {
                        return v1._2._1;
                    }
                });
                return validAdfClickRDD;

            }
        });
        validAdfClickLogDStream.print();
        jssc.start();
        jssc.awaitTermination();
        jssc.stop();
    }
}

下面给出scala 示例和注释

object TransFormBlackList {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("updateStateByKeyWorldCount").setMaster("local[2]")

    val jssc = new StreamingContext(conf, Durations.seconds(5))
    //首先创建输入DStream   代表一个数据源 (比如kafka  ,socket)  来持续不断的实时数据流
    //调用JavaStreamingContext 的socketTextStream   方法可以创建一个数据源为socket  的网路端口的数据源
    val blackList=Array(("tom",true))
    val blackListRDD=jssc.sparkContext.parallelize(blackList,5);
    val lines = jssc.socketTextStream("www.codeguoj.cn", 9999)

    val userAdsClickLogDStream=lines.map(adfLog=>(adfLog.split(" ")(1),adfLog))

    val validAdfClickRDD=userAdsClickLogDStream.transform(userAdsClickLogRDD=>{
      val joinedRDD=userAdsClickLogRDD.leftOuterJoin(blackListRDD)
      val fliterRDD=joinedRDD.filter(tupe=>{
        if(tupe._2._2.getOrElse(false)){
          false
        }else{
          true
        }
      })
      val clickedLogRDD=fliterRDD.map(tupe=>tupe._2._1)
      clickedLogRDD
    })
    validAdfClickRDD.print()
    jssc.start()
    jssc.awaitTermination()
  }
}

欢迎关注,更多福利

《spark从入门到放弃五十:Spark Streaming(10)实时黑名单过滤》 这里写图片描述

    原文作者:意浅离殇
    原文地址: https://www.jianshu.com/p/53856e53bb55
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞