1. RxJava中常用的Operator

1.1. map

将源ObservableSource里面的每一条数据通过转换算法一一转换为另一条数据。

Observable<Integer> oi = Observable.fromArray(1, 2);
oi.map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return "MapString#" + integer.intValue();
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("map -> accept: " + s);
    }
});

Result:

map -> accept: MapString#1
map -> accept: MapString#2

1.2. flatMap

将源ObservableSource里面的每一条数据转换为另一个ObservableSource,然后将它们合并到一个Observable里面后返回。

Observable<Integer> oi = Observable.fromArray(1, 2);
oi.flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        int v = integer.intValue();
        return Observable.fromArray(v * 10 + "", "" + (v * 10 + 5));
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("flatMap -> accept: " + s);
    }
});

Result:

flatMap -> accept: 10
flatMap -> accept: 15
flatMap -> accept: 20
flatMap -> accept: 25

mapflatMap都是一对一的转换,但flatMap转换是把ObservableSource数据类型转换成另外一种ObservableSource数据类型合并后返回,所以flatMap转换可以将数据展开操作转换为多条数据。

下面例子是将ObservableSource<List<String>>数据源集合展开转换为ObservableSource<String>数据源。

Observable<List<String>> oss = Observable.fromArray(list); // "A", "B", "", "C"
oss.flatMap(new Function<List<String>, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(List<String> strings) {
        for (int i = strings.size() - 1; i >= 0; i--) {
            if (strings.get(i).isEmpty()) {
                strings.remove(i);
            }
        }
        return Observable.fromIterable(strings);
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("flatMap -> accept: " + s);
    }
});

Result:

flatMap -> accept: A
flatMap -> accept: B
flatMap -> accept: C

1.3. flatMapIterable

将源ObservableSource里面的每一条数据转换为一个Iterable集合,合并到一个Observable里面后返回。

Observable<Integer> oi = Observable.fromArray(1, 2);
oi.flatMapIterable(new Function<Integer, Iterable<String>>() {
    @Override
    public Iterable<String> apply(Integer integer) throws Exception {
        int v = integer.intValue();
        return Arrays.asList(v * 10 + "", "" + (v * 10 + 5));
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("flatMapIterable -> accept: " + s);
    }
});

Result:

flatMapIterable -> accept: 10
flatMapIterable -> accept: 15
flatMapIterable -> accept: 20
flatMapIterable -> accept: 25

Map相关类似的操作还有:

  • flatMapSingle: 将源数据遍历转换为SingleSource数据类型,需要关注处理结果。
  • flatMapCompletable: 对数据源进行遍历处理,订阅者只关注处理是否完成,不需要结果。
  • flatMapMaybe: 基本同flatMapCompletable。

1.4. filter

过滤数据源,如果需要保留数据,则在test方法中返回true。

Observable<Integer> oi = Observable.fromArray(1, 2);
oi.filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer.intValue() == 1;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("filter -> accept: " + integer.intValue());
    }
});

Result:

filter -> accept: 1

1.5. compose

将Source源数据组合在一起转换为集合数据类型并返回,和map类的方法不同的是,该转换的apply方法将所有的Source都合并到apply方法的参数中,而map类的方法则是将源数据一个一个的传入apply方法中,在方法体中进行转换处理。

Observable<Integer> oi = Observable.fromArray(2, 4);
oi.compose(new ObservableTransformer<Integer, List<Integer>>() {
    @Override
    public ObservableSource<List<Integer>> apply(Observable<Integer> upstream) {
        Iterator<Integer> ii = upstream.blockingIterable().iterator();
        List<Integer> integers = new ArrayList<Integer>();
        while (ii.hasNext()) {
            integers.add(ii.next());
        }
        return Observable.just(integers);
    }
}).subscribe(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> integers) throws Exception {
        print("compose -> accept: " + integers.get(0).intValue());
        print("compose -> accept: " + integers.get(1).intValue());
    }
});

Result:

compose -> accept: 2
compose -> accept: 4

另外一个例子

List<String> list; // A, B, C, ''
Single<List<String>> ss = Single.just(list);
Single<String> stringSingle = ss.compose(new SingleTransformer<List<String>, String>() {
    @Override
    public SingleSource<String> apply(Single<List<String>> upstream) {
        String item = "";
        List<String> lists = upstream.blockingGet();
        for (int i = 0; i < lists.size(); i++) {
            item += "," + lists.get(i);
        }
        return Single.just(item);
    }
});
stringSingle.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("compose -> accept: " + s);
    }
});

Result:

compose -> accept: ,A,B,C,

1.6. collect

将Source数据类型转换为列表数据类型,它返回的数据类型是Single。

Observable.fromArray("A", "B", "C").collect(new Callable<List<String>>() {
    @Override
    public List<String> call() throws Exception {
        return new ArrayList<>(4);
    }
}, new BiConsumer<List<String>, String>() {
    @Override
    public void accept(List<String> strings, String s) throws Exception {
        strings.add(s);
    }
}).subscribe(new Consumer<List<String>>() {
    @Override
    public void accept(List<String> strings) throws Exception {
          print(strings);
    }
});

Result:

[A, B, C]

1.7. to | toObservable

Single 和 Observable 数据源互相可以转换,Single.toObservable将数据源转换为Observable,Observable.to将数据源转换为Single。

List<String> list; // A, B
Single<List<String>> ss = Single.just(list);
// 转换为Observable
Observable<List<String>> ols = ss.toObservable();
ols.subscribe(new Consumer<List<String>>() {
    @Override
    public void accept(List<String> strings) throws Exception {
        for (String s : strings) {
            print("toObservable -> accept: " + s);
        }
    }
});
// 转换为Single
ols.to(new Function<Observable<List<String>>, Single<List<String>>>() {
    @Override
    public Single<List<String>> apply(Observable<List<String>> listObservable) {
        return Single.just(listObservable.blockingSingle());
    }
}).subscribe(new Consumer<List<String>>() {
    @Override
    public void accept(List<String> strings) throws Exception {
        for (String s : strings) {
            print("to -> accept: " + s);
        }
    }
});

Result:

toObservable -> accept: A
toObservable -> accept: B
to -> accept: A
to -> accept: B

1.8. toList

将源数据转换为List合并后返回一个Single<List<?>>

Observable<Integer> oi = Observable.fromArray(1, 2);
oi.toList().subscribe(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> integers) throws Exception {
        print("toList -> accept: " + integers);
    }
});

Result:

toList -> accept: [1, 2]

另外一个例子

Observable<Integer> oi = Observable.fromArray(1, 2, 3, 4);
oi.toList(new Callable<List<Integer>>() {
    @Override
    public List<Integer> call() throws Exception {
        return new ArrayList<>();
    }
}).subscribe(new Consumer<List<Integer>>() {
    @Override
    public void accept(List<Integer> integers) throws Exception {
        print("toList -> accept: " + integers);
    }
});

Result:

toList -> accept: [1, 2, 3, 4]

1.9. toMap

将Observable中的源数据转换为Map。

Observable<Integer> oi = Observable.fromArray(1, 2);
oi.toMap(new Function<Integer, String>() {
             @Override
             public String apply(Integer integer) throws Exception {
                 return "" + integer.intValue(); // key
             }
         }, new Function<Integer, String>() {
             @Override
             public String apply(Integer integer) throws Exception {
                 return "" + integer.intValue() * 10; // value
             }
         }
).subscribe(new Consumer<Map<String, String>>() {
    @Override
    public void accept(Map<String, String> stringStringMap) throws Exception {
        print(stringStringMap.get("1")); // 10
        print(stringStringMap.get("2")); // 20
    }
});

1.10. as

将Observable中的源数据直接转换获取具体的数据。

Observable<Integer> oi = Observable.fromArray(1, 2, 3, 4);
String str = oi.as(new ObservableConverter<Integer, String>() {
    @Override
    public String apply(Observable<Integer> upstream) {
        final StringBuffer sb = new StringBuffer();
        upstream.blockingForEach(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                sb.append(integer.intValue() + ",");
            }
        });
        return sb.toString();
    }
});
// str: 1,2,3,4,

1.11. take | takeLast

take从数据源中从0开始取数据,takeLast表示从数据源中取后几个数据。

Observable<Integer> oi = Observable.fromArray(1, 2, 3, 4);
oi.take(2).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("take -> accept: " + integer.intValue());
    }
});
oi.takeLast(2).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("takeLast -> " + integer.i,ntValue());
    }
});

Result:

take -> accept: 1
take -> accept: 2
takeLast -> 3
takeLast -> 4

1.12. takeWhile

takeWhile表示取到某个数据之后就不再取数据了,test方法返回false表示停止遍历。

Observable<Integer> oi = Observable.fromArray(1, 2, 3, 4);
oi.takeWhile(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        print("takeWhile -> test: " + integer.intValue());
        if (integer.intValue() == 3) {
            return false;
        }
        return true;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("takeWhile -> accept: " + integer.intValue());
    }
});

Result:

takeWhile -> test: 1
takeWhile -> accept: 1
takeWhile -> test: 2
takeWhile -> accept: 2
takeWhile -> test: 3

1.13. takeUntil

takeUntil表示一直取数据直到取到某个数据后就不再取数据了,test方法返回true表示停止遍历获取数据。

Observable<Integer> oi = Observable.fromArray(1, 2, 3, 4);
oi.takeUntil(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        print("takeUntil -> test: " + integer.intValue());
        if (integer.intValue() == 2) {
            return true;
        }
        return false;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("takeUntil -> accept: " + integer.intValue());
    }
});

Result:

takeUntil -> accept: 1
takeUntil -> test: 1
takeUntil -> accept: 2
takeUntil -> test: 2

1.14. lastOrError

lastOrError获取最后一个元素,firstOrError获取第一个元素。

Observable<Integer> oi = Observable.fromArray(1, 2);
oi.lastOrError().subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("lastOrError -> accept: " + integer); // 2
    }
});
oi.firstOrError().subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("firstOrError -> accept: " + integer); // 1
    }
});

1.15. zip

将两个Observable数据源打包,两个数据源分别同时使用apply方法计算出结果,最终转换成另外一种数据源。

Observable<Integer> oi1 = Observable.fromArray(1, 3);
Observable<Integer> oi2 = Observable.fromArray(2, 4, 5);
oi1.zipWith(oi2, new BiFunction<Integer, Integer, String>() {
    @Override
    public String apply(Integer integer, Integer integer2) throws Exception {
        return integer.intValue() * integer2.intValue() + "";
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        print("zipWith.accept = " + s);
    }
});

Result:

zipWith.accept = 2
zipWith.accept = 12

1.16. merge

将两个数据源合然后把所有的数据合并成一个数据源并返回。

Observable<Integer> oi1 = Observable.fromArray(1);
Observable<Integer> oi2 = Observable.fromArray(2, 4);
oi1.mergeWith(oi2).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        print("mergeWith.accept = " + integer);
    }
});

Result:

mergeWith.accept = 1
mergeWith.accept = 2
mergeWith.accept = 4

Doc: http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html

results matching ""

    No results matching ""