google-cloud-dataflow – 数据流地图侧输入问题

我在使用DataflowRunner创建Map PCollectionView时遇到问题.

下面的管道将未输入的countingInput与来自侧输入的值(包含10个生成的值)聚合在一起.
在gcp上运行管道时,它会被卡在View.asMap()转换中.
更具体地说,ParDo(StreamingPCollectionViewWriter)没有任何输出.

我尝试使用dataflow 2.0.0-beta3,以及beam-0.7.0-SNAPSHOT,没有任何结果.请注意,使用本地DirectRunner时,我的管道正在运行,没有任何问题.

难道我做错了什么?
感谢所有帮助,感谢提前帮助我!

public class SimpleSideInputPipeline {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class);

    public interface Options extends DataflowPipelineOptions {}

    public static void main(String[] args) throws IOException {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        Pipeline pipeline = Pipeline.create(options);

        final PCollectionView<Map<Integer, String>> sideInput = pipeline
                .apply(CountingInput.forSubrange(0L, 10L))
                .apply("Create KV<Integer, String>",ParDo.of(new DoFn<Long, KV<Integer, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(KV.of(c.element().intValue(), "TEST"));
                    }
                }))
                .apply(View.asMap());

        pipeline
            .apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(5)))
            .apply("Aggregate with side-input",ParDo.of(new DoFn<Long, KV<Long, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Map<Integer, String> map = c.sideInput(sideInput);

                    //get first segment from map
                    Object[] values = map.values().toArray();
                    String firstVal = (String) values[0];
                    LOG.info("Combined: K: "+ c.element() + " V: " + firstVal + " MapSize: " + map.size());
                    c.output(KV.of(c.element(), firstVal));
                }
            }).withSideInputs(sideInput));

        pipeline.run();
    }
}

最佳答案 无需担心ParDo(StreamingPCollectionViewWriterFn)不记录任何输出 – 它的作用实际上是将每个元素写入内部位置.

您的代码看起来不错,我应该对此进行调查.我已经提交了BEAM-2155.

点赞