RxJava——基础学习(二)

1.简化输出Hello world

上篇中虽然打印出了Hello world,但代码有点多,就简化一下形式。修改rxHello()

 Observable.just("Hello world_1").subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                  System.out.println(s);
            }
        });

just(T...)是一个快捷创建队列的方法。但这代码依然有点多,onCompleted()onError(Throwable e),只是打印一下字符串并不需要用到这两个方法。而且Observable不能每次发送一个事件都要重写3个回调方法,这样多余代码太多。RxJavasubscribe()方法还支持不完整定义的回调Action

2.Action0和Action1

  • 共同点:都是接口,都只有一个方法,都没有返回值
  • 不同点:Action0内的方法是call(),Action1内的方法是call(T t)。Action1有形参而Action0没有。

Action1简单实例:

public class RxJava_Action_1 {
    public static void main(String[] args) {
        rxAction1();
    }

    private static void rxAction1() {
       Observable.just(nowTime()).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });

    private static String nowTime() {
        SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String time = sim.format(System.currentTimeMillis());
        return time;
    }

代码运行结果:
2016-08-05 10:53:38

有些时候只关心执行onNext(),并不考虑onCompleted()onError(Throwable e),就可以使用Action1。

Action0就可以理解为是执行onCompleted()。这里引用一下扔物线大神的博客来进行说明

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

3.操作符map

map()就是对象的变换。对于Observer只是希望它来响应事件,并不希望参与更改事件。
例如上面的代码打印出了当前时间2016-08-05 10:53:38。假设这样一个场景nowTime()是一个第三库中的方法,返回的对象结果本身不能直接修改,如果想要在时间前加上几个汉字来说明一下,就可以用到操作符map。在上面的代码中加入一个rxMap()方法。

  private static void rxMap() {
        Observable.just(nowTime()).map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return "北京时间:" + s;
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });
    }

代码运行结果:
rxAction1() 方法结果—-> 2016-08-05 11:22:40
rxMap()方法结果—-> 北京时间:2016-08-05 11:22:40

map()方法的参数是Func1Func1也是接口,和 Action1 相似,区别就是Func1有返回值。同ActionX一样,FuncX也有多个。
map是一对一的变换。

4.flatMap

感觉这货和Handler一样,总觉得给人一种貌似已经懂了,但真讲起来,总觉得又有点不明白。

个人理解,flatMap是可以一对多的变换。嗯,这里说明一下,不要受3map例子所局限,变换不单单是只字符串内容变化,而是操作对象发生了变化。。。还是直接上代码,借鉴了扔物线大神的举例,有这样一个需求,一个学生有多门课程,要打印出每门课的成绩。这时如果用map,就不够方便。需要用flatMap

public class RxJava_FlatMap_1 {
    private static List<Student> list = new ArrayList<>();

    public static void main(String[] args) {
        addData();
        showScore();
    }

   private static void showScore() {
        Observable.from(list)
                .filter(new Func1<Student, Boolean>() {
                    @Override
                    public Boolean call(Student student) {
                        return student.getId() == 102;
                    }
                })
                .flatMap(new Func1<Student, Observable<Student.Course>>() {
                    @Override
                    public Observable<Student.Course> call(Student student) {
                        return Observable.from(student.getCourseList());
                    }
                })
                .subscribe(new Action1<Student.Course>() {
                    @Override
                    public void call(Student.Course course) {
                        System.out.println(course.getCourseName() + "-->" + course.getScore());
                    }
                });
    }


    private static void addData() {
        Student s_1 = new Student();
        s_1.setName("张三");
        s_1.setId(101);
        List<Student.Course> list_1 = new ArrayList<>();
        list_1.add(new Student.Course("语文", 98));
        list_1.add(new Student.Course("数学", 99));
        s_1.setCourseList(list_1);

        Student s_2 = new Student();
        s_2.setName("李四");
        s_2.setId(102);
        List<Student.Course> list_2 = new ArrayList<>();
        list_2.add(new Student.Course("语文", 96));
        list_2.add(new Student.Course("数学", 97));
        s_2.setCourseList(list_2);

        list.add(s_1);
        list.add(s_2);
    }
}

addData()这个方法就是把Student对象添加个集合中。Student这个对象有名字,ID,还有学科列表。而每个学科有名字有分数。

from()这个方法有好5个重载方法,形参可以是数组,集合,Futurefrom(T[]) / from(Iterable<? extends T>),将传入的数组或 Iterable拆分成具体对象后,依次发送出来。Future这个暂时还不理解。
filter()过滤,本例子中是将ID为102的学生所有的学科成绩打印。

flatMap()参数也是func1(),但和map不同的是func1()泛型不同。返回的对象是Observable

想了想,暂时组织不出语言来说明,就直接引用扔物线大神的说明

flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。
flatMap() 的原理是这样的:

  1. 使用传入的事件对象创建一个 Observable 对象;
  2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
  3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

还有图解,具体的可以去扔物线大神的博客看。

感觉flatMapmap自己动手写一个小例子,比之前光看清晰多了,基本用法懂了一点。和我一样的新人,不要光看,自己敲敲,就会有些理解了。

5.调度器Scheduler

作为一个搞Android开发的,Android的网络请求都要另开子线程,然而刷新UI却不能直在子线程中进行,这就有些痛苦了。感觉这也是为啥Android的开发者们一直都喜欢捣鼓各种网络请求库的原因。感觉RxJava在Android中受欢迎的很大的原因就是因为调度器。

.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程

不再需要Handler,不再需要使用各种回调了。

个人理解,调度器就是可以理解成一个控制线程的工具。

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • Android 还有一个专用的AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

subscribeOn()用于指定Observable.subscribeOn()方法运行的线程 也就是Observable.OnSubscribe被激活时所处的线程。或者叫做事件产生的线程。很多时候都是指定在Schedulers.io()

observeOn()用于指定Observer或者Subscriber的回调线程。Android开发中往往是AndroidSchedulers.mainThread()

注意:
SubscriberonStart()方法虽然是在事件还没有发出去之前调用,但并不能用来进行UI的操作。因为它总是在subscribe()订阅方法所发生的线程被调用,而不能指定线程。

假如在进行网络请求前,想要有一个Progressbar来提示就可以使用doOnSubscribe()doOnSubscribe()也是在事件没有发出前调用,但它可以指定线程。默认,doOnSubscribe()执行在subscribe()订阅方法所在线程,但如果doOnSubscribe()后有subscribeOn()指定线程的话,doOnSubscribe()就会在离它最近的subscribeOn()指定线程中运行。

doOnNext()这个方法在每次onNext()方法执行前执行。

6.关于取消订阅

Observable.subscribeOn()返回一个接口对象Subscription,有两个方法isUnsubscribed()unsubscribe()

Subscription subscription = observer.subscribe(new Action1<Student.Course>() {
            @Override
            public void call(Student.Course course) {
                System.out.println(course.getCourseName() + "-->" + course.getScore());
            }
        });
        
//取消订阅        
if (!subscription.isUnsubscribed()) {
     subscription.unsubscribe();
}

7.最后

操作符太多,本篇也只是记录了我学习mapflatMap,其他的操作符还有很多,但脑袋这会有点累了,其他的慢慢学了。除了多敲多看多练,没别的方法。
多锻炼身体,按时吃饭。

    原文作者:英勇青铜5
    原文地址: https://www.jianshu.com/p/9ae1424933bb
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞