A business requirement involves a set of dynamic fields whose expressions must be loaded and evaluated at runtime:
Option 1: evaluate the expressions with FelEngine inside a MapFunction or MapPartitionsFunction:
FelEngine fel = FelEngine.instance;
FelContext ctx = fel.getContext();
ctx.set("rsrp", 100);
ctx.set("rsrq", 80);
double expValue = Double.valueOf(String.valueOf(fel.eval("rsrp*10-rsrq*8")));
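Wired into a streaming job, the same calls can run inside a MapPartitionsFunction so the engine is obtained once per partition rather than once per record. The sketch below is a minimal illustration, not the original implementation: the fel import paths are assumed from the fel (Fast Expression Language) project and may differ by version, and the input/output row layout is made up for the example.

import com.greenpineyu.fel.FelEngine;            // assumed import path; verify against your fel version
import com.greenpineyu.fel.context.FelContext;   // assumed import path
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Evaluates one configured expression per input row and emits (id, value) rows.
public class FelEvalFunction implements MapPartitionsFunction<Row, Row> {
    private static final long serialVersionUID = 1L;

    private final String expression; // e.g. "rsrp*10-rsrq*8", loaded from configuration

    public FelEvalFunction(String expression) {
        this.expression = expression;
    }

    @Override
    public Iterator<Row> call(Iterator<Row> input) {
        // The engine is obtained inside call(), once per partition, so nothing
        // non-serializable is captured by the function itself.
        FelEngine fel = FelEngine.instance;
        FelContext ctx = fel.getContext();
        List<Row> out = new ArrayList<>();
        while (input.hasNext()) {
            Row row = input.next();
            ctx.set("rsrp", Double.valueOf(row.getAs("rsrp").toString()));
            ctx.set("rsrq", Double.valueOf(row.getAs("rsrq").toString()));
            double value = Double.valueOf(String.valueOf(fel.eval(expression)));
            out.add(RowFactory.create(row.getAs("id"), value));
        }
        return out.iterator();
    }
}

It would be applied as rows.mapPartitions(new FelEvalFunction("rsrp*10-rsrq*8"), resultEncoder), where resultEncoder is a RowEncoder for the hypothetical two-column result schema.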
Option 2: use the selectExpr() function:
package com.dx.streaming.drivers.test;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConversions;
import scala.collection.Seq;

import java.util.*;
import java.util.concurrent.TimeUnit;

public class MrsExpressionDoWithSelectExp {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate();

        StructType type = new StructType();
        type = type.add("id", DataTypes.StringType);
        type = type.add("cellname", DataTypes.StringType);
        type = type.add("rsrp", DataTypes.StringType);
        type = type.add("rsrq", DataTypes.StringType);
        ExpressionEncoder<Row> encoder = RowEncoder.apply(type);

        Dataset<String> ds = sparkSession.readStream().textFile("E:\\test-structured-streaming-dir\\*");
        Dataset<Row> rows = ds.mapPartitions(new MapPartitionsFunction<String, Row>() {
            private static final long serialVersionUID = -1988302292518096148L;

            @Override
            public Iterator<Row> call(Iterator<String> input) throws Exception {
                List<Row> rows = new ArrayList<>();
                while (input.hasNext()) {
                    String line = input.next();
                    String[] items = line.split(",");
                    rows.add(RowFactory.create(items));
                }
                return rows.iterator();
            }
        }, encoder);
        rows.printSchema();

        int dynamicExprLength = 10;
        Map<String, String> expMap = new LinkedHashMap<>();
        // Load the configured formulas (from a configuration file in production).
        expMap.put("rsrpq_count", "rsrp+rsrp");
        expMap.put("rsrpq_sum", "rsrp*10+rsrq*10");
        for (int i = 0; i < dynamicExprLength; i++) {
            expMap.put("rsrpq_sum" + i, "rsrp*10+rsrq*10");
        }
        expMap.put("$rsrpq_avg", "rsrpq_sum/rsrpq_count");

        List<String> firstLayerExpList = new ArrayList<>();
        List<String> secondLayerExpList = new ArrayList<>();
        firstLayerExpList.add("*");
        secondLayerExpList.add("*");
        for (Map.Entry<String, String> kv : expMap.entrySet()) {
            // Keys prefixed with "$" depend on first-layer results, so they are evaluated in the second layer.
            if (kv.getKey().startsWith("$")) {
                secondLayerExpList.add("(" + kv.getValue() + ") as " + kv.getKey().replace("$", ""));
            } else {
                firstLayerExpList.add("(" + kv.getValue() + ") as " + kv.getKey());
            }
        }

        // First layer: select *, (rsrp+rsrp) as rsrpq_count, (rsrp*10+rsrq*10) as rsrpq_sum, ...
        //rows = rows.selectExpr(firstLayerExpList.toArray(new String[firstLayerExpList.size()]));
        Seq<String> firstLayerExpSeq = JavaConversions.asScalaBuffer(firstLayerExpList);
        rows = rows.selectExpr(firstLayerExpSeq);
        //rows.show();

        // Second layer: select *, (rsrpq_sum/rsrpq_count) as rsrpq_avg
        //rows = rows.selectExpr(secondLayerExpList.toArray(new String[secondLayerExpList.size()]));
        Seq<String> secondLayerExpSeq = JavaConversions.asScalaBuffer(secondLayerExpList);
        rows = rows.selectExpr(secondLayerExpSeq);
        rows.printSchema();
        //rows.show();

        rows.writeStream()
                .format("console")
                .outputMode(OutputMode.Append())
                .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
                .start();
        try {
            sparkSession.streams().awaitAnyTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }
}
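For reference, the job expects every file under E:\test-structured-streaming-dir to contain comma-separated lines matching the declared schema (id, cellname, rsrp, rsrq). The sample rows below are made-up values for illustration:

1,cell-1,100,80
2,cell-2,95,70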
With dynamicExprLength set to 10 dynamic columns, the job produces output normally.
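For orientation, the second printSchema() should then show the four original string columns plus all generated columns, abbreviated below. The double result types are an assumption based on Spark's implicit string-to-number coercion for arithmetic expressions:

root
 |-- id: string (nullable = true)
 |-- cellname: string (nullable = true)
 |-- rsrp: string (nullable = true)
 |-- rsrq: string (nullable = true)
 |-- rsrpq_count: double (nullable = true)
 |-- rsrpq_sum: double (nullable = true)
 |-- rsrpq_sum0: double (nullable = true)
 ...
 |-- rsrpq_sum9: double (nullable = true)
 |-- rsrpq_avg: double (nullable = true)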
Problem discovered with ds.selectExpr():
When the number of dynamic columns is raised to 500 or 1000, the local test produces the following:
19/07/18 14:18:18 INFO CodeGenerator: Code generated in 105.715218 ms
19/07/18 14:18:19 WARN CodeGenerator: Error calculating stats of compiled class.
java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1509)
    at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
    at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
    at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:996)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:993)
    at scala.collection.Iterator$class.foreach(Iterator.scala:750)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:993)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:961)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1027)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1024)
    at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:906)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:412)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:366)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:890)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.extractProjection$lzycompute(ExpressionEncoder.scala:263)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.extractProjection(ExpressionEncoder.scala:263)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:235)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/07/18 14:18:19 INFO CodeGenerator: Code generated in 1354.475257 ms
When the job is submitted to YARN, in either yarn-client or yarn-cluster mode, it hangs: the driver and executors are created and resources are allocated, but no tasks are ever scheduled.
Moreover, no error is logged at all; the job simply stalls and can stay stuck indefinitely.
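The WARN above is raised while Spark records statistics about the compiled class, so it may itself be benign, but it does show code generation straining as the number of generated expressions grows. One experiment worth trying (a sketch, not a verified fix for this job) is to disable whole-stage code generation through the standard spark.sql.codegen.wholeStage option, so Spark falls back to interpreted, non-fused execution for the wide projection:

SparkSession sparkSession = SparkSession.builder()
        .appName("test")
        .master("local[*]")
        // Standard Spark SQL switch; whether it avoids this hang is unverified.
        .config("spark.sql.codegen.wholeStage", false)
        .getOrCreate();

Another option worth testing is splitting the expression map into several smaller selectExpr() passes, so that each generated projection stays small.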