java 8流中的一个简单的list.parallelStream()似乎没有工作窃取?

从这个问题“

Will inner parallel streams be processed fully in parallel before considering parallelizing outer stream?“,我知道流执行工作窃取.但是,我注意到它似乎经常不会发生.例如,如果我有一个说100,000个元素的列表,我尝试在parallelStream()中处理它时尚,我经常注意到我的大多数CPU内核都处于“等待”状态.(注意:在列表中的100,000个元素中,有些元素需要很长时间才能处理,而其他元素则很快;而且,这个列表是不平衡的,这就是为什么有些线程可能会“运气不好”并且有很多工作要做,而其他线程则很幸运且没有什么可做的.

所以,我的理论是JIT编译器将100,000个元素初始划分为16个线程(因为我有16个核心),但是在每个线程中,它只是执行一个简单的(顺序)for循环(因为那将是最有效率的,因此不会发生工作偷窃(这是我所看到的).

我认为Will inner parallel streams be processed fully in parallel before considering parallelizing outer stream?显示工作窃取的原因是有一个流式传输的OUTER循环和流式传输的INNER循环,因此在这种情况下,每个内部循环都在运行时进行评估,并创建可以在运行时,被分配给“空闲”线程.思考?有什么我做错了会强制一个简单的list.parallelStream()来使用偷工作吗? (我目前的解决方法是尝试平衡基于各种heurestics的列表,以便每个线程通常看到相同数量的工作;但是,很难预测……)

最佳答案 这与JIT编译器无关,但与Stream API的实现无关.它将工作负载划分为由工作线程顺序处理的块.一般策略是拥有比工作线程更多的工作来实现工作窃取,例如参见
ForkJoinTask.getSurplusQueuedTaskCount(),其可以用于实现这样的自适应策略.

当源是ArrayList时,以下代码可用于检测顺序处理的元素数:

List<Object> list = new ArrayList<>(Collections.nCopies(10_000, ""));
System.out.println(System.getProperty("java.version"));
System.out.println(Runtime.getRuntime().availableProcessors());
System.out.println( list.parallelStream()
    .collect(
        () -> new ArrayList<>(Collections.singleton(0)),
        (l,x) -> l.replaceAll(i -> i + 1),
        List::addAll) );

在我当前的测试机器上,它打印:

1.8.0_60
4
[625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625, 625]

因此,存在比核心更多的块,以允许工作窃取.但是,一旦块的顺序处理开始,就不能进一步拆分,因此当每个元素的执行时间明显不同时,这种实现具有局限性.这始终是一种权衡.

点赞