RxJava之一

RxJava 是一个基于事件流、实现异步操作的库。

RxJava 基于观察者模式实现,涉及被观察者、观察者、订阅关系等等概念,比较绕。可以类比于消息队列中的一些概念,RxJava 中被观察者产生事件,类比于消息队列中的生产者;观察者处理事件,类比于消息队列中的消费者;订阅关系联系其被订阅者和订阅者,类比于消息队列中的某一话题。

用一个简单的例子来表示这种关系:

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onNext(3);
}
});

Observer observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("subscribe");
}

@Override
public void onNext(Integer value) {
System.out.println(value);
}

@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}

@Override
public void onComplete() {
System.out.println("complete");
}
};

observable.subscribe(observer);

这段代码可以连起来写,就成为 RxJava 中著名的链式操作:

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onNext(3);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("subscribe");
}

@Override
public void onNext(Integer value) {
System.out.println(value);
}

@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}

@Override
public void onComplete() {
System.out.println("complete");
}
});

上面代码中用到了 ObservableEmitter 这个类,顾名思义,是一个事件发射器。它可以发射三种类型的时间,通过调用 onNext(T value)onComplete()onError(Throwable error) 就可以分别发出 next 事件、 complete 事件和 error 事件。

这三类事件遵循如下规则:

  • Observable可以发送无限个onNext事件,Observer也可以接收无限个onNext事件。
  • Observable发送了一个onComplete事件后,可以继续发送事件;而Observer接收到一个onComplete事件之后将不再接收事件。
  • Observable发送了一个onError事件后,可以继续发送事件;而Observer接收到一个onError事件之后将不再接收事件。
  • Observable可以不发送onComplete或onError。
  • 关键的是onComplete和onError必须唯一并且互斥,即不能发送多个onComplete事件,也不能发送多个onError事件,也不能先发一个onComplete事件、再发一个onError事件,反之亦然。

另一个概念是 Disposable 。这个单词的字面意思是一次性用品,用完即可丢弃的。在 RxJava 中,可以将它理解为事件发送者和接收者之间的一个控制开关,当调用它的 dispose 方法后,它就会将事件流切断,从而导致事件接收者接收不到事件。注意,这个方法并不会阻止事件的发送者继续发送事件。

一个 Disposeable 的例子:

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
System.out.println("emit 1");
observableEmitter.onNext(1);
System.out.println("emit 2");
observableEmitter.onNext(2);
System.out.println("emit 3");
observableEmitter.onNext(3);
}
}).subscribe(new Observer<Integer>() {
Disposable disposable;
int i;

@Override
public void onSubscribe(Disposable disposable) {
System.out.println("subscribe");
this.disposable = disposable;
}

@Override
public void onNext(Integer value) {
System.out.println(value);
i++;
if (i == 2) {
System.out.println("dispose");
disposable.dispose();
System.out.println("isDisposed:" + disposable.isDisposed());
}
}

@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}

@Override
public void onComplete() {
System.out.println("complete");
}
});

运行结果如下:

subscribe
emit 1
1
emit 2
2
dispose
isDisposed:true
emit 3