Slient Blog

RxJava RxJava基础

2017-03-19

开源项目

https://github.com/ReactiveX/RxJava

https://github.com/ReactiveX/RxAndroid

导入依赖

1
2
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'

RxJava是什么?

我们先了解一下Rx,是Reactive Extensions的缩写。Rx 是微软.NET的一个响应式扩展。Rx借助可观测的序列提供一种简单的方式来创 建异步的,基于事件驱动的程序。开发者可以使用Observables模拟异步数据流使用LINQ语法查询Observables,并且很容易管理调度器的并发。

RxJava是Reactive Extensions的Java VM实现:用于通过使用可观察序列来编译异步和基于事件的程序的库。说到底,就是异步。

为什么要用RxJava?

用简洁的流式操作处理各类复杂的异步请求,解决回调地狱

基本实现

核心对象

Rx的基本实现,是通过一种可拓展的观察者模式。
四个基本对象 :Observable(被观察者),Observer(观察者),subscribe(订阅),基本事件。
baidu

基本示例

我们先来通过一个基本的示例来观看RxJava的流式操作:


Observable:印书厂管理员

Observer:销售员

需求:我们现在有几个书柜,每个书柜里有几个箱子,有些箱子里面书本不合格要整箱去除,剩下的箱子重新包装后销售;

传统方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
new Thread() {
@Override
public void run() {
super.run();
for (BookCase bookcase : bookCaseList) {
for (final BookPack bookpack : bookcase.getBookPackList()) {
if (bookpack.getQuality() == true) {
new Thread() {
@Override
public void run() {
super.run();
sell(bookpack);
}
}.start();
}
}
}
}
}.start();

使用RxJava

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Observable.from(bookCaseList)
.flatMap(new Func1<BookCase, Observable<BookPack>>() {
@Override
public Observable<BookPack> call(BookCase bookCase) {
return Observable.from(bookCase.getBookPackList());
}
})
.filter(new Func1<BookPack, Boolean>() {
@Override
public Boolean call(BookPack bookPack) {
//判断质量是否过关
return bookPack.getQuality();
}
})
.map(new Func1<BookPack, BookPack>() {
@Override
public BookPack call(BookPack bookPack) {
//重新包装箱子
bookPack.setName(bookPack.getName() + "with new bag");
return bookPack;
}
})
.subscribe(new Observer<BookPack>() {
@Override
public void onCompleted() {
Log.i("RxJavaTest","sell success");
}
@Override
public void onError(Throwable e) {
Log.e("RxJavaTest","something wrong");
}
@Override
public void onNext(BookPack bookPack) {
//整箱销售
sell(bookPack);
}
});

基础内容:

1.创建观察者Observer(定义三个回调方法):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "success");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "something wrong");
}
};

  • onNext(T item)

    Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法 可能会被调用多次,取决于你的实现。
  • onError(Exception ex)

    当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止 Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。
  • onComplete

    正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。


    根据Observable协议的定义,onNext可能会被调用零次或者很多次,最后会有一次 onCompleted或onError调用(不会同时),传递数据给onNext通常被称作发射, onCompleted和onError被称作通知。

2.创建被观察者(Observable)(常用到的几种方法):

  • creat() — 使用一个函数从头创建一个Observable
1
2
3
4
5
6
7
8
9
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Android");
subscriber.onNext("Lab");
subscriber.onCompleted();
}
});

这里传入的OnSubscribe我们可以把它理解为容器,存放了我们要发送的消息,当被订阅的时候,call方法被调用,回调事件依次发生。

  • just(T…) — 将一个或多个对象转换成发射这个或这些对象的一个Observable
1
2
3
4
5
6
Observable observable = Observable.just("Hello", "Android", "Lab");
// 将会依次调用:
// onNext("Hello");
// onNext("Android");
// onNext("Lab");
// onCompleted();
  • from(T[]) ——将一个Iterable,一个Future, 或者一个数组转换成一个Observable
1
2
3
4
5
6
7
String[] test = {"Hello", "Android", "Lab"};
Observable observable = Observable.from(test);
// 将会依次调用:
// onNext("Hello");
// onNext("Android");
// onNext("Lab");
// onCompleted();
  • repeat( ) — 创建一个重复发射指定数据或数据序列的Observable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Observable.just("one", "two", "three")
.repeat(3)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
"C:\Program Files\Java\jdk1.8.0_65\bin\java"
one
two
three
one
two
three
one
two
three
Process finished with exit code 0
  • defer( ) — 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的 Observable
1
2
3
4
5
6
Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just("hello");
}
});
  • range( ) — 创建一个发射指定范围的整数序列的Observable
1
2
// 依次发射 1,2,3,4,5
Observable<Integer> observable = Observable.range(1, 5)
  • interval( ) — 创建一个按照给定的时间间隔发射整数序列的Observable
1
2
// 每隔 1 s 发送一个序列号,从 0 开始,每次累加 1。
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
  • timer( ) — 创建一个在给定的延时之后发射单个数据的Observable
1
2
// 定时 1 s
Observable<Long> observable = Observable.timer(1, TimeUnit.SECONDS);

3.订阅事件subscribe

最基本的订阅事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onCompleted();
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.i("RxJava","completed")
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("RxJava","next");
}
});

除了这种方式之外,RxJava还提供了Action0,Action1,Action2等接口,后面的数字表示所含有的
参数的个数,RxJava会自动创建出我们需要的Observer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//有些时候我们只需要处理onNext()的回调
Observable.just("hello")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//onNext()
}
});
//我们也可以创建出自定义的onComplete(),和onError()
Observable.just("hello")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//onNext()
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
//onError()
}
}, new Action0() {
@Override
public void call() {
//onComplete()
}
}
);

各种操作符(operators)

过滤操作符

  • filter() — 只发射通过了谓词测试的数据项(也就是我们说的if())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer item) {
return (item < 4);
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
"C:\Program Files\Java\jdk1.8.0_65\bin\java"
1
2
3
Process finished with exit code 0
  • distinct() ——只允许还没有发射过的数据项通过
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.just(1, 1,2,2, 3,3,3, 4, 5)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer+"");
}
});
"C:\Program Files\Java\jdk1.8.0_65\bin\java"
1
2
3
4
5
Process finished with exit code 0
  • ofType() — 是 filter 操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。
    ofType在之后的RxBus中发挥了重大作用。
  • debounce() — 在过了一段指定的时间还没发射数据时才发射一个数据
  • take(n) — 发射前n项数据
  • throttleFirst() — 定期发射Observable发射的第一项数据

变换操作符

  • map( ) — 对序列的每一项都应用一个函数来变换Observable发射的数据序列,把内容更新之后包装成新的Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.just("hello","android","lab")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s+" world";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String string) {
System.out.println(string+"");
}
});
"C:\Program Files\Java\jdk1.8.0_65\bin\java"
hello world
android world
lab world
Process finished with exit code 0
  • flatMap() ——将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并 后放进一个单独的Observable

我们假设Android创新实验室分成了几个小组(groupList),我们现在要获取每个组的每个成员名字(memberName),我们先不使用转换操作fu来完成

1
2
3
4
5
6
7
8
9
Observable.from(groupList)
.subscribe(new Action1<Group>() {
@Override
public void call(Group group) {
for (Member member:group.getMembers()) {
Log.i("RxJavaTest",member.getMemberName())
}
}
});

因为map只能完成一对一的转化,我们也不希望在RxJava的流式操作中看到for循环,所以我们使用flatmap()来完成一对多的转换

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.from(groupList)
.flatMap(new Func1<Group, Observable<Member>>() {
@Override
public Observable<Member> call(Group group) {
return Observable.from(group.getMembers());
}
})
.subscribe(new Action1<Member>() {
@Override
public void call(Member member) {
Log.i("RxJavaTest",member.getMemberName());
}
});

哼,这不就是个for循环嘛,你个大骗子!

  • cast() ——它将源Observable中的每一项数据都转换为新的类型,把它变成了不同的Class。

说一个和RxBus有关的东西,ofType的源码我们来看一下

1
2
3
4
5
6
7
8
public final <R> Observable<R> ofType(final Class<R> klass) {
return filter(new Func1<T, Boolean>() {
@Override
public final Boolean call(T t) {
return klass.isInstance(t);
}
}).cast(klass);
}

就是用filter,和cast组成的,所以给我们提供了一个自定义操作符的思路,大家也可以试试,反正我没试过~

组合操作符

  • Merge ——把两个或者多个Observables合并到他们发射的数据项里面

1
2
3
Observable<Group>groupObservable= Observable.from(groupList);
Observable<Group>groupObservable2=Observable.from(groupList);
Observable<Group>group=Observable.merge(groupObservable,groupObservable2);
  • Zip ——合并两个或者多个Observables发射出来的数据项,根据指定的函数 Fun*变化他们,并且发射一个新值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final Observable<Group>groupObservable= Observable.from(groupList);
Observable<Long>tictoc= Observable.interval(1, TimeUnit.SECONDS);
Observable.zip(groupObservable, tictoc, new Func2<Group, Long, String>() {
@Override
public String call(Group group, Long aLong) {
return groupObservable.toString()+aLong.toString();
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("RxJava",s);
}
});

Subject = Observable + Observer

subject 是一个神奇的对象,它可以是一个Observable同时也可以是一个 Observer:它作为连接这两个世界的一座桥梁。一个Subject可以订阅一个 Observable,就像一个观察者,并且它可以发射新的数据,或者传递它接受到的数 据,就像一个Observable。很明显,作为一个Observable,观察者们或者其它 Subject都可以订阅它。
一旦Subject订阅了Observable,它将会触发Observable开始发射。如果原始的 Observable是“冷”的,这将会对订阅一个“热”的Observable变量产生影响。
RxJava提供四种不同的Subject:

  • PublishSubject
    Publish是Subject的一个基础子类。让我们看看用PublishSubject实现传统的 Observable Hello World :
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    PublishSubject<String>stringPublishSubject=PublishSubject.create();
    Subscription subscriptionPrint=stringPublishSubject.subscribe(new Observer<String>(){ @Override
    @Override
    public void onCompleted() {
    System.out.println("Observable completed");
    }
    @Override
    public void onError(Throwable e){
    System.out.println("Oh,no!Something wrong happened!");
    }
    @Override
    public void onNext(String message){
    System.out.println(message);
    }
    });
    stringPublishSubject.onNext("Hello World");

线程调度器Scheduler

“一直说RxJava是异步操作,你到现在都还没告诉我们怎么切换线程,你几个意思呀。”

“咳咳,大佬别着急,我们慢慢看”

先看看RxJava中可用的调度器种类

RxAndroid中给我们添加了专用的AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行,也就是我们的UI线程。

使用ObserverOn和SubScribeOn操作符

  • ObserveOn:指定回调发生的线程,事件消费的线程,可以执行多次!
  • SubscribeOn(): 订阅事件发生的线程,事件产生的线程,只允许执行一次。

最基本的用法

1
2
3
4
5
6
7
8
9
Observable.just("hello", "android", "lab")
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程,事件产生在Io线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("RxJavaTest", s);
}
});

多次切换线程

1
2
3
4
5
6
7
8
Observable.just("hello""android ","lab") // 发生在IO 线程,由 subscribeOn() 指定,subscribeOn()只允许执行一次,你也可以指定多个,但是只有第一个起作用
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map() // 发生在新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map() // 发生在IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // 发生在UI主线程,由 observeOn() 指定

基础的先说这么多,会慢慢补充的,see you~

Tags: Android
使用微信添加

若你觉得我的文章对你有帮助,请添加我为好友

扫描二维码,分享此文章