RxJava之二

运算符

map

map 的基本作用就是将一个 Observable 通过某种函数,转换为另一种 Observable 。

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onNext(3);
}
}).map(new Function<Integer, String>() {

@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept: " + s);
}
});

输出:

accept: This is result 1
accept: This is result 2
accept: This is result 3

zip

zip 专用于合并事件,该合并不是连接,而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。

Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
if (!observableEmitter.isDisposed()) {
observableEmitter.onNext("A");
observableEmitter.onNext("B");
observableEmitter.onNext("C");
}
}
});

Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
if (!observableEmitter.isDisposed()) {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onNext(3);
observableEmitter.onNext(4);
observableEmitter.onNext(5);
}
}
});

Observable.zip(stringObservable, integerObservable, new BiFunction<String, Integer, String>() {

@Override
public String apply(String s, Integer integer) throws Exception {
return s + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept: " + s);
}
});

输出:

accept: A1
accept: B2
accept: C3

concat

concat 用于将两个 Observable 合并成一个。

Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("accept: " + integer);
}
});

输出:

accept: 1
accept: 2
accept: 3
accept: 4
accept: 5
accept: 6

flatMap

flatMap 可以把一个 Observable 通过某种方法转换为多个 Observables,然后再把这些分散的 Observables 装进一个单一的 Observable 。 需要注意的是,flatMap 并不能保证事件的顺序,如果需要保证,需要用到下面要讲的 ConcatMap。

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < integer; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept: " + s);
}
});

输出:

accept: I am value 1
accept: I am value 2
accept: I am value 2
accept: I am value 3
accept: I am value 3
accept: I am value 3

concatMap

concatMap 与 FlatMap 的唯一区别就是 concatMap 保证了顺序。

distinct

顾名思义,distinct 会对事件进行滤重。

Observable.just(1, 2, 2, 3, 3, 3, 4, 4, 4, 4)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("accept: " + integer);
}
});

输出:

accept: 1
accept: 2
accept: 3
accept: 4

filter

filter 可以接受一个参数,让其过滤掉不符合我们条件的值。

Observable.just(1, 20, 65, -56, 7, 19)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 10;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("accept: " + integer);
}
});

输出:

accept: 20
accept: 65
accept: 19

buffer

buffer 操作符接受两个参数,buffer(count,skip),作用是将 Observable 中的数据按 skip (步长) 分成最大不超过 count 的 buffer ,然后生成一个 Observable 。

Observable.just(1, 2, 3, 4, 5, 6)
.buffer(3, 2)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
System.out.println("accept: " + integers);
}
});

输出:

accept: [1, 2, 3]
accept: [3, 4, 5]
accept: [5, 6]

timer

timer 相当于一个定时任务。在 1.x 中它还可以执行间隔逻辑,但在 2.x 中此功能被交给了 interval,下一个会介绍。但需要注意的是,timer 和 interval 均默认在新线程。

interval

如同我们上面可说,interval 操作符用于间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。

skip

skip 作用就和字面意思一样,接受一个 long 型参数 count ,代表跳过 count 个数目开始接收。

take

take,接受一个 long 型参数 count ,代表至多接收 count 个数据。

just

just,没什么好说的,其实在前面各种例子都说明了,就是一个简单的发射器依次调用 onNext() 方法。