小试RxJava原理

Posted by 江湖迈杰的博客 on November 14, 2016

RxJava的Hello World 在开始介绍RxJava的源码之前,我们先来写一个RxJava的Hello world吧!


Observable
        .create(new Observable.OnSubscribe<Integer>() {
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onCompleted();
            }
        })
        .subscribe(new Subscriber<Integer>() {
            public void onCompleted() {
                System.out.println("onCompleted");
            }
            public void onError(Throwable e) {
                System.out.println("Error");
            }
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });

这是我们熟悉的RxJava链式调用,在源码分析上比较麻烦,无法很明确的声明每一个代码块的含义。在这里,我们采用非链式调用的方式对这个Hello world进行重构(我们采用lambda 表达式)。

//创建OnSubscribe,在call()中对应数据源产生形式
Observable.OnSubscribe<Integer> onSubscribe = (subscriber) -> {
    subscriber.onNext(1);
    subscriber.onNext(2);
    subscriber.onNext(3);
    subscriber.onCompleted();
};
//创建Subscriber,定义针对数据流的响应
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    public void onCompleted() {
        System.out.println("onCompleted");
    }
    public void onError(Throwable e) {
        System.out.println("Error");
    }
    public void onNext(Integer integer) {
        System.out.println(integer);
    }
};
//根据OnSubscribe创建Observable
Observable<Integer> observable = Observable.create(onSubscribe);
//创建observable、subscriber的注册关系
observable.subscribe(subscriber);

Hello World的简单分析

简单的说,RxJava的原理就是观察者模式,不过RxJava比观察者模式强大多了。在RxJava,Observable相当于被观察者,它是事件的源头,而OnSubscribe则是定义数据源如何发送事件,或者如何发送什么样的数据;Subscriber则是观察者(在代码实现上,Subscriber实现了接口Observer),定义了接收数据后对应的反应。observable.subscribe(subscriber)将两者进行了关联:即告诉Observable,它有一个Subscriber;同时触发OnSubscribe.onCall(),开启整个事件流。 如下图,明显问题的关键就在于observable.subscribe(subscriber)。

observable.subscribe(subscriber)

接下来,我们对observable.subscribe(subscriber)进行分析,对应的代码在Observable.java中。

注:如果没有特殊说明,我们使用的RxJava的版本为1.2.2


public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    //检查subscriber、observable.onSubscribe是否为空
    if (subscriber == null) {
        throw new IllegalArgumentException("subscriber can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }
    //调用subscriber的onStart()
    subscriber.onStart();
    //将subscriber封装成SafeSubscriber
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    try {
        //通过RxJavaHook对observable, observable.onSubscribe进行封装,同时调用Observable.OnSubscribe类的call(Subscriber)
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        //通过RxJavaHook对subscriber进行封装,并返回结果
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        //判断异常,必要时抛出
        Exceptions.throwIfFatal(e);
        //当Subscriber不再订阅时,有RxJavaHook负责处理
        if (subscriber.isUnsubscribed()) {
            RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
        } else {
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                //判断异常,必要时抛出
                Exceptions.throwIfFatal(e2);
                //封装异常并抛出
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                RxJavaHooks.onObservableError(r);
                throw r; 
            }
        }
        //取消订阅
        return Subscriptions.unsubscribed();
    }
}

核心的代码就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)简化成observable.onSubscribe.call(subscriber)。这才是关键。这里的subscriber其实对应于我们编写的Subscriber。那么,很明显,Hello world的调用关系如下,看上去是不是很简单。

observable.subscribe(subscriber);
-->onSubscribe.onCall(Subscriber)
    -->subscribe.onNext(1)       ==>  你写的subscriber.onNext(1)
    -->subscribe.onNext(2)       ==>  你写的subscriber.onNext(2)
    -->subscribe.onNext(3)       ==>  你写的subscriber.onNext(3)
    -->subscribe.onCompleted()   ==>  你写的subscriber.onCompleted()

说的再直白一点,为什么RxJava可以完成调用–响应呢?还记得你在Hello World里定义的Observable.OnSubscribe吗?看看它的onCall方法的参数的类型是不是和你定义的观察者Subscriber是同一个类型,都是rx.Subscriber。懂了吧!简单的理解,因为你在call(subscriber)中调用subscribe.onNext(1),所以你写的subscriber的onNext(Integer)方法会被调用。所以,你认为将你写的 subscriber变成了Observable.OnSubscribe里方法call(Subscriber)的一个参数.也只是因为你调用了observable.subscribe(subscriber),才有了后面onNext()、onCompleted()、onError()的一系列方法调用

map

在解释完RxJava的Hello world,下面我们分析一下map,实例代码(非链式调用)如下:

Observable.OnSubscribe<Integer> onSubscribe = (subscriber) -> {
    subscriber.onNext(1);
    subscriber.onNext(2);
    subscriber.onNext(3);
    subscriber.onCompleted();
};
Func1<Integer, String> func1 = (integer) -> {
    return "Integer " + integer;
};
Subscriber<String> subscriber = new Subscriber<String>() {...};
Observable<Integer> observable = Observable.create(onSubscribe);
//分析重点
Observable<String> mapObservable = observable.map(func1);
mapObservable.subscribe(subscriber);

要了解为什么map可以实现链式调用,那么我们需要深入到源码那一层进行分析。不过在此之前,我们可以想一下如果是我们进行开发,那么应该如何开发。这里提供一个实现思路:在这里,mapObservable其实承担着双重责任,它既是 observable对应的Subscriber(注册者),也是subscriber的Observable(观察对象)。

MapSubscriber mapSubscriber = new MapSubscriber(Func1);
Observable<Integer> observable = Observable.create(onSubscribe);
Observable<String> mapObservable = Observable.create((subscriber)->{/*doSomething*/});
mapObservable.subscribe(subscriber);
observable.subscribe(mapSubscriber);

查看源码,我们发现RxJava的具体实现如下:

Observable.java


public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return create(new OnSubscribeMap<T, R>(this, func));
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

OnSubscribeMap.java

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
    final Observable<T> source;
    final Func1<? super T, ? extends R> transformer;
    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }
    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }
    static final class MapSubscriber<T, R> extends Subscriber<T> {
        final Subscriber<? super R> actual;
        final Func1<? super T, ? extends R> mapper;
        boolean done;
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            actual.onNext(result);
        }
        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;
            actual.onError(e);
        }
        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }
        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}