kafka测试数据生成:
package com.dx.kafka; import java.util.Properties; import java.util.Random; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaProducer { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.0.141:9092,192.168.0.142:9092,192.168.0.143:9092,192.168.0.144:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer(props); int i = 0; Random random=new Random(); while (true) { i++; producer.send(new ProducerRecord<String, String>("my-topic", "key-" + Integer.toString(i), i%3+","+random.nextInt(100))); System.out.println(i); Thread.sleep(1000); if(i%100==0) { Thread.sleep(60*1000); } } // producer.close(); } }
Stream join Stream测试代码:
要求:使用spark structured streaming实时读取kafka中的数据,kafka中的数据包含字段int_id;kafka上数据需要关联资源信息(通过kafka的int_id与资源的int_id进行关联),同时要求资源每天都更新。
使用spark structured streaming实时读取kafka中的数据
Dataset<Row> linesDF = this.sparkSession.readStream()// .format("kafka")// .option("failOnDataLoss", false)// .option("kafka.bootstrap.servers", "192.168.0.141:9092,192.168.0.142:9092,192.168.0.143:9092,192.168.0.144:9092")// .option("subscribe", "my-topic")// .option("startingOffsets", "earliest")// .option("maxOffsetsPerTrigger", 10)// .load(); StructType structType = new StructType(); structType = structType.add("int_id", DataTypes.StringType, false); structType = structType.add("rsrp", DataTypes.StringType, false); structType = structType.add("mro_timestamp", DataTypes.TimestampType, false); ExpressionEncoder<Row> encoder = RowEncoder.apply(structType); Dataset<Row> mro = linesDF.select("value").as(Encoders.STRING()).map(new MapFunction<String, Row>() { private static final long serialVersionUID = 1L; @Override public Row call(String t) throws Exception { List<Object> values = new ArrayList<Object>(); String[] fields = t.split(","); values.add(fields.length >= 1 ? fields[0] : "null"); values.add(fields.length >= 2 ? fields[1] : "null"); values.add(new Timestamp(new Date().getTime())); return RowFactory.create(values.toArray()); } }, encoder); mro=mro.withWatermark("mro_timestamp", "15 minutes"); mro.printSchema();
加载资源信息
StructType resulStructType = new StructType(); resulStructType = resulStructType.add("int_id", DataTypes.StringType, false); resulStructType = resulStructType.add("enodeb_id", DataTypes.StringType, false); resulStructType = resulStructType.add("res_timestamp", DataTypes.TimestampType, false); ExpressionEncoder<Row> resultEncoder = RowEncoder.apply(resulStructType); Dataset<Row> resDs = sparkSession.readStream().option("maxFileAge", "1ms").textFile(resourceDir) .map(new MapFunction<String, Row>() { private static final long serialVersionUID = 1L; @Override public Row call(String value) throws Exception { String[] fields = value.split(","); Object[] objItems = new Object[3]; objItems[0] = fields[0]; objItems[1] = fields[1]; objItems[2] = Timestamp.valueOf(fields[2]); return RowFactory.create(objItems); } }, resultEncoder); resDs = resDs.withWatermark("res_timestamp", "1 seconds"); resDs.printSchema();
kafka上数据与资源关联
关联条件int_id相同,同时要求res.timestamp<=mro.timestmap & res.timestamp<(mro.timestmap-1天)
res如果放入broadcast经过测试发现也是可行的。
// JavaSparkContext jsc = // JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); Dataset<Row> cellJoinMro = mro.as("t10")// .join(resDs.as("t11"),// jsc.broadcast(resDs).getValue() functions.expr("t11.int_id=t10.int_id "// + "and t11.res_timestamp<=t10.mro_timestamp "// + "and timestamp_diff(t11.res_timestamp,t10.mro_timestamp,'>','-86400000')"),// "left_outer")// .selectExpr("t10.int_id", "t10.rsrp", "t11.enodeb_id", "t10.mro_timestamp", "t11.res_timestamp"); StreamingQuery query = cellJoinMro.writeStream().format("console").outputMode("update") // .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))// .start();
udf:timestamp_diff定义
sparkSession.udf().register("timestamp_diff", new UDF4<Timestamp, Timestamp, String, String, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(Timestamp t1, Timestamp t2, String operator, String intervalMsStr) throws Exception { long diffValue=t1.getTime()-t2.getTime(); long intervalMs=Long.valueOf(intervalMsStr); if(operator.equalsIgnoreCase(">")){ return diffValue>intervalMs; }else if(operator.equalsIgnoreCase(">=")){ return diffValue>=intervalMs; }else if(operator.equalsIgnoreCase("<")){ return diffValue<intervalMs; }else if(operator.equalsIgnoreCase("<=")){ return diffValue<=intervalMs; }else if(operator.equalsIgnoreCase("=")){ return diffValue==intervalMs; }else{ throw new RuntimeException("unknown error"); } } },DataTypes.BooleanType);
如果删除资源历史数据,不会导致正在运行的程序抛出异常;当添加新文件到res hdfs路径下时,可以自动被加载进来。
备注:要求必须每天资源文件只能有一份,否则会导致kafka上数据关联后结果重复,同时,res上的每天的文件中包含timestmap字段格式都为yyyy-MM-dd 00:00:00。