RxJava2 学习(二)
RxAndroid 学习和操作符的理解
RxJava 的观察者模式
1)RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
2)与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
3)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
4)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
5)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
上游可以发送无限个onNext, 下游也可以接收无限个onNext. 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件. 当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件. 上游可以不发送onComplete或onError. 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError
RxJava多线程选项
name | 说明 |
---|---|
Schedulers.io() | 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作 |
Schedulers.computation() | 代表CPU计算密集型的操作, 即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。 |
Schedulers.newThread() | 代表一个常规的新线程 |
AndroidSchedulers.mainThread() | 代表Android的主线程 |
newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
创建一个普通的
private void create() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "emit 1");
emitter.onNext(1);
Log.e(TAG, "emit 2");
emitter.onNext(2);
Log.e(TAG, "emit 3");
emitter.onNext(3);
Log.e(TAG, "emit complete");
emitter.onComplete();
Log.e(TAG, "emit 4");
emitter.onNext(4);
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable;
private int i;
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "subscribe");
mDisposable = d;
}
@Override
public void onNext(Integer value) {
Log.e(TAG, "onNext: " + value);
i++;
if (i == 2) {
Log.e(TAG, "dispose");
mDisposable.dispose();
Log.e(TAG, "isDisposed : " + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "error");
}
@Override
public void onComplete() {
Log.e(TAG, "complete");
}
});
}
使用just时,just是将数据作为一个完整的对象一次性发射的,最后调用的fromArray map 是一个变形的过程eg 由△—>口
private void just() {
String[] students = {"jay", "tom"};
String[] students22 = {"jay22", "tom22"};
//------------------对两个数组进行合并后一次性发出------------------------
Observable.just(students, students22).subscribe(new Consumer<String[]>() {
@Override
public void accept(String[] strings) throws Exception {
for (String s : strings) {
Log.e(TAG, "1=just===" + s);
}
}
});
Observable observable1 = Observable.just("100", "2000").map(new Function<String, Object>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
observable1.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "1====" + integer);
}
});
//---------------------------------------------------
Observable<String> observable2 = Observable.just(students).map(new Function<String[], String>() {
@Override
public String apply(String[] strings) throws Exception {
String str = "";
for (String s : strings) {
str = str + s;
}
return str;
}
});
observable2.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "observable2==" + s);
}
});
//==============================================
Observable observable = Observable.just(students, students22).map(new Function<String[], String>() {
@Override
public String apply(String[] strings) throws Exception {
String str = "";
for (String s : strings) {
str = str + s;
}
return str;
}
});
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String strings) throws Exception {
Log.e(TAG, "222===" + strings);
}
});
}
使用fromArray接收的数据源是逐个发射的
private void fromArray() {
String[] students = {"jay", "tom"};
Observable.fromArray(students).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
map 是一个变形的过程eg 由△—>口 由String —>Integer
private void map() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("111111");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer s) throws Exception {
Log.e(TAG, "map===" + s);
}
});
}
concatMap它和flatMap的作用几乎一模一样, 只是它的结果是严格按照上游发送的顺序来发送的, flatMap并不保证事件的顺序
private void flatMap() {
// concatMap它和flatMap的作用几乎一模一样, 只是它的结果是严格按照上游发送的顺序来发送的, flatMap并不保证事件的顺序
Disposable subscribe = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(7);
emitter.onNext(8);
emitter.onNext(9);
}
}).flatMap(new Function<Integer, ObservableSource<List<String>>>() {
@Override
public ObservableSource<List<String>> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.just(list);
}
}).flatMap(new Function<List<String>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(List<String> strings) throws Exception {
String str = strings.toString();
return Observable.just(str);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
mBtnStart.setText(s.toString());
}
});
}
Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
private void zip() {
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
// Thread.sleep(1000);
}
}
}).subscribeOn(Schedulers.io());
Observable observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("A");
Thread.sleep(2000);
e.onNext("B");
Thread.sleep(2000);
e.onNext("C");
Thread.sleep(2000);
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer o, String o2) throws Exception {
return o + o2;
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
mBtnStart.setText(s);
}
});
}
上下流分配(借鉴Season_zlc)
我们把上游看成小日本, 把下游当作叶问, 当调用Subscription.request(1)时, 叶问就说我要打一个!
然后小日本就拿出一个鬼子给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用request(10),
叶问就又说我要打十个! 然后小日本又派出十个鬼子给叶问, 然后就在边上看热闹, 看叶问能不能打死十个鬼子,
等叶问打死十个鬼子后再继续要鬼子接着打...
private void fLowable() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 128; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(1);// 每次从水缸里去取一个
}
@Override
public void onNext(Integer integer) {
mBtnStart.setText("第" + integer + "个");
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
DOME下载 欢迎Start Star 我的GitHub