RxJava——基础学习(六),过滤操作符

学习资料

1.过滤操作符

作用:对Observable发射的 数据序列 进行 过滤或选择

1.1 first

只发射第一个或者第一个满足某个条件的数据项

1.1.1 first()第一项

简单使用:

public class FirstDemo {
    public static void main(String[] args) {
        first();
    }

    /**
     * 只发送第一项数据
     */
    private static void first() {
        Observable
                .just(1,2,3,4)
                .first()
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println(integer);
                    }
                });
    }
}

运行结果:

1

1.1.2 first(Func1)

只发送第一个满足条件的数据

简单使用:

    /**
     * 第一个偶数
     */
    private static void firstTrue() {
        Observable
                .just(1,3,4,5)
                .first(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 == 0;
                    }
                })
                .subscribe((i)->System.out.println("第一个偶数:" + i));
    }

运行结果:

第一个偶数:4

last正好与first相反,是只发送最后一个或者最后一个满足条件的数据项

1.2 take

只发送前N项数据

1.2.1 take(int)

只发送前int项数据项,默认不任何特定的调度器上执行

简单使用:

    /**
     * 只发送前5项数据
     */
    private static void takeInt() {
        Observable
                .interval(500, TimeUnit.MILLISECONDS, Schedulers.immediate())
                .take(5)
                .subscribe((i) -> System.out.print(i + " ,"));
    }

运行结果:

0 ,1 ,2 ,3 ,4 ,

1.2.2 take(long,TimeUnit,Scheduler)

在写定的时间段内,会发送Observable发出的数据项,默认在computation运算调度器上执行

简单使用:

    /**
     * 发送1s以内的数据项
     */
    private static void takeTime() {
        Observable
                .interval(100, TimeUnit.MILLISECONDS, Schedulers.immediate())
                .take(1,TimeUnit.SECONDS ,Schedulers.newThread())
                .subscribe((i) -> System.out.print(i + " ,"));
    }

运行结果:

0 ,1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,

发送的数据项只是1s以内的,并不包括1s时的

takeLast是只发送后n项数据

1.3 Skip

跳过数据项

1.3.1 skip(int)

跳过Observable发送的前n项数据项,默认不在任何特定的调度器上执行

简单使用:

    /**
     * 跳过前3项数据
     */
    private static void skipInt() {
        Observable
                .range(1,10)
                .skip(3)
                .subscribe((i) -> System.out.print(i + " ,"));
    } 

运行结果:

4 ,5 ,6 ,7 ,8 ,9 ,10 ,

1.3.2 skip(long,TimeUnit,Scheduler)

跳过给定的时间段内Obsvable发送过来的数据项,默认在computation运算调度器上执行

简单使用:

    /**
     * 发送前5个,500毫秒之后的数据项
     */
    private static void skipTime() {
        Observable
                .interval(100, TimeUnit.MILLISECONDS, Schedulers.immediate())
                .skip(500,TimeUnit.MILLISECONDS)
                .take(5)
                .subscribe((i) -> System.out.print(i + " ,"));
    }

运行结果:

4 ,5 ,6 ,7 ,8 ,

SkipLast就是跳过后n个数据项

1.4 Sample

定期发射Observable最近发射的数据项

1.4.1 sample(long,TimeUnit)

定时查看一个Observable,然后将自上次采样后,Observable最近一次发送的数据发送出去,默认在默认在computation调度器上执行

注意:如果从上次采样后,原始的Observable没有发出数据项,sample操作返回的新的Observable在监测期时间内也不会发射任何数据

简单使用:

    /**
     * 每隔100毫秒,将Observable最近一个发送的数据项发送出去
     */
    private static void sampleTime() {
        Observable
                .interval(30,TimeUnit.MILLISECONDS, Schedulers.immediate())
                .sample(100, TimeUnit.MILLISECONDS)
                .take(5)
                .subscribe((i) -> System.out.print(i + " ,"));

    }

运行结果:

1 ,5 ,8 ,11 ,15 ,

1.4.2 sample(Observable)

  • sample(signal)

当监测到名字为signalObaervable发过来一个信号或者终止时,就对原始Observable发送的数据进行采样,然后将自从上次采样以来最近一次发送的数据发送出去

默认不在任何特定的调度器上执行

简单使用:

    /**
     * 每当收到信号时,将最近发送的一个数据项发送出去
     */
    private static void sameSignal() {
        Observable
                .interval(30,TimeUnit.MILLISECONDS, Schedulers.immediate())
                .sample(Observable.interval(100,TimeUnit.MILLISECONDS))
                .take(5)
                .subscribe((i) -> System.out.print(i + " ,"));
    }

运行结果:

2 ,5 ,8 ,12 ,15 ,

第一个输出的数字是2,因为是从0开始的,每次输出的数字中间都会间隔2个

1.5 Debounce

两次发送数据项间隔大于一段指定的时间,才发射一个数据

注意:最后的onCompleted信号,会紧随着最后一项原始Observable数据项,即使是小于时间间隔,一旦结束到onCompleted信号,整个操作也就结束了,onCompleted通知不会触发限流

1.5.1 debounce(long,TimeUnit)

在指定的时间long间隔进行限流,个人理解,过滤两次数据小于指定间隔的数据项,与上次发送的时间差大于间隔的数据项才进行发送

默认在computation调度器上执行

简单使用:

    /**
     * 输出两次间隔大于150秒的数据项
     */
    private static void deBounceTime() {
        Observable
                .create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        try {
                            for (int i = 0; i < 5; i++) {
                                //产生在100到200间随机时间间隔
                                TimeUnit.MILLISECONDS.sleep((int) (Math.random() * 100 + 100));
                                subscriber.onNext(i);
                            }
                             //延迟结束信号 否则最后一次一定不会发送
                            TimeUnit.MILLISECONDS.sleep(100);
                            subscriber.onCompleted();
                        } catch (InterruptedException e) {
                            subscriber.onError(e);
                        }
                    }
                })
                .debounce(150, TimeUnit.MILLISECONDS, Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println(" --> onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                         System.out.println(e.getMessage());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.print(integer + " ,");
                    }
                });
    }

运行结果:

2 ,4 , --> onCompleted

注意subscriber.onCompleted()发送结束通知信号的时机

1.5.2 debounce(Func1)

对原始Observable的每一个数据项应用一个函数进行限流,这个函数返回一个Observable。接到通知前,原始Observable发送的数据项将会被抑制

默认不在任何特定的调度器上执行

简单使用:

    /**
     * 在没有接到通知的150毫秒内,原始Observable发送的数据项将会被抑制
     */
    private static void deBounceSignal() {
        Observable
                .create((subscriber) -> {
                    try {
                        for (int i = 0; i < 5; i++) {
                            //产生在100到200间随机时间间隔
                            TimeUnit.MILLISECONDS.sleep((int) (Math.random() * 100 + 100));
                            subscriber.onNext(i);
                        }
                        //延迟结束信号 否则最后一次一定不会发送
                        TimeUnit.MILLISECONDS.sleep(100);
                        subscriber.onCompleted();
                    } catch (InterruptedException e) {
                        subscriber.onError(e);
                    }
                })
                .debounce(new Func1<Object, Observable<Long>>() {
                    @Override
                    public Observable<Long> call(Object o) {
                        //每隔150毫秒发出一个通知
                        return Observable.interval(150, TimeUnit.MILLISECONDS);
                    }
                })
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onCompleted() {
                        System.out.println(" --> onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println(e.getMessage());
                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.print(o + " ,");
                    }
                });
    }   

运行结果:

0 ,2 ,4 , --> onCompleted

注意:最后一个数据项一定会被发送,即使在没有接到通知的150毫秒内

1.6 Distinct

过滤掉重复的数据项,默认不在任何特定的调度器上执行

1.6.1 distinct()

简单使用:

    /**
     * 去除重复项
     */
    private static void distinct() {
        Observable
                .just(1,2,2,3,4,4,5,6,6)
                .distinct()
                .subscribe((i) -> System.out.print(i+" ,"));
    }

运行结果:

1 ,2 ,3 ,4 ,5 ,6 ,

1.6.2 distinct(Func1)

将原始Observable发送的数据项应用一个函数,根据这个函数产生不同的key,之后的数据项便是比较key,而不再管数据项

简单使用:

    /**
     * 根据条件指定过滤的key ,将之后出现 key为"1","2",全部过滤
     */
    private static void distinctKey() {
        Observable
                .just(1,2,2,3,3,4,5,6,6)
                .distinct(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        //设置key
                        return integer / 2 == 0 ? "1" : "2" ;
                    }
                })
                .subscribe((i) -> System.out.print(i+" ,"));
    }

运行结果:

1 ,2 ,

1.7 ElementAt

只发射索引值为N的数据项,索引从0开始

如果传递的索引为负数,或者索引不小于数据项个数,将会抛出一个IndexOutOfBoundsException异常

简单使用:

public class ElementAtDemo {
    /**
     * 输出索引值为5的数据项,从0开始
     */
    public static void main(String[] args) {
        Observable
                .range(1,10)
                .elementAt(5)
                .subscribe(System.out::println);
    }
}

运行结果:

6

1.8 IgnoreElements

不发射任何数据,只发射Observable的终止通知onErroronCompleted

《RxJava——基础学习(六),过滤操作符》 ignoreElements

若不关心Obsvable发送的数据项,只想在完成时,或者遇到错误终止时收到通知,可以使用,这个操作符永远不会调用观察者的onNext()方法

默认不在任何特定的调度器上执行

2. 最后

感觉过滤操作符比变换操作符理解起来要容易一些

本人很菜,有错误请指出

共鸣 :)

    原文作者:英勇青铜5
    原文地址: https://www.jianshu.com/p/da6dae1362da
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞