问题起因:使用Spark2.0的Java接口编写日志处理逻辑,在对日志做Join操作的过程中,遇到了匪夷所思的错误,问题大概可以抽象成下面一段代码:
@Test
public void testFilter() throws Exception{
SparkConf conf = new SparkConf()
.setMaster("local[3]")
.setAppName("test");
JavaSparkContext context = new JavaSparkContext(conf);
List<String> list1 = Arrays.asList("a", "b", null, "c");
List<Integer> list2 = Arrays.asList(1, 2, null, 3);
context.parallelize(list1)
.filter(Objects::nonNull);
context.parallelize(list2)
.filter(Objects::nonNull)
//System.out::println 会报Serializable Issue .foreach(a -> System.out.println(a));
}
这段代码中创建了两个List,一个String的,一个Integer的,然后调用java.util.Objects.nonNull()方法分别执行过滤掉null的逻辑。
这段代码是能够通过编译的,但是一旦运行,就会出现下面的异常:
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
at org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:76)
可以看到,在list1上的过滤操作甚至都没有真正执行(没有使用Spark的shuffle操作),那么这个问题是怎么产生的呢?最初怀疑是method-ref的引用问题,是不是说在method-ref委派的过程中,由于应用到的类型不同,产生了二义性问题呢?使用下面的代码测试:
@Test
public void testBasicFilter() {
List<String> list1 = Arrays.asList("a", "b", null, "c");
List<Integer> list2 = Arrays.asList(1, 2, null, 3);
list1.stream()
.filter(Objects::nonNull)
.forEach(System.out::println);
list2.stream()
.filter(Objects::nonNull)
.forEach(System.out::println);
}
这段代码使用JDK自带的stream替代Spark进行相同的逻辑,结果是不但正确通过编译,执行结果也正常,执行结果如下:
a
b
c
1
2
3
这就否定了之前的假设,那么究竟是什么导致了异常呢?仔细思考,Java8的Stream和Spark究竟有什么本质的不同呢:都是并行处理框架,但是Spark是分布式的,分布式的涉及到网络传输,这必然涉及到数据处理任务可能会通过网络进行传输(Spark确实会把任务广播到各个节点上),因此一定会涉及到Task的序列化,会不会是序列化出现了问题?
为了验证这个猜测,构造了如下的代码:
import org.apache.commons.lang3.SerializationUtils;
import org.junit.Test;
import java.io.Serializable;
import java.util.Objects;
import java.util.function.Function;
public class LambdaSerializationTest {
private static <T> T roundtrip(Serializable obj) {
byte[] bytes = SerializationUtils.serialize(obj);
@SuppressWarnings("unchecked")
T t = (T) SerializationUtils.deserialize(bytes);
return t;
}
@Test
public void test() {
SerializableFunction<Integer, Boolean> func = Objects::nonNull;
System.out.println(func.apply(1));
SerializableFunction<String, Boolean> fun1 = roundtrip(func);
System.out.println(fun1.apply("a"));
}
private interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
}
}
这段代码先将Objects::nonNull向下转为Function<Integer,Boolean>类型
反序列化回来的过程中反序列化为Function<String,Boolean>类型
Object.nonNull()方法的原型如下:
/**
* Returns {@code true} if the provided reference is non-{@code null}
* otherwise returns {@code false}.
*
* @apiNote This method exists to be used as a
* {@link java.util.function.Predicate}, {@code filter(Objects::nonNull)}
*
* @param obj a reference to be checked against {@code null}
* @return {@code true} if the provided reference is non-{@code null}
* otherwise {@code false}
*
* @see java.util.function.Predicate
* @since 1.8
*/
public static boolean nonNull(Object obj) {
return obj != null;
}
所以对类型是没有要求的,是不是说明这段代码可以正确运行呢?我们执行这段代码:
true
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
...
看,java.lang.ClassCastException又回来了。
这说明:
lambda表达式在序列化的过程中,不同于Java泛型的运行时擦除机制,会对类型进行特化,序列化前后的lambda表达式是携带类型信息的。
之后我又参考了R大在知乎上关于Lambda表达式与序列化的一个回答,证明了之前的假设。
问题解决:
在Spark中,同一个method-ref可能会绑定到同一个serializable lambda,再次重用如果类型不匹配就会引发异常;如果使用匿名函数的形式,匿名函数可以匹配到对应的类型,类型不同对应就是两个serializable lambda,因此不会引发问题。
Objects::nonNull => str -> str != null
Objects::nonNull => int -> int != null