Java8 Lambda表达式序列化踩坑记

问题起因:使用Spark2.0的Java接口编写日志处理逻辑,在对日志做Join操作的过程中,遇到了匪夷所思的错误,问题大概可以抽象成下面一段代码:

        @Test
        public void testFilter() throws Exception{
            SparkConf conf = new SparkConf()
                    .setMaster("local[3]")
                    .setAppName("test");
            JavaSparkContext context = new JavaSparkContext(conf);


            List<String> list1 = Arrays.asList("a", "b", null, "c");
            List<Integer> list2 = Arrays.asList(1, 2, null, 3);
            context.parallelize(list1)
                    .filter(Objects::nonNull);

            context.parallelize(list2)
                    .filter(Objects::nonNull)
                    //System.out::println 会报Serializable Issue                     .foreach(a -> System.out.println(a));

        }

这段代码中创建了两个List,一个String的,一个Integer的,然后调用java.util.Objects.nonNull()方法分别执行过滤掉null的逻辑。

这段代码是能够通过编译的,但是一旦运行,就会出现下面的异常:

 java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
	at org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:76)

可以看到,在list1上的过滤操作甚至都没有真正执行(没有使用Spark的shuffle操作),那么这个问题是怎么产生的呢?最初怀疑是method-ref的引用问题,是不是说在method-ref委派的过程中,由于应用到的类型不同,产生了二义性问题呢?使用下面的代码测试:

        @Test
        public void testBasicFilter() {
            List<String> list1 = Arrays.asList("a", "b", null, "c");
            List<Integer> list2 = Arrays.asList(1, 2, null, 3);
            list1.stream()
                    .filter(Objects::nonNull)
                    .forEach(System.out::println);

            list2.stream()
                    .filter(Objects::nonNull)
                    .forEach(System.out::println);
        }

这段代码使用JDK自带的stream替代Spark进行相同的逻辑,结果是不但正确通过编译,执行结果也正常,执行结果如下:

a
b
c
1
2
3

这就否定了之前的假设,那么究竟是什么导致了异常呢?仔细思考,Java8的Stream和Spark究竟有什么本质的不同呢:都是并行处理框架,但是Spark是分布式的,分布式的涉及到网络传输,这必然涉及到数据处理任务可能会通过网络进行传输(Spark确实会把任务广播到各个节点上),因此一定会涉及到Task的序列化,会不会是序列化出现了问题?

为了验证这个猜测,构造了如下的代码:

import org.apache.commons.lang3.SerializationUtils;
import org.junit.Test;

import java.io.Serializable;
import java.util.Objects;
import java.util.function.Function;

public class LambdaSerializationTest {


    private static <T> T roundtrip(Serializable obj) {
        byte[] bytes = SerializationUtils.serialize(obj);
        @SuppressWarnings("unchecked")
        T t = (T) SerializationUtils.deserialize(bytes);
        return t;
    }

    @Test
    public void test() {
        SerializableFunction<Integer, Boolean> func = Objects::nonNull;
        System.out.println(func.apply(1));
        SerializableFunction<String, Boolean> fun1 = roundtrip(func);
        System.out.println(fun1.apply("a"));
    }

    private interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
    }

}

这段代码先将Objects::nonNull向下转为Function<Integer,Boolean>类型

反序列化回来的过程中反序列化为Function<String,Boolean>类型

Object.nonNull()方法的原型如下:

/**
     * Returns {@code true} if the provided reference is non-{@code null}
     * otherwise returns {@code false}.
     *
     * @apiNote This method exists to be used as a
     * {@link java.util.function.Predicate}, {@code filter(Objects::nonNull)}
     *
     * @param obj a reference to be checked against {@code null}
     * @return {@code true} if the provided reference is non-{@code null}
     * otherwise {@code false}
     *
     * @see java.util.function.Predicate
     * @since 1.8
     */
    public static boolean nonNull(Object obj) {
        return obj != null;
    }

所以对类型是没有要求的,是不是说明这段代码可以正确运行呢?我们执行这段代码:

true

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer

	...

看,java.lang.ClassCastException又回来了。

这说明:

lambda表达式在序列化的过程中,不同于Java泛型的运行时擦除机制,会对类型进行特化,序列化前后的lambda表达式是携带类型信息的。

之后我又参考了R大在知乎上关于Lambda表达式与序列化的一个回答,证明了之前的假设。

问题解决:

在Spark中,同一个method-ref可能会绑定到同一个serializable lambda,再次重用如果类型不匹配就会引发异常;如果使用匿名函数的形式,匿名函数可以匹配到对应的类型,类型不同对应就是两个serializable lambda,因此不会引发问题。

Objects::nonNull =>    str -> str != null
Objects::nonNull =>    int -> int != null

    原文作者:地狱少女火炮兰
    原文地址: https://zhuanlan.zhihu.com/p/37995638
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞