java – How do I split one CompletableFuture into multiple CompletableFutures in a stream?

For example, I have methods like these:

public CompletableFuture<Page> getPage(int i) {
    ...
}
public CompletableFuture<Document> getDocument(int i) {
    ...
}
public CompletableFuture<Void> parseLinks(Document doc) {
    ...
}

My flow:

List<CompletableFuture> list = IntStream
    .range(0, 10)
    .mapToObj(i -> getPage(i))

    // I want method like this:
    .thenApplyAndSplit(CompletableFuture<Page> page -> {
        List<CompletableFuture<Document>> docs = page.getDocsId()
            .stream()
            .map(i -> getDocument(i))
            .collect(Collectors.toList());
        return docs;
    })
    .map(CompletableFuture<Document> future -> {
        return future.thenApply((Document doc) -> parseLinks(doc));
    })
    .collect(Collectors.toList());

It should behave like a flatMap() for CompletableFuture, so this is the flow I want to implement:

List<Integer> -> Stream<CompletableFuture<Page>>
              -> Stream<CompletableFuture<Document>>
              -> parse each

UPDATE

Stream<CompletableFuture<Page>> pagesCFS = IntStream
        .range(0, 10)
        .mapToObj(i -> getPage(i));

Stream<CompletableFuture<Document>> documentCFS = pagesCFS.flatMap(page -> {
    // How to return stream of Document when page finishes?
    // page.thenApply( ... )
});

Best answer

I also wanted to try implementing a Spliterator for a stream of CompletableFutures, so here is my attempt.

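Note that if you use this in parallel mode, you should use different ForkJoinPools for the stream and for the tasks running behind the CompletableFutures. The stream waits for the futures to complete, so if both share the same executor you may actually lose performance, or even run into deadlocks.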
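
As a rough illustration of that separation, the sketch below runs the futures on one executor and the parallel stream that waits for them on a dedicated ForkJoinPool. The class and method names are made up for this example. It also relies on a parallel stream started from inside a ForkJoinPool task running on that pool's workers, which is how current OpenJDK builds behave but is not a documented guarantee.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SeparatePoolsSketch {

    // Futures run on taskExecutor; the parallel stream that joins them runs on streamPool.
    static int sumWithSeparatePools() {
        ExecutorService taskExecutor = Executors.newFixedThreadPool(4); // does the futures' work
        ForkJoinPool streamPool = new ForkJoinPool(4);                  // only waits and combines
        try {
            List<CompletableFuture<Integer>> futures = IntStream.range(0, 10)
                    .mapToObj(i -> CompletableFuture.supplyAsync(() -> i * i, taskExecutor))
                    .collect(Collectors.toList());

            // Assumption: a parallel stream started from a ForkJoinPool task uses that pool
            // (implementation behavior of current JDKs, not part of the specified API).
            Callable<Integer> waitAndSum = () -> futures.parallelStream()
                    .mapToInt(CompletableFuture::join)
                    .sum();
            return streamPool.submit(waitAndSum).join();
        } finally {
            streamPool.shutdown();
            taskExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumWithSeparatePools());
    }
}
```

Because the waiting threads and the working threads come from different pools, a stream thread blocked in join() never occupies a thread that a pending task needs.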

So here is the implementation:

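import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T> Stream<T> flattenStreamOfFutures(Stream<CompletableFuture<? extends T>> stream, boolean parallel) {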
    return StreamSupport.stream(new CompletableFutureSpliterator<T>(stream), parallel);
}

public static <T> Stream<T> flattenStreamOfFuturesOfStream(Stream<CompletableFuture<? extends Stream<T>>> stream,
                                                           boolean parallel) {
    return flattenStreamOfFutures(stream, parallel).flatMap(Function.identity());
}

public static class CompletableFutureSpliterator<T> implements Spliterator<T> {
    private List<CompletableFuture<? extends T>> futures;

    CompletableFutureSpliterator(Stream<CompletableFuture<? extends T>> stream) {
        futures = stream.collect(Collectors.toList());
    }

    CompletableFutureSpliterator(CompletableFuture<T>[] futures) {
        this.futures = new ArrayList<>(Arrays.asList(futures));
    }

    CompletableFutureSpliterator(final List<CompletableFuture<? extends T>> futures) {
        this.futures = new ArrayList<>(futures);
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        if (futures.isEmpty())
            return false;
        CompletableFuture.anyOf(futures.stream().toArray(CompletableFuture[]::new)).join();
        // now at least one of the futures has finished, get its value and remove it
        ListIterator<CompletableFuture<? extends T>> it = futures.listIterator(futures.size());
        while (it.hasPrevious()) {
            final CompletableFuture<? extends T> future = it.previous();
            if (future.isDone()) {
                it.remove();
                action.accept(future.join());
                return true;
            }
        }
        throw new IllegalStateException("Should not reach here");
    }

    @Override
    public Spliterator<T> trySplit() {
        if (futures.size() > 1) {
            int middle = futures.size() >>> 1;
            // relies on the constructor copying the list, as it gets modified in place
            Spliterator<T> result = new CompletableFutureSpliterator<>(futures.subList(0, middle));
            futures = futures.subList(middle, futures.size());
            return result;
        }
        return null;
    }

    @Override
    public long estimateSize() {
        return futures.size();
    }

    @Override
    public int characteristics() {
        return IMMUTABLE | SIZED | SUBSIZED;
    }
}

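It works by first collecting the given Stream<CompletableFuture<T>> into a list of those futures. Assuming that building the stream is fast and the hard work is done by the futures themselves, making a list out of it should not be expensive. This also ensures that all tasks have been triggered, since it forces the source stream to be processed.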
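
The effect of forcing the source stream can be seen in isolation. The following is a minimal sketch with hypothetical names: it counts how many futures have been created before and after the stream is collected into a list.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class EagerCollectSketch {

    // Returns {futures created before collect(), futures created after collect()}.
    static int[] createdBeforeAndAfterCollect() {
        AtomicInteger created = new AtomicInteger();

        // Streams are lazy: building the pipeline does not call the mapping function yet.
        Stream<CompletableFuture<Integer>> lazy = IntStream.range(0, 5)
                .mapToObj(i -> {
                    created.incrementAndGet(); // counts futures that were actually created
                    return CompletableFuture.supplyAsync(() -> i);
                });
        int before = created.get();

        // The terminal operation walks the whole source, so every task gets submitted.
        List<CompletableFuture<Integer>> futures = lazy.collect(Collectors.toList());
        int after = created.get();

        futures.forEach(CompletableFuture::join); // wait so no task outlives the demo
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = createdBeforeAndAfterCollect();
        System.out.println(counts[0] + " before, " + counts[1] + " after");
    }
}
```

Without the collect step, a sequential stream would create and wait for one future at a time, which would serialize the work.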

To produce the output stream, it simply waits for any one of the futures to complete and then emits its value.
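
That waiting step can be sketched on its own. The helper below (takeAnyCompleted is a name invented for this example) mirrors what tryAdvance() does, and the futures are completed by hand so that the completion order is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AnyOfSketch {

    // Blocks until at least one pending future is done, then removes it and returns its value.
    static <T> T takeAnyCompleted(List<CompletableFuture<T>> pending) {
        CompletableFuture.anyOf(pending.toArray(new CompletableFuture[0])).join();
        for (Iterator<CompletableFuture<T>> it = pending.iterator(); it.hasNext(); ) {
            CompletableFuture<T> future = it.next();
            if (future.isDone()) {
                it.remove();
                return future.join();
            }
        }
        throw new IllegalStateException("anyOf() returned but no future is done");
    }

    // Completes three futures out of order and records the order in which values come out.
    static List<String> completionOrder() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        List<CompletableFuture<String>> pending = new ArrayList<>(Arrays.asList(a, b, c));
        List<String> order = new ArrayList<>();

        c.complete("c");
        order.add(takeAnyCompleted(pending));
        a.complete("a");
        order.add(takeAnyCompleted(pending));
        b.complete("b");
        order.add(takeAnyCompleted(pending));
        return order;
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // prints [c, a, b]
    }
}
```

If a future completes exceptionally, join() throws a CompletionException, so a failure in any task surfaces in the thread that consumes the values.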

A simple non-parallel usage example (the executor is used for the CompletableFutures so that they all start at the same time):

ExecutorService executor = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
flattenStreamOfFutures(IntStream.range(0, 20)
        .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((i % 10) * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            System.out.println("Finished " + i + " @ " + (System.currentTimeMillis() - start) + "ms");
            return i;
        }, executor)), false)
        .forEach(x -> {
            System.out.println(Thread.currentThread().getName() + " @ " + (System.currentTimeMillis() - start) + "ms handle result: " + x);
        });
executor.shutdown();

Output:

Finished 10 @ 103ms
Finished 0 @ 105ms
main @ 114ms handle result: 10
main @ 114ms handle result: 0
Finished 1 @ 1102ms
main @ 1102ms handle result: 1
Finished 11 @ 1104ms
main @ 1104ms handle result: 11
Finished 2 @ 2102ms
main @ 2102ms handle result: 2
Finished 12 @ 2104ms
main @ 2105ms handle result: 12
Finished 3 @ 3102ms
main @ 3102ms handle result: 3
Finished 13 @ 3104ms
main @ 3105ms handle result: 13
…

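As you can see, the stream produces each value almost as soon as it becomes available, even though the futures do not complete in order.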

Applied to the example in the question, this gives the following (assuming parseLinks() returns a CompletableFuture<String> rather than a CompletableFuture<Void>):

flattenStreamOfFuturesOfStream(IntStream.range(0, 10)
                .mapToObj(this::getPage)
                // the next map() will give a Stream<CompletableFuture<Stream<String>>>
                // hence the need for flattenStreamOfFuturesOfStream()
                .map(pcf -> pcf
                        .thenApply(page -> flattenStreamOfFutures(page
                                        .getDocsId()
                                        .stream()
                                        .map(this::getDocument)
                                        .map(docCF -> docCF.thenCompose(this::parseLinks)),
                                false))),
        false)
.forEach(System.out::println);
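
To see the types line up end to end, here is a self-contained sketch of the same pipeline shape. Page, Document and the three methods are stubs invented for this example, and joinInOrder() is a simplified stand-in that yields results in encounter order; it is not the completion-order Spliterator from the answer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PipelineSketch {

    // Hypothetical stand-ins for the question's Page and Document types.
    static class Page {
        final int id;
        Page(int id) { this.id = id; }
        List<Integer> getDocsId() { return Arrays.asList(id * 10, id * 10 + 1); }
    }

    static class Document {
        final int id;
        Document(int id) { this.id = id; }
    }

    static CompletableFuture<Page> getPage(int i) {
        return CompletableFuture.supplyAsync(() -> new Page(i));
    }

    static CompletableFuture<Document> getDocument(int i) {
        return CompletableFuture.supplyAsync(() -> new Document(i));
    }

    static CompletableFuture<String> parseLinks(Document doc) {
        return CompletableFuture.supplyAsync(() -> "links-of-" + doc.id);
    }

    // Simplified flatten: collect first so every task is started, then join in encounter order.
    static <T> Stream<T> joinInOrder(Stream<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> started = futures.collect(Collectors.toList());
        return started.stream().map(CompletableFuture::join);
    }

    // Page future -> future of a stream holding the parsed links of that page's documents.
    static CompletableFuture<Stream<String>> linksOfPage(CompletableFuture<Page> pageCF) {
        return pageCF.thenApply(page -> {
            Stream<CompletableFuture<String>> linkCFs = page.getDocsId().stream()
                    .map(PipelineSketch::getDocument)
                    .map(docCF -> docCF.thenCompose(PipelineSketch::parseLinks));
            return joinInOrder(linkCFs);
        });
    }

    static List<String> run() {
        Stream<CompletableFuture<Stream<String>>> perPage = IntStream.range(0, 3)
                .mapToObj(PipelineSketch::getPage)
                .map(PipelineSketch::linksOfPage);
        return joinInOrder(perPage)
                .flatMap(links -> links) // Stream<Stream<String>> -> Stream<String>
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Swapping joinInOrder() for flattenStreamOfFutures() would emit the same values, but in completion order rather than encounter order.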