RxJava1.0源码解读二

RxJava1.0源码解读二

用一张图总结上一篇博文 RxJava1.0源码解读一:

RxJava从Observable开始,线程调度,各种操作符都是通过创建新的Observable和OnSubscribe来传递原始Observable, 通过调用最后一个Observer的onSubscribe.call(subscriber)方法,
又经过递归回到原始Observable的onSubscribe.call(subscriber)开始任务流。这篇文章分析一下任务流的传递过程, 任务流的线程调度, 到最后回调到subscriber是怎么工作的。

RxJava最后一定会调用subscribe(observer)来订阅观察者.
subscribe的重载方法虽然有很多种,但最终都是subscribe(observer), 不同形式都是对observer的包装形成的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1. observable.subscribe();

2. observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {

}
});

3. observable.subscribe(observer)

4.observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {

}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {

}
}, new Action0() {
@Override
public void call() {

}
});

从上一篇博文 RxJava1.0源码解读一得知最后一个Observable是new Observable(new OnSubscribeLift(onSubscribe, operator))
operator是new OperatorObserveOn(scheduler, delayError, bufferSize)

从最后一个Observable的onSubscribe.call(subscriber)开始,实质是调用OnSubscribeLift的call方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
st.onStart();
parent.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}

hook这里不过多分析了,把hook.onLift(operator).call(o) 看成operator.call(o)就可以了.(hook默认就是这样)

OperatorObserveOn的call方法:

1
2
3
4
5
6
7
8
public Subscriber<? super T> call(Subscriber<? super T> child){

....

ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}

OperatorObserveOn的call方法创建了ObserveOnSubscriber对象返回去,ObserveOnSubscriber封装了对subscriber的线程调度
由上面OnSubscribeLift的call方法可知,接着调用parent.call(st); parent是指上一个OnSubscribe, st就是ObserveOnSubscriber.

从上一篇博文 RxJava1.0源码解读一可知,接着上一个Observable是new Observable(new OnSubscribeLift(onSubscribe, operator))
operator是new OperatorFilter(predicate).

所以在这里上一个OnSubscribe是指new OnSubscribeLift(onSubscribe, operator), operator是OperatorFilter,进行过滤操作.
同理接着会执行:
Subscriber st = operatorFilter.call(subscriber);
parent.call(st);

OperatorFilter的call方法如下:

1
2
3
4
5
6
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
child.add(parent);
return parent;
}

创建一个FilterSubscriber对象,传递给上一个的OnSubscribe

从上一篇博文 RxJava1.0源码解读一可知,在上一个Observable是new Observable(new OperatorSubscribeOn(this, scheduler))
OperatorSubscribeOn的call方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

@Override
public void call(final Subscriber<? super T> subscriber) { //传进来的subscriber是FilterSubscriber
final Worker inner = scheduler.createWorker();
subscriber.add(inner);

inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();

Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}

@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};

source.unsafeSubscribe(s);
}
});
}

从上面的代码可知用scheduler创建worker对线程进行调度, 接着创建 Subscriber s = new Subscriber(subscriber) 并执行source.unsafeSubscribe(s);
这里source是最上面的Observable了,也就是数据源了,也就是 Observable.just(“A”, “B”),即是new Observable( new OnSubscribeFromArray(array) )

执行source.unsafeSubscribe(s) 最终会调用 onSubscribeFromArray.call(s)

1
2
3
4
@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}

child是OperatorSubscribeOn的call方法里局部内部类Subscriber,接着调用FilterSubscriber.setProducer(p)

1
2
3
4
5
@Override
public void setProducer(Producer p) {
super.setProducer(p);
actual.setProducer(p);
}

actual是指前面的ObserveOnSubscriber

Loading comments box needs to over the wall