RxJava,你好

在我研究响应式编程的过程中,我所找到的每一篇文章几乎都以响应式编程很难学习的理念开头。针对响应式编程零基础人员准备的文章少之又少。本文尝试通过在android上使用RxJava为初学者厘清响应式编程的基本概念。

什么是响应式编程?

响应式编程就是编程处理异步数据流。

等等,我使用callback也很容易处理异步数据啊。所以这和响应式编程有什么不同呢?

是的,这个概念并不新鲜。它可以通过命令式(imperatively)编程来完成,而且通常都是这么做的。

如果我们不仅仅考虑回调,同时再考虑一下让回调启动并运行的支持机制。使用命令式方法来支持通常会涉及到状态管理还需要考虑状态改变所带来的副作用。在软件开发界,这些考虑已经成为大量错误的原因。响应式编程采用函数式方法;它处理的是流从而避免了全局状态以及相应的副作用。

什么是流?

万物皆流,无物常住 — 赫拉克利特

流代表一个数据序列。想象一下我们的交通系统。某条高速路上的汽车就是一条一直流动偶尔出现一个瓶颈的对象流。所谓响应式编程,就是我们接收连续流动的数据–数据流–提供处理数据流的方法并将该方法应用到数据流。数据的源头我们并不(也不应)关心。

数据流无处不在。任何物体都可以是数据流:变量,用户输入,属性,缓存,数据结构等等。

什么是声明式编程,什么是命令式编程?

  • 命令式引导你怎么做
  • 声明式告诉你做什么

在深入代码之前,我们还是来看一下我们的交通系统网络。让我们假设市长想临时在一条指定的高速路上分间隔摆放停车标志来中断车流。市长会说:“将高速路分成均匀的几段,在每一段的边界上放一个停车标志”。承包商会说:“等等,在分段之前,我需要确定每一段的长度;为了确定分段长度,我需要知道高速路的总长,我们要放置多少个停车标志,以及车辆的平均长度”。在这个场景里,市长拥有足够多的职能部门(包括DOT(交通部)),她在处理事情时只需要专注于宣布她的意图,而不用关心事情具体怎么完成的细节–这就是声明式方法。而另一方面。承包商需要保证整个流程的每一处细节都要考虑周全并准确的完成–这就是命令式方法。如果你可以像市长建设她的城市一样构建一个软件会是什么样子呢?我们一起来看一个示例:

例:使用命令式方法过滤掉偶数。

Integer[] numbers = {1, 2, 3, 4, 5};
List<Integer> lists = Arrays.asList(numbers);
List<Integer> results = new ArrayList<>();

for (Integer num : numbers) {
    if (num % 2 != 0)
        results.add(num);
}

声明式方法

List<Integer> results = lists.stream()
        .filter(s -> s % 2 != 0)
        .collect(Collectors.toList());

很酷,我喜欢声明式方法,但是如果我们不告诉它怎么做,计算机怎么知道做什么呢?
在现在的世界里,任何事情最终落实到操作系统和硬件时都是命令式的。而响应式编程,是函数式编程的一种抽象。就和我们所使用的高阶命令式编程语言是底层二进制以及汇编命令的抽象一样(市长也需要有她的DOT承包商)。

所以,我们怎样在Java中使用声明式编程风格呢?
Java8有一个很惊艳的Stream API,但是如果你和我一样是一个Android开发者,你不能使用Stream API,因为android还不支持Java8的所有特性。尽管如此,你可以使用RxJava,这是由Netflix的开发者为Java提供的一个响应式扩展。

RxJava怎么工作?

《RxJava,你好》

响应式代码的基础是被观察者(Observable)观察者(Observer)

  • 被观察者(Observable)可以被监听(和观察者模式中的Subject相似)
  • 观察者(Observer)则监听被观察者

被观察者是一个发送数据流或者事件流的类,观察者则对被观察者发送出的数据/事件做出反应。一个被观察者可以有多个观察者,对于被观察者发送出的每一个事件/项目都会被Observer.onNext()方法接收并处理。一旦被观察者发送完了所有的数据它会调用Observer.onComplete()。如果发生错误,被观察者会调用Observer.onError()方法。

注意: 有的被观察者永远都不会终止(比如温度传感器的输出)

观察者和被观察者之间通过Subscription连接,观察者在后面也可以通过Subscription取消订阅被观察者。

听起来和观察者模式很相似,那么观察者模式和RxJava框架之间有什么区别呢?
RxJava的被观察者为观察者模式添加了两个功能。

  • 当不再产生数据时,生产者会通知消费者。(onComplete()
  • 当发生错误时,生产者会通知消费者。(onError()

除此之外,RxJava的威力在于仅仅只需要几行代码就可以变换聚合过滤被观察者发送的数据流,这样可以极大的减少需要维护的状态变量。

给我看代码

创建一个被观察者(Observable):

Integer[] numbers = {1, 2, 3, 4, 5, 6, 7};
List<Integer> lists = Arrays.asList(numbers);
Observable<Integer> integerObservable = Observable.from(lists);

integerObservable将发射数字1、2、3、4、5、6、7然后结束。

注意: 创建被观察者的方式有很多很多。更多信息可以参考官方文档

Subscriber

Subscriber是一种特殊类型的观察者,它可以取消订阅被观察者。

    Observable.
    Subscriber<Integer> mySubscriber = new Subscriber<Integer>() {
        @Override
        public void onNext(Integer data) {
           Log.d("Rx", "onNext:"+data);
         }
    
        @Override
        public void onCompleted() {     
           Log.d("Rx","Complete!"); 
        }
    
        @Override
        public void onError(Throwable e) { 
          // handle your error here
        }
    };

将Subscriber连接到被观察者:

被观察者是惰性的,在没有订阅者监听之前它不会做任何事情。

    myObservable.subscribe(mySubscriber);
    // Outputs:
    // onNext: 1
    // onNext: 2
    // onNext: 3
    // onNext: 4
    // onNext: 5
    // onNext: 6
    // onNext: 7
    // Complete!

改变流:

RxJava提供了许多改变流的运算符。下面几个操作方法是最常用的。

  • Filter:Filter运算符会过滤被观察者,被观察者发射的数据中只有通过你在谓词函数中指定的测试后才能继续往下流动。

      integerObservable.filter(new Func1<Integer, Boolean>() {
          @Override
          public Boolean call(Integer o) {
              return o % 2 == 0;
          }
      }).subscribe(mySubscriber);
      // Outputs :
      // onNext: 2
      // onNext: 4
      // onNext: 6
      // Complete!
    

这里我过滤掉了所有的奇数项。

    ---1---2---3---4----5----6----7---|-->
             filter(x % 2 == 0)
    -------2-------4---------6--------|-->

注意: Func<T, R>表示一个单参数的函数,T是第一个参数的类型,R是返回结果的类型。

  • Map: Map运算符将会将你指定的函数应用到被观察者发射的每一项,并返回一个被观察者,这个被观察者发射的数据就是你指定函数的返回结果。

      integerObservable.map(new Func1<Integer, Integer>() {
          @Override
          public Integer call(Integer value) {
              return value * value;
          }
      }).subscribe(mySubscriber);
      // onNext:1
      // onNext:4
      // onNext:9
      // onNext:16
      // onNext:25
      // onNext:36
      // onNext:49
      // Complete!
    

这里我使用map运算符将发射出的数据改变成另外一个数。我改变了integerObservable所发射出的每一项,所以最后每一个数据都变成了该数据的平方。

    ---1---2---3---4----5----6----7---|-->
            map(x -> x * x)
    ---1---4---9---16---25---36---49---|-->

RxJava中有大量的操作符用于处理流变换。

好吧,你不是说响应式编程是异步的吗?

如果你不告诉它需要使用异步的方式,RxJava默认是同步的。

但是同步是响应式系统必须的行为吗?

确定是使用异步还是同步的被观察者需要根据具体的问题分析。例如:从内存缓存中获取数据并立即返回也许使用同步会更合适。另一方面,如果被观察者会产生网络调用或者一些耗时的数据处理则应该使用异步的方式。总的原则就是:如果是在开发一个图形系统,当一项任务起源于UI线程并且需要阻塞(或者大量的计算操作)时应该采用异步的方式。

在异步从何而来这个问题上,RxJava持不可知论者的态度。

了解,现在告诉我怎么创建一个异步的observable?

首先,我们来看一下使用RxJava之前,将一个密集的长时间的I/O操作转移的其他线程(非ui线程)时的处理方式。

以前的方式

    private class FetchUsersTask 
                extends AsyncTask<String, Void, User> {
    
      protected User doInBackground(String... someData) {
        String userId=params[0];
        User user = UsersService.getUser(userId);
        return user;
      }
    
      protected void onPostExecute(User user) {
        //handle the result and update the view
      }
    }

FetchUsersTask调用usersService.getUsers()并返回一个字符串列表,然后传递给onPostExecute()方法。看起来非常简单,但是这段代码中存在一些问题

  • 错误处理: doInBackground()也许会发生错误或者异常,为了能够从异常中恢复我们需要添加try-catch。通常情况下,当我们捕获到异常时,我们会log输出并且通知用户,这又需要回到UI线程。使用AsyncTask我们也可以将Object作为doInBackground()方法的返回类型,然后在onPostExecute()中使用instanceof来检查类型–这需要更多的代码。
  • 内存泄漏: 即使启动异步任务的Activity/Fragment 被销毁,异步任务仍持续运行直到doInBackground()方法执行完成,因为在后台任务完成后asyncTask需要通知view,所以运行时必须持有Activity/Fragment的引用。如果Activity在后台任务完成之前就被销毁,而开发者又没有采取合适的技术比如弱引用,那么就会导致内存泄漏甚至是应用程序崩溃。也许可以采用另一种方法,就是使用cancel(boolean)来取消正在执行的任务,那么最终会调用onCancelled()方法而不是onPostExecute()。
  • 连续多次网络调用: 编排多个AsyncTask的唯一办法就是使用嵌套,这将使得代码非常复杂。

RxJava方式:

现在我们来看一下怎么使用RxJava来异步加载数据。

    Observable.fromCallable(new Callable<User>() {
          @Override public User call() throws Exception {
             return UsersService.getUser(userId);
           }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<User>() {
          @Override public void onCompleted() {
            Log.d("Rx", "Completed");
          }
    
          @Override public void onError(Throwable e) {
            Log.d("Rx", e.getMessage());
          }
    
          @Override public void onNext(User user) {
            Log.d("Rx", user.getName());
          }
        });

这里subscribeOn(Schedulers.io())将使observable工作在新的线程,而observeOn(AndroidSchedulers.mainThread()))将使订阅者在主UI线程上去处理observable发送出来的结果。

这和AsyncTask很相似但是更简单更简洁。RxJava解决了我前面提到的所有问题。

  • 错误处理:使用RxJava方式以后错误处理变得非常简单,因为所有可能的错误和异常都会抛给onError()方法。由于我们是在主线程监听(AndroidSchedulers.mainThread()),所以我们可以非常便捷的和UI交互从而告知用户相关错误。

  • 内存泄漏:RxJava不会魔力般的减轻内存泄漏的问题,但是要阻止内存泄漏则非常简单。RxJava提供了非常简洁的方式来取消订阅正在执行的异步调用。调用 subscriber 或者 subscription 的 unsubscribe()方法可以让Activity或者Fragment从通知列表中注销掉。如果你有多个subscription,可以使用CompositeSubscription来持有所有的Subscriptions,然后在onDestroy()或者onDestroyView()中一次取消所有的订阅。

      private CompositeSubscription allSubscriptions = 
                new CompositeSubscription();
      //add all the subscription to allSubscriptions
      allSubscriptions.add(subscription1);
      allSubscriptions.add(subscription2); 
      allSubscriptions.add(subscription3);
      //clear all subscription on onDestroy
      @Override    
      public void onDestroy() 
      {        
           super.onDestroy();       
           allSubscriptions.clear();    
      }
    
  • 连续多次网络调用:有许多运算符可以帮助我们串联并修改observable。一旦理解,连续多次进行网络调用将变得非常简单。我们一起来看一下这样的场景,从第一次网络调用中获取到了一个用户ID列表,然后需要对每一个用户ID调用getUser()来获取用户的信息。

      Observable.fromCallable(new Callable<List<String>>() {
        @Override public List<String> call() throws Exception {
          return UserService.getUserIds();
        }
      }).flatMap(new Func1<List<String>, Observable<String>>() {
        @Override public Observable<String> call(List<String> userIds) {
          return Observable.from(userIds);
        }
      }).flatMap(new Func1<String, Observable<User>>() {
        @Override public Observable<User> call(String userId) {
          return Observable.just(UserSerive.getUser(userId));
        }
      })
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Subscriber<User>() {
        @Override public void onCompleted() {
          Log.d("Rx", "emit","Completed!");
        }
      
        @Override public void onError(Throwable e) {
          Log.d("Rx", "emit", e.getMessage());
        }
      
        @Override public void onNext(User user) {
          Log.d("Rx", user.getName());
        }
      });
    

使用Lambda后的代码:

    Observable.fromCallable(() -> UsersService.getUserIds())
         .flatMap(userIds -> Observable.from(userIds))
         .flatMap(userId -> Observable.just(UserService.getUser(userId))
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread()))
         .subscribe(new Subscriber<User>() {
            @Override
            public void onCompleted() {
                Log.d("Rx", "emit","Completed!");
            }
    
            @Override
            public void onError(Throwable e) {
                Log.d("Rx", "emit", e.getMessage());
            }
    
            @Override
            public void onNext(User user) {
                Log.d("Rx", "emit", user.getName());
            }
    });

下面的图标描述了将一个含有单个字符串列表流变化成含有多个用户信息流的过程。

    -------{~~~~~~~~~~~~list of user ids [1,2,3,4,5]~~~~~~~~~}---|-->
       
               flatMap(userIds -> Observable.from(userIds))
    -------1------------2----------3------------4------------5---|--->
              flatMap (userId -> UserService.getUser(userId))
    ----user1--------user2------user3--------user4-------user5---|--->

参考文献

本文译自Howdy RxJava

    原文作者:唐先僧
    原文地址: https://www.jianshu.com/p/be806db4d1e7
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞