开源项目
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
导入依赖
1 2
| compile 'io.reactivex:rxjava:1.0.14' compile 'io.reactivex:rxandroid:1.0.1'
|
RxJava是什么?
我们先了解一下Rx,是Reactive Extensions的缩写。Rx 是微软.NET的一个响应式扩展。Rx借助可观测的序列提供一种简单的方式来创 建异步的,基于事件驱动的程序。开发者可以使用Observables模拟异步数据流使用LINQ语法查询Observables,并且很容易管理调度器的并发。
RxJava是Reactive Extensions的Java VM实现:用于通过使用可观察序列来编译异步和基于事件的程序的库。说到底,就是异步。
为什么要用RxJava?
用简洁的流式操作处理各类复杂的异步请求,解决回调地狱
基本实现
核心对象
Rx的基本实现,是通过一种可拓展的观察者模式。
四个基本对象 :Observable(被观察者),Observer(观察者),subscribe(订阅),基本事件。
基本示例
我们先来通过一个基本的示例来观看RxJava的流式操作:
Observable:印书厂管理员
Observer:销售员
需求:我们现在有几个书柜,每个书柜里有几个箱子,有些箱子里面书本不合格要整箱去除,剩下的箱子重新包装后销售;
传统方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| new Thread() { @Override public void run() { super.run(); for (BookCase bookcase : bookCaseList) { for (final BookPack bookpack : bookcase.getBookPackList()) { if (bookpack.getQuality() == true) { new Thread() { @Override public void run() { super.run(); sell(bookpack); } }.start(); } } } } }.start();
|
使用RxJava
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| Observable.from(bookCaseList) .flatMap(new Func1<BookCase, Observable<BookPack>>() { @Override public Observable<BookPack> call(BookCase bookCase) { return Observable.from(bookCase.getBookPackList()); } }) .filter(new Func1<BookPack, Boolean>() { @Override public Boolean call(BookPack bookPack) { return bookPack.getQuality(); } }) .map(new Func1<BookPack, BookPack>() { @Override public BookPack call(BookPack bookPack) { bookPack.setName(bookPack.getName() + "with new bag"); return bookPack; } }) .subscribe(new Observer<BookPack>() { @Override public void onCompleted() { Log.i("RxJavaTest","sell success"); } @Override public void onError(Throwable e) { Log.e("RxJavaTest","something wrong"); } @Override public void onNext(BookPack bookPack) { sell(bookPack); } });
|
基础内容:
1.创建观察者Observer(定义三个回调方法):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "success"); } @Override public void onError(Throwable e) { Log.d(tag, "something wrong"); } };
|
- onNext(T item)
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法 可能会被调用多次,取决于你的实现。
- onError(Exception ex)
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止 Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。
- onComplete
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
根据Observable协议的定义,onNext可能会被调用零次或者很多次,最后会有一次 onCompleted或onError调用(不会同时),传递数据给onNext通常被称作发射, onCompleted和onError被称作通知。
2.创建被观察者(Observable)(常用到的几种方法):
- creat() — 使用一个函数从头创建一个Observable
1 2 3 4 5 6 7 8 9
| Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Android"); subscriber.onNext("Lab"); subscriber.onCompleted(); } });
|
这里传入的OnSubscribe我们可以把它理解为容器,存放了我们要发送的消息,当被订阅的时候,call方法被调用,回调事件依次发生。
- just(T…) — 将一个或多个对象转换成发射这个或这些对象的一个Observable
1 2 3 4 5 6
| Observable observable = Observable.just("Hello", "Android", "Lab");
|
- from(T[]) ——将一个Iterable,一个Future, 或者一个数组转换成一个Observable
1 2 3 4 5 6 7
| String[] test = {"Hello", "Android", "Lab"}; Observable observable = Observable.from(test);
|
- repeat( ) — 创建一个重复发射指定数据或数据序列的Observable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Observable.just("one", "two", "three") .repeat(3) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); "C:\Program Files\Java\jdk1.8.0_65\bin\java" one two three one two three one two three Process finished with exit code 0
|
- defer( ) — 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的 Observable
1 2 3 4 5 6
| Observable.defer(new Func0<Observable<String>>() { @Override public Observable<String> call() { return Observable.just("hello"); } });
|
- range( ) — 创建一个发射指定范围的整数序列的Observable
1 2
| Observable<Integer> observable = Observable.range(1, 5)
|
- interval( ) — 创建一个按照给定的时间间隔发射整数序列的Observable
1 2
| Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
|
- timer( ) — 创建一个在给定的延时之后发射单个数据的Observable
1 2
| Observable<Long> observable = Observable.timer(1, TimeUnit.SECONDS);
|
3.订阅事件subscribe
最基本的订阅事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); subscriber.onCompleted(); } }).subscribe(new Observer<String>() { @Override public void onCompleted() { Log.i("RxJava","completed") } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i("RxJava","next"); } });
|
除了这种方式之外,RxJava还提供了Action0,Action1,Action2等接口,后面的数字表示所含有的
参数的个数,RxJava会自动创建出我们需要的Observer。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| Observable.just("hello") .subscribe(new Action1<String>() { @Override public void call(String s) { } }); Observable.just("hello") .subscribe(new Action1<String>() { @Override public void call(String s) { } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { } }, new Action0() { @Override public void call() { } } );
|
各种操作符(operators)
过滤操作符
- filter() — 只发射通过了谓词测试的数据项(也就是我们说的if())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Observable.just(1, 2, 3, 4, 5) .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer item) { return (item < 4); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } }); "C:\Program Files\Java\jdk1.8.0_65\bin\java" 1 2 3 Process finished with exit code 0
|
- distinct() ——只允许还没有发射过的数据项通过
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Observable.just(1, 1,2,2, 3,3,3, 4, 5) .distinct() .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer+""); } }); "C:\Program Files\Java\jdk1.8.0_65\bin\java" 1 2 3 4 5 Process finished with exit code 0
|
- ofType() — 是 filter 操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。
ofType在之后的RxBus中发挥了重大作用。
- debounce() — 在过了一段指定的时间还没发射数据时才发射一个数据
- take(n) — 发射前n项数据
- throttleFirst() — 定期发射Observable发射的第一项数据
变换操作符
- map( ) — 对序列的每一项都应用一个函数来变换Observable发射的数据序列,把内容更新之后包装成新的Observable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Observable.just("hello","android","lab") .map(new Func1<String, String>() { @Override public String call(String s) { return s+" world"; } }) .subscribe(new Action1<String>() { @Override public void call(String string) { System.out.println(string+""); } }); "C:\Program Files\Java\jdk1.8.0_65\bin\java" hello world android world lab world Process finished with exit code 0
|
- flatMap() ——将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并 后放进一个单独的Observable
我们假设Android创新实验室分成了几个小组(groupList),我们现在要获取每个组的每个成员名字(memberName),我们先不使用转换操作fu来完成
1 2 3 4 5 6 7 8 9
| Observable.from(groupList) .subscribe(new Action1<Group>() { @Override public void call(Group group) { for (Member member:group.getMembers()) { Log.i("RxJavaTest",member.getMemberName()) } } });
|
因为map只能完成一对一的转化,我们也不希望在RxJava的流式操作中看到for循环,所以我们使用flatmap()来完成一对多的转换
1 2 3 4 5 6 7 8 9 10 11 12 13
| Observable.from(groupList) .flatMap(new Func1<Group, Observable<Member>>() { @Override public Observable<Member> call(Group group) { return Observable.from(group.getMembers()); } }) .subscribe(new Action1<Member>() { @Override public void call(Member member) { Log.i("RxJavaTest",member.getMemberName()); } });
|
哼,这不就是个for循环嘛,你个大骗子!
- cast() ——它将源Observable中的每一项数据都转换为新的类型,把它变成了不同的Class。
说一个和RxBus有关的东西,ofType的源码我们来看一下
1 2 3 4 5 6 7 8
| public final <R> Observable<R> ofType(final Class<R> klass) { return filter(new Func1<T, Boolean>() { @Override public final Boolean call(T t) { return klass.isInstance(t); } }).cast(klass); }
|
就是用filter,和cast组成的,所以给我们提供了一个自定义操作符的思路,大家也可以试试,反正我没试过~
组合操作符
- Merge ——把两个或者多个Observables合并到他们发射的数据项里面
1 2 3
| Observable<Group>groupObservable= Observable.from(groupList); Observable<Group>groupObservable2=Observable.from(groupList); Observable<Group>group=Observable.merge(groupObservable,groupObservable2);
|
- Zip ——合并两个或者多个Observables发射出来的数据项,根据指定的函数 Fun*变化他们,并且发射一个新值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| final Observable<Group>groupObservable= Observable.from(groupList); Observable<Long>tictoc= Observable.interval(1, TimeUnit.SECONDS); Observable.zip(groupObservable, tictoc, new Func2<Group, Long, String>() { @Override public String call(Group group, Long aLong) { return groupObservable.toString()+aLong.toString(); } }).subscribe(new Action1<String>() { @Override public void call(String s) { Log.i("RxJava",s); } });
|
Subject = Observable + Observer
subject 是一个神奇的对象,它可以是一个Observable同时也可以是一个 Observer:它作为连接这两个世界的一座桥梁。一个Subject可以订阅一个 Observable,就像一个观察者,并且它可以发射新的数据,或者传递它接受到的数 据,就像一个Observable。很明显,作为一个Observable,观察者们或者其它 Subject都可以订阅它。
一旦Subject订阅了Observable,它将会触发Observable开始发射。如果原始的 Observable是“冷”的,这将会对订阅一个“热”的Observable变量产生影响。
RxJava提供四种不同的Subject:
- PublishSubject
Publish是Subject的一个基础子类。让我们看看用PublishSubject实现传统的 Observable Hello World :1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| PublishSubject<String>stringPublishSubject=PublishSubject.create(); Subscription subscriptionPrint=stringPublishSubject.subscribe(new Observer<String>(){ @Override @Override public void onCompleted() { System.out.println("Observable completed"); } @Override public void onError(Throwable e){ System.out.println("Oh,no!Something wrong happened!"); } @Override public void onNext(String message){ System.out.println(message); } }); stringPublishSubject.onNext("Hello World");
|
线程调度器Scheduler
“一直说RxJava是异步操作,你到现在都还没告诉我们怎么切换线程,你几个意思呀。”
“咳咳,大佬别着急,我们慢慢看”
先看看RxJava中可用的调度器种类
RxAndroid中给我们添加了专用的AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行,也就是我们的UI线程。
使用ObserverOn和SubScribeOn操作符
- ObserveOn:指定回调发生的线程,事件消费的线程,可以执行多次!
- SubscribeOn(): 订阅事件发生的线程,事件产生的线程,只允许执行一次。
最基本的用法
1 2 3 4 5 6 7 8 9
| Observable.just("hello", "android", "lab") .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.i("RxJavaTest", s); } });
|
多次切换线程
1 2 3 4 5 6 7 8
| Observable.just("hello","android ","lab") .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map() .observeOn(Schedulers.io()) .map() .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber);
|
基础的先说这么多,会慢慢补充的,see you~