RxJava- Group,Emit和Zip将具有共同属性的“Chunks”排序?

我和我的同事经常遇到挑战,我希望反应式编程可以解决它.它可能需要我自己实现的Operator或Transformer.

我想采取任何Observable< T>发送T项,但是我希望运算符将它们分组为T的映射并将每个分组作为List< T>发出,或者甚至更好地发布一些通用累加器,就像Java 8流中的收集器一样.

但是这里是我认为groupBy()无法做到的棘手部分.我想通过这个运算符获取两个Observable,并假设发出的项目在该属性上排序(传入的数据将从排序的SQL查询中发出并映射到T对象).运算符将连续累积项目直到属性更改,然后它将发出该组并继续进行下一个项目.这样我就可以从每个Observable中获取每组数据,压缩并处理这两个块,然后扔掉它们继续前进到下一个块.这样我可以保持半缓冲状态并保持低内存使用率.

因此,如果我对PARTITION_ID进行排序,分组和压缩,这在视觉上是我想要完成的.

我只是这样做,因为我可以有两个查询,每个查询超过一百万条,我需要并排进行复杂的比较.我没有内存可以同时从双方导入所有数据,但我可以将其范围缩小到每个已排序的属性值并将其分解为批次.每批后,GC将丢弃它,运算符可以继续下一个.

这是我到目前为止的代码,但我有点不清楚如何继续,因为我不想发出任何东西,直到批处理完成.我到底该怎么做?

public final class  SortedPartitioner<T,P,C,R> implements Transformer<T,R> {

 private final Function<T,P> mappedPartitionProperty;
 private final Supplier<C> acculatorSupplier;
 private final BiConsumer<T,R> accumulator;
 private final Function<C,R> finalResult;


 private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> acculatorSupplier,
   BiConsumer<T, R> accumulator, Function<C, R> finalResult) {
  this.mappedPartitionProperty = mappedPartitionProperty;
  this.acculatorSupplier = acculatorSupplier;
  this.accumulator = accumulator;
  this.finalResult = finalResult;
 }
 public static <T,P,C,R> SortedPartitioner<T,P,C,R> of(
   Function<T,P> mappedPartitionProperty, 
   Supplier<C> accumulatorSupplier,
   BiConsumer<T,R> accumulator,
   Function<C,R> finalResult) {

  return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);

 }
 @Override
 public Observable<R> call(Observable<T> t) {
  return null;
 }

}

最佳答案 另一个答案是在Maven Central上使用库而且更短.

将此依赖项添加到pom.xml.

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-extras</artifactId>
    <version>0.5.13</version>
</dependency>

在使用相同partition_id对项目进行分组方面,请执行以下操作:

import com.github.davidmoten.rx.Transformers;

Observable<List<Item>> grouped = items.compose(
    Transformers.toListWhile(
        (list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));

对于此方法,测试非常全面(另请参阅Transformers.collectWhile以了解列表以外的数据结构),但您可以在github上自行检查源代码.

然后继续拉链.

点赞