我和我的同事经常遇到挑战,我希望反应式编程可以解决它.它可能需要我自己实现的Operator或Transformer.
我想采取任何Observable< T>发送T项,但是我希望运算符将它们分组为T的映射并将每个分组作为List< T>发出,或者甚至更好地发布一些通用累加器,就像Java 8流中的收集器一样.
但是这里是我认为groupBy()无法做到的棘手部分.我想通过这个运算符获取两个Observable,并假设发出的项目在该属性上排序(传入的数据将从排序的SQL查询中发出并映射到T对象).运算符将连续累积项目直到属性更改,然后它将发出该组并继续进行下一个项目.这样我就可以从每个Observable中获取每组数据,压缩并处理这两个块,然后扔掉它们继续前进到下一个块.这样我可以保持半缓冲状态并保持低内存使用率.
因此,如果我对PARTITION_ID进行排序,分组和压缩,这在视觉上是我想要完成的.
我只是这样做,因为我可以有两个查询,每个查询超过一百万条,我需要并排进行复杂的比较.我没有内存可以同时从双方导入所有数据,但我可以将其范围缩小到每个已排序的属性值并将其分解为批次.每批后,GC将丢弃它,运算符可以继续下一个.
这是我到目前为止的代码,但我有点不清楚如何继续,因为我不想发出任何东西,直到批处理完成.我到底该怎么做?
public final class SortedPartitioner<T,P,C,R> implements Transformer<T,R> {
private final Function<T,P> mappedPartitionProperty;
private final Supplier<C> acculatorSupplier;
private final BiConsumer<T,R> accumulator;
private final Function<C,R> finalResult;
private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> acculatorSupplier,
BiConsumer<T, R> accumulator, Function<C, R> finalResult) {
this.mappedPartitionProperty = mappedPartitionProperty;
this.acculatorSupplier = acculatorSupplier;
this.accumulator = accumulator;
this.finalResult = finalResult;
}
public static <T,P,C,R> SortedPartitioner<T,P,C,R> of(
Function<T,P> mappedPartitionProperty,
Supplier<C> accumulatorSupplier,
BiConsumer<T,R> accumulator,
Function<C,R> finalResult) {
return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);
}
@Override
public Observable<R> call(Observable<T> t) {
return null;
}
}
最佳答案 另一个答案是在Maven Central上使用库而且更短.
将此依赖项添加到pom.xml.
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-extras</artifactId>
<version>0.5.13</version>
</dependency>
在使用相同partition_id对项目进行分组方面,请执行以下操作:
import com.github.davidmoten.rx.Transformers;
Observable<List<Item>> grouped = items.compose(
Transformers.toListWhile(
(list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));
对于此方法,测试非常全面(另请参阅Transformers.collectWhile以了解列表以外的数据结构),但您可以在github上自行检查源代码.
然后继续拉链.