1. Common Operators in RxJava
1.1. map
Transforms each item emitted by the source ObservableSource into another item, one by one, using the supplied function.
Observable<Integer> oi = Observable.fromArray(1, 2);
oi.map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return "MapString#" + integer.intValue();
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("map -> accept: " + s);
    }
});
Result:
map -> accept: MapString#1
map -> accept: MapString#2
1.2. flatMap
Maps each item of the source ObservableSource to another ObservableSource, then merges the emissions of all of them into a single Observable.
Observable<Integer> oi = Observable.fromArray(1, 2);
oi.flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        int v = integer.intValue();
        // Each source item expands to two strings: v * 10 and v * 10 + 5.
        return Observable.fromArray(String.valueOf(v * 10), String.valueOf(v * 10 + 5));
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("flatMap -> accept: " + s);
    }
});
Result:
flatMap -> accept: 10
flatMap -> accept: 15
flatMap -> accept: 20
flatMap -> accept: 25
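Both map and flatMap process the source one item at a time, but flatMap maps each item to an ObservableSource and merges the results, so a single source item can expand into several items.
The following example flattens an ObservableSource<List<String>> into an ObservableSource<String>, dropping empty strings along the way.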
List<String> list = Arrays.asList("A", "B", "", "C");
Observable<List<String>> oss = Observable.just(list);
oss.flatMap(new Function<List<String>, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(List<String> strings) {
        // Copy the non-empty strings instead of removing from the input,
        // which may be an unmodifiable list.
        List<String> nonEmpty = new ArrayList<>();
        for (String s : strings) {
            if (!s.isEmpty()) {
                nonEmpty.add(s);
            }
        }
        return Observable.fromIterable(nonEmpty);
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("flatMap -> accept: " + s);
    }
});
Result:
flatMap -> accept: A
flatMap -> accept: B
flatMap -> accept: C
1.3. flatMapIterable
Maps each item of the source ObservableSource to an Iterable, then merges the elements of all of them into a single Observable.
Observable<Integer> oi = Observable.fromArray(1, 2);
oi.flatMapIterable(new Function<Integer, Iterable<String>>() {
    @Override
    public Iterable<String> apply(Integer integer) throws Exception {
        int v = integer.intValue();
        return Arrays.asList(String.valueOf(v * 10), String.valueOf(v * 10 + 5));
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("flatMapIterable -> accept: " + s);
    }
});
Result:
flatMapIterable -> accept: 10
flatMapIterable -> accept: 15
flatMapIterable -> accept: 20
flatMapIterable -> accept: 25
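Related operators in the map family:
- flatMapSingle: maps each item to a SingleSource and merges the results; use it when the result of each inner operation matters.
- flatMapCompletable: maps each item to a CompletableSource and returns a Completable; the subscriber only learns whether all processing finished, with no result values.
- flatMapMaybe: like flatMapSingle, but each item is mapped to a MaybeSource, which may produce one result or none.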
1.4. filter
Filters the source. Return true from the test method to keep an item.
Observable<Integer> oi = Observable.fromArray(1, 2);
oi.filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer.intValue() == 1;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("filter -> accept: " + integer.intValue());
    }
});
Result:
filter -> accept: 1
1.5. compose
Applies a transformer to the source as a whole and returns the resulting source. Unlike the map family, whose apply method receives the items one at a time, the transformer's apply method receives the upstream Observable itself, so it can attach any chain of operators and return a new ObservableSource. In the example below the transformer gathers all items into one list.
Observable<Integer> oi = Observable.fromArray(2, 4);
oi.compose(new ObservableTransformer<Integer, List<Integer>>() {
    @Override
    public ObservableSource<List<Integer>> apply(Observable<Integer> upstream) {
        // Collect all upstream items into one list without blocking.
        return upstream.toList().toObservable();
    }
}).subscribe(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> integers) throws Exception {
        print("compose -> accept: " + integers.get(0).intValue());
        print("compose -> accept: " + integers.get(1).intValue());
    }
});
Result:
compose -> accept: 2
compose -> accept: 4
Another example:
List<String> list = Arrays.asList("A", "B", "C", "");
Single<List<String>> ss = Single.just(list);
Single<String> stringSingle = ss.compose(new SingleTransformer<List<String>, String>() {
    @Override
    public SingleSource<String> apply(Single<List<String>> upstream) {
        // Join the list into one string inside the chain instead of blocking.
        return upstream.map(new Function<List<String>, String>() {
            @Override
            public String apply(List<String> lists) throws Exception {
                StringBuilder item = new StringBuilder();
                for (String s : lists) {
                    item.append(",").append(s);
                }
                return item.toString();
            }
        });
    }
});
stringSingle.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("compose -> accept: " + s);
    }
});
Result:
compose -> accept: ,A,B,C,
1.6. collect
Collects all items of the source into a single mutable container, here a List, and returns it as a Single. The first argument supplies the empty container; the second adds one item to it.
Observable.fromArray("A", "B", "C").collect(new Callable<List<String>>() {
    @Override
    public List<String> call() throws Exception {
        return new ArrayList<>(4);
    }
}, new BiConsumer<List<String>, String>() {
    @Override
    public void accept(List<String> strings, String s) throws Exception {
        strings.add(s);
    }
}).subscribe(new Consumer<List<String>>() {
    @Override
    public void accept(List<String> strings) throws Exception {
        print(strings);
    }
});
Result:
[A, B, C]
1.7. to | toObservable
Single and Observable can be converted into each other. Single.toObservable converts a Single into an Observable; Observable.to applies a converter function to the Observable and returns its result, which in the example below is a Single.
List<String> list = Arrays.asList("A", "B");
Single<List<String>> ss = Single.just(list);
// Convert to an Observable
Observable<List<String>> ols = ss.toObservable();
ols.subscribe(new Consumer<List<String>>() {
    @Override
    public void accept(List<String> strings) throws Exception {
        for (String s : strings) {
            print("toObservable -> accept: " + s);
        }
    }
});
// Convert back to a Single
ols.to(new Function<Observable<List<String>>, Single<List<String>>>() {
    @Override
    public Single<List<String>> apply(Observable<List<String>> listObservable) {
        // singleOrError() fails unless the Observable emits exactly one item.
        return listObservable.singleOrError();
    }
}).subscribe(new Consumer<List<String>>() {
    @Override
    public void accept(List<String> strings) throws Exception {
        for (String s : strings) {
            print("to -> accept: " + s);
        }
    }
});
Result:
toObservable -> accept: A
toObservable -> accept: B
to -> accept: A
to -> accept: B
1.8. toList
Collects all items of the source into a List and returns it as a Single<List<T>>.
Observable<Integer> oi = Observable.fromArray(1, 2);
oi.toList().subscribe(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> integers) throws Exception {
        print("toList -> accept: " + integers);
    }
});
Result:
toList -> accept: [1, 2]
Another example, which supplies the list instance to fill:
Observable<Integer> oi = Observable.fromArray(1, 2, 3, 4);
oi.toList(new Callable<List<Integer>>() {
    @Override
    public List<Integer> call() throws Exception {
        return new ArrayList<>();
    }
}).subscribe(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> integers) throws Exception {
        print("toList -> accept: " + integers);
    }
});
Result:
toList -> accept: [1, 2, 3, 4]
1.9. toMap
Collects the items of the Observable into a Map and returns it as a Single<Map<K, V>>. The first function extracts the key of an item and the second its value.
Observable<Integer> oi = Observable.fromArray(1, 2);
oi.toMap(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return String.valueOf(integer.intValue()); // key
    }
}, new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return String.valueOf(integer.intValue() * 10); // value
    }
}).subscribe(new Consumer<Map<String, String>>() {
    @Override
    public void accept(Map<String, String> stringStringMap) throws Exception {
        print(stringStringMap.get("1")); // 10
        print(stringStringMap.get("2")); // 20
    }
});
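One detail the example above does not show is what happens when two items map to the same key. The sketch below is a plain-Java model over a List, not RxJava's implementation; ToMapModel and its toMap method are hypothetical names used only for illustration. It assumes the documented default of a HashMap-backed result, where a later item replaces an earlier one with the same key.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ToMapModel {
    // Hypothetical helper: builds a map from key and value selectors.
    public static <T, K, V> Map<K, V> toMap(List<T> source,
                                            Function<T, K> keySelector,
                                            Function<T, V> valueSelector) {
        Map<K, V> map = new HashMap<>();
        for (T item : source) {
            // A later item with the same key replaces the earlier entry.
            map.put(keySelector.apply(item), valueSelector.apply(item));
        }
        return map;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 11);
        // Key: last digit of the item; value: the item times 10.
        Map<String, String> map = toMap(source,
                i -> String.valueOf(i % 10),
                i -> String.valueOf(i * 10));
        System.out.println(map.get("1")); // 110, because 11 replaced 1
        System.out.println(map.get("2")); // 20
    }
}
```

If every item should be kept, a multimap-style collection is the usual alternative to a plain map.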
1.10. as
Applies an ObservableConverter to the Observable and returns whatever the converter produces, here a plain String obtained by consuming the source with a blocking call.
Observable<Integer> oi = Observable.fromArray(1, 2, 3, 4);
String str = oi.as(new ObservableConverter<Integer, String>() {
    @Override
    public String apply(Observable<Integer> upstream) {
        final StringBuilder sb = new StringBuilder();
        upstream.blockingForEach(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                sb.append(integer.intValue()).append(",");
            }
        });
        return sb.toString();
    }
});
// str: 1,2,3,4,
1.11. take | takeLast
take(n) emits only the first n items of the source; takeLast(n) emits only the last n items.
Observable<Integer> oi = Observable.fromArray(1, 2, 3, 4);
oi.take(2).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("take -> accept: " + integer.intValue());
    }
});
oi.takeLast(2).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("takeLast -> " + integer.intValue());
    }
});
Result:
take -> accept: 1
take -> accept: 2
takeLast -> 3
takeLast -> 4
1.12. takeWhile
takeWhile emits items as long as the test method returns true. As soon as test returns false, the sequence completes, and the item that failed the test is not emitted.
Observable<Integer> oi = Observable.fromArray(1, 2, 3, 4);
oi.takeWhile(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        print("takeWhile -> test: " + integer.intValue());
        return integer.intValue() != 3;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("takeWhile -> accept: " + integer.intValue());
    }
});
Result:
takeWhile -> test: 1
takeWhile -> accept: 1
takeWhile -> test: 2
takeWhile -> accept: 2
takeWhile -> test: 3
1.13. takeUntil
takeUntil emits items until the test method returns true, then completes. Each item is emitted before it is tested, so the item that stops the sequence is still delivered.
Observable<Integer> oi = Observable.fromArray(1, 2, 3, 4);
oi.takeUntil(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        print("takeUntil -> test: " + integer.intValue());
        return integer.intValue() == 2;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("takeUntil -> accept: " + integer.intValue());
    }
});
Result:
takeUntil -> accept: 1
takeUntil -> test: 1
takeUntil -> accept: 2
takeUntil -> test: 2
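The different ordering of test and accept in the takeWhile and takeUntil results can be modeled without RxJava. The sketch below is a plain-Java analogy over a List, not RxJava's implementation; TakeModel and its two methods are hypothetical names used only for illustration. The takeWhile model tests an item before emitting it, while the takeUntil model emits first and tests afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class TakeModel {
    // Models takeWhile: test first, emit only while the predicate holds.
    public static <T> List<T> takeWhile(List<T> source, Predicate<T> keepGoing) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!keepGoing.test(item)) {
                break; // the failing item is not emitted
            }
            out.add(item);
        }
        return out;
    }

    // Models takeUntil: emit first, then stop once the predicate holds.
    public static <T> List<T> takeUntil(List<T> source, Predicate<T> stop) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item); // the matching item is still emitted
            if (stop.test(item)) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4);
        List<Integer> whileNotThree = takeWhile(source, i -> i != 3);
        List<Integer> untilTwo = takeUntil(source, i -> i == 2);
        System.out.println(whileNotThree); // [1, 2]
        System.out.println(untilTwo);      // [1, 2]
    }
}
```

Both calls yield the same items here, but for different reasons: takeWhile drops the 3 that failed its test, whereas takeUntil keeps the 2 that triggered the stop.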
1.14. lastOrError | firstOrError
lastOrError returns a Single that emits the last item of the source, and firstOrError one that emits the first item. If the source is empty, both signal a NoSuchElementException.
Observable<Integer> oi = Observable.fromArray(1, 2);
oi.lastOrError().subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("lastOrError -> accept: " + integer); // 2
    }
});
oi.firstOrError().subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("firstOrError -> accept: " + integer); // 1
    }
});
1.15. zip
Pairs the items of two Observables by position and passes each pair to the apply method, producing a new Observable of the combined results. Emission stops when the shorter source completes, so the 5 in the second source below has no partner and is dropped.
Observable<Integer> oi1 = Observable.fromArray(1, 3);
Observable<Integer> oi2 = Observable.fromArray(2, 4, 5);
oi1.zipWith(oi2, new BiFunction<Integer, Integer, String>() {
    @Override
    public String apply(Integer integer, Integer integer2) throws Exception {
        return String.valueOf(integer.intValue() * integer2.intValue());
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("zipWith.accept = " + s);
    }
});
Result:
zipWith.accept = 2
zipWith.accept = 12
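The positional pairing behind this result can be sketched in plain Java. This is an analogy over two Lists, not RxJava's implementation; ZipModel and its zip method are hypothetical names used only for illustration. It shows why only two values come out even though the second source has three items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class ZipModel {
    // Pairs items by index; the result is as long as the shorter input.
    public static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                        BiFunction<A, B, R> combiner) {
        int n = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 3);
        List<Integer> b = Arrays.asList(2, 4, 5);
        // 5 has no partner in the first list, so it is dropped.
        List<String> products = zip(a, b, (x, y) -> String.valueOf(x * y));
        System.out.println(products); // [2, 12]
    }
}
```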
1.16. merge
Merges the items of two sources into a single source.
Observable<Integer> oi1 = Observable.fromArray(1);
Observable<Integer> oi2 = Observable.fromArray(2, 4);
oi1.mergeWith(oi2).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("mergeWith.accept = " + integer);
    }
});
Result:
mergeWith.accept = 1
mergeWith.accept = 2
mergeWith.accept = 4
Doc: http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html