利用RXAndroid优雅地实现事件总线

最近迷上了RXAndroid,看到了网上有人用RXAndroid实现的事件总线,但是功能非常不完善,例如只能通过在观察者那里通过filter来过滤感兴趣的事件,但是现在有一个问题,如果我过滤到了感兴趣的事件但是每一个事件又有一个标志用来标识要实现什么功能。例如一个音乐播放的事件,我需要知道到底是开始播放还是暂停播放亦或是停止播放,这些功能通过网上的RXBus是无法实现的,只能在观察者的call里面使用if-elseif来判断,这样就极大的增加了代码的复杂程度,迷之缩进,在后期维护起来非常的不方便,于是我就写了一个职责链来处理这个问题。

思路:使用职责链模式来分配职责,使用模板方法模式来实现具体的操作。

具体实现代码如下:

package com.mitnick.root.mycample.util.rxbus;

import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Created by root on 16-4-12.
 * 事件体只会被发射到相对应的主题的订阅者
 */
public class RxBus {
    private static volatile RxBus defaultInstance;
    // 主题
    private final Subject bus;
    public RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }
    // 单例RxBus
    public static RxBus getDefault() {
        RxBus rxBus = defaultInstance;
        if (defaultInstance == null) {
            synchronized (RxBus.class) {
                rxBus = defaultInstance;
                if (defaultInstance == null) {
                    rxBus = new RxBus();
                    defaultInstance = rxBus;
                }
            }
        }
        return rxBus;
    }

    /**
     * 事件总线发射一个事件
     * @Param rxBusEvent : 事件体
     * */
    public void post (RxBusEvent rxBusEvent) {
        bus.onNext(rxBusEvent);
    }

    /**
     * 订阅事件总线的某一个主题
     * @Param tag : 订阅的主题,事件总线只会对感兴趣的主题发射事件
     * @Param eventType : 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
     * */
    public <T> Observable<T> toObserverable (String tag, Class<T> eventType) {
        return bus.ofType(eventType);
//        这里感谢小鄧子的提醒: ofType = filter + cast
//        return bus.filter(new Func1<Object, Boolean>() {
//            @Override
//            public Boolean call(Object o) {
//                return eventType.isInstance(o);
//            }
//        }) .cast(eventType);
    }
}
package com.mitnick.root.mycample.util.rxbus;

/**
 * Created by root on 16-4-12.
 */
public class RxBusEvent {
    private String tag;
    private String event;
    private Object object;

    public RxBusEvent(){}

    public RxBusEvent(String tag, String event, Object object){
        this.tag = tag;
        this.event = event;
        this.object = object;
    }

    public String getEvent() {
        return event;
    }

    public void setEvent(String event) {
        this.event = event;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public Object getObject() {
        return object;
    }

    public void setObject(Object object) {
        this.object = object;
    }
}
package com.mitnick.root.mycample.util.rxbus.responslink;


import com.mitnick.root.mycample.util.rxbus.RxBusEvent;

/**
 * Created by root on 16-5-17.
 */
public interface Handler {
    void operator(RxBusEvent rxBusEvent);
}
package com.mitnick.root.mycample.util.rxbus.responslink;


/**
 * Created by root on 16-5-17.
 */
public class HandlerFactory {
    public static ResponseNode getHandler(){
        return new ResponseNode(new DoSomthing(null) {
            @Override
            public boolean dosomthing() {
                return false;
            }
        });
    }
}
package com.mitnick.root.mycample.util.rxbus.responslink;


import com.mitnick.root.mycample.util.rxbus.RxBusEvent;

/**
 * Created by root on 16-5-17.
 */
public class ResponseNode implements Handler {

    private DoSomthing doSomthing;
    private Handler handler;

    public ResponseNode(DoSomthing doSomthing){
        this.doSomthing = doSomthing;
    }

    public Handler getHandler() {
        return handler;
    }

    public ResponseNode setHandler(ResponseNode handler) {
        this.handler = handler;
        return handler;
    }

    @Override
    public void operator(RxBusEvent rxBusEvent) {
        //本节点操作
        if(!doSomthing.startDo(rxBusEvent)){
            //本节点无法处理事件,交给下一个节点处理
            if(getHandler() != null){
                getHandler().operator(rxBusEvent);
            }
        }
    }
}
package com.mitnick.root.mycample.util.rxbus.responslink;


import android.support.annotation.Nullable;
import android.util.Log;

import com.mitnick.root.mycample.util.rxbus.RxBusEvent;


/**
 * Created by root on 16-5-17.
 */
public abstract class DoSomthing {

    private String event = null;
    private RxBusEvent rxBusEvent = null;

    public DoSomthing(@Nullable String event){
        this.event = event;
    }

    /**
     * 开始链循环
     * */
    public boolean startDo(RxBusEvent rxBusEvent){
        setRxBusEvent(rxBusEvent);
        if(null == event){
            return dosomthing();
        }else{
            if(filter()){
                //匹配成功
                Log.e("DoSomthing",rxBusEvent.getEvent());
                return dosomthing();
            }
            return false;
        }
    }

    public boolean filter(){
        if(null == event){
            return true;
        }
        return event.equals(rxBusEvent.getEvent());
    }

    public abstract boolean dosomthing();

    public RxBusEvent getRxBusEvent() {
        return rxBusEvent;
    }

    public void setRxBusEvent(RxBusEvent rxBusEvent) {
        this.rxBusEvent = rxBusEvent;
    }
}

以下为示例代码:

package com.mitnick.root.mymusic.fragment.defaultfragment;

import android.app.Service;
import android.content.Intent;
import android.media.AudioManager;
import android.media.MediaPlayer;
import android.os.IBinder;
import android.support.annotation.Nullable;
import android.util.Log;

import com.mitnick.root.mymusic.APP;
import com.mitnick.root.mymusic.rxbus.RxBus;
import com.mitnick.root.mymusic.rxbus.RxBusEvent;
import com.mitnick.root.mymusic.rxbus.responslink.DoSomthing;
import com.mitnick.root.mymusic.rxbus.responslink.HandlerFactory;
import com.mitnick.root.mymusic.rxbus.responslink.ResponseNode;

import java.io.IOException;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

/**
 * Created by root on 16-5-9.
 */
public class MusicService extends Service {

    private APP app;
    private MediaPlayer mediaPlayer;
    public Subscription rxSubscription;//订阅RxBus事件

    @Override
    public void onCreate() {
        super.onCreate();

        final ResponseNode responseNode = HandlerFactory.getHandler();

        app = (APP) getApplication();
        mediaPlayer = new MediaPlayer();
        mediaPlayer.setAudioStreamType(AudioManager.STREAM_MUSIC);
        if(null != mediaPlayer && !mediaPlayer.isPlaying()){
            try {
                mediaPlayer.setDataSource(app.getMusicPath());
                mediaPlayer.prepare();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        final Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                //耗时任务
                if(null != mediaPlayer){
                    while(mediaPlayer.isPlaying()){
                        RxBus.getDefault().post(new RxBusEvent("MusicService", "upDatePosition", new Integer(mediaPlayer.getCurrentPosition())));
                    }
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

        responseNode.setHandler(new ResponseNode(new DoSomthing("play") {
            @Override
            public boolean dosomthing() {
                if(null != mediaPlayer && !mediaPlayer.isPlaying()){
                    int total = mediaPlayer.getDuration();
                    mediaPlayer.start();
                    RxBus.getDefault().post(new RxBusEvent("MusicService", "isPlaying", new Integer(total)));
                    observable.subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onNext(Integer integer) {

                        }
                    });
                }
                return filter();
            }
        })).setHandler(new ResponseNode(new DoSomthing("pause") {
            @Override
            public boolean dosomthing() {
                if(null != mediaPlayer && mediaPlayer.isPlaying()){
                    mediaPlayer.pause();
                    RxBus.getDefault().post(new RxBusEvent("MusicService", "isPauseed", null));
                }
                return filter();
            }
        })).setHandler(new ResponseNode(new DoSomthing("stop") {
            @Override
            public boolean dosomthing() {
                if(null != mediaPlayer && mediaPlayer.isPlaying()){
                    mediaPlayer.stop();
                    RxBus.getDefault().post(new RxBusEvent("MusicService", "isStoped", null));
                }
                return filter();
            }
        })).setHandler(new ResponseNode(new DoSomthing("next") {
            @Override
            public boolean dosomthing() {
                if(null != mediaPlayer){
                    if(mediaPlayer.isPlaying()){
                        mediaPlayer.stop();
                    }
                }
                return filter();
            }
        })).setHandler(responseNode);

        rxSubscription = RxBus.getDefault().toObserverable("DefaultFragment", RxBusEvent.class)
                .filter(new Func1<RxBusEvent, Boolean>() {
                    @Override
                    public Boolean call(RxBusEvent rxBusEvent) {
                        boolean flag = false;
                        if(rxBusEvent.getTag().equals("DefaultFragment")){
                            flag = true;
                        }
                        return flag;
                    }
                })
                .subscribe(new Action1<RxBusEvent>() {
                               @Override
                               public void call(final RxBusEvent rxBusEvent) {
                                   responseNode.operator(rxBusEvent);//开启职责链处理事件
                               }
                           },
                        new Action1<Throwable>() {
                            @Override
                            public void call(Throwable throwable) {
                                // TODO: 处理异常
                                Log.e("事件总线",throwable.toString());
                            }
                        });
    }

    @Override
    public void onDestroy() {
        if(!rxSubscription.isUnsubscribed()){
            rxSubscription.unsubscribe();
        }
        super.onDestroy();
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {

        return super.onStartCommand(intent, flags, startId);
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }
}

 

    原文作者:移动开发
    原文地址: https://my.oschina.net/tannotour/blog/683091
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞