RxJava1.0结构初探(一)

代码栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable<String> observable = Observable.just("A", "B");
observable = observable.subscribeOn(Schedulers.newThread());
observable = observable.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
Log.d(TAG, "filter: " + getCurrentThreadName());
return s.equals("A");
}
});
observable = observable.observeOn(AndroidSchedulers.mainThread());
Subscription subscription = observable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: " + getCurrentThreadName());
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + getCurrentThreadName());
}

@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + getCurrentThreadName());
Log.d(TAG, "onNext: ---" + s);
}
});

Log:

1
2
3
4
5
MainActivity: filter: RxNewThreadScheduler-1
MainActivity: filter: RxNewThreadScheduler-1
MainActivity: onNext: main
MainActivity: onNext: ---A
MainActivity: onCompleted: main

看log可以知道filter的操作在名字叫RxNewThreadScheduler-1的线程进行,s.equals(“A”)是过滤条件,
所以在onNext方法里只有A输出,并且onNext和onCompleted方法是在UI线程。

创建Observer过程

Observable observable = Observable.just(“A”, “B”);
首先创建Observable, Observable类提供很多静态的方便方法,用于创建。

Observable只有一个构造函数,有一个OnSubscribe的成员变量,
protected Observable(OnSubscribe f) {
this.onSubscribe = f;
}

OnSubscribe是一个接口,只有Call方法, 且有一个订阅者Subscriber的参数
public interface OnSubscribe extends Action1> {
void call(Subscriber<? super T> t);
}

跟踪源码发现Observable的just方法实际是new Observable( new OnSubscribeFromArray(array) ).

OnSubscribeFromArray的部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
final T[] array;
public OnSubscribeFromArray(T[] array) {
this.array = array;
}

@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}

}

接着看observable = observable.subscribeOn(Schedulers.newThread());

subscribeOn(scheduler)方法指定工作线程,subscribeOn方法本质又是创建了一个Observable:
传入了new OperatorSubscribeOn(this, scheduler)) 这个OnSubscribe,这里把this传到
OperatorSubscribeOn里,OperatorSubscribeOn是一个包装,实现了OnSubscribe接口同时包装了一个Observable.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

final Scheduler scheduler;
final Observable<T> source;

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
// 利用scheduler对Subscriber进行线程的调度
}

过滤操作符的过滤过程

Observable的filter的方法源码:

1
2
3
4
5
6
7
8

public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
return lift(new OperatorFilter<T>(predicate));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

filter的过程又是创建了Observable, 传入了名叫OnSubscribeLift的OnSubscribe实现。还有
创建OperatorFilter的操作符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

final OnSubscribe<T> parent;

final Operator<? extends R, ? super T> operator;

public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}

@Override
public void call(Subscriber<? super R> o) {
// 利用operator, parent 这两个成员变量,对 Subscriber进行过滤,
// 这里parent是上一个Observable里的成员变量onSubscribe, 在这里就是OperatorSubscribeOn的引用
}

....
}

// Operator操作符接口
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
Subscriber<? super R> call(Subscriber<? super T> t);
}

可观察者 订阅 观察者

observable.observeOn(AndroidSchedulers.mainThread());
这句代码是对观察者执行线程的调度.

1
2
3
4
5
6
7
8
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
...
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

过程跟上面filter的过程一样,不同的是传入了OperatorObserveOn操作符.

最后到Observable的subscribe(observer)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new ObserverSubscriber<T>(observer));
}

public final class ObserverSubscriber<T> extends Subscriber<T> {
final Observer<? super T> observer;

public ObserverSubscriber(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {
observer.onNext(t);
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onCompleted() {
observer.onCompleted();
}
}

public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}

从上面代码看,observer最后会被包装成Subscriber.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}


// 只截取了部分关键代码
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

subscriber.onStart();

if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}

try {
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
} else {
subscriber.onError(hook.onSubscribeError(e));
}
}
}

看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber)这句代码
用observable的成员变量onSubscribe, 调用onSubscribe.call(subscriber)启动Observable序列.

Subscriber继承了Observer和Subscription, 所以return hook.onSubscribeReturn(subscriber),简单来讲就是直接返回subscriber.

最后总结一下,Observable里有个成员变量onSubscribe,onSubscribe是个接口,就有一个call方法,参数为Subscriber。call方法的实现是Observable的真正操作内容。
从Observable调用静态just方法创建一个Observable,subscribeOn(schedulers)也是创建一个Observable并把前一个Observable的onSubscribe传递下去,
filter操作符又是创建一个Observable并把前一个Observable的onSubscribe传递下去,通过不断递归传递onSubscribe,直到调用subscribe(observable),最后调用
onSubscribe.call(subscriber)启动Observable序列,最终又会递归到第一个onSubscribe,执行call方法,然后按过程回调observer的onNext, onCompleted,onError.

下一篇继续探索具体OnSubscribe的call(subscriber)方法实现.

Loading comments box needs to over the wall