Skip to the content.

RxJava2 学习(二)

RxAndroid 学习和操作符的理解

RxJava 的观察者模式

1)RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
2)与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
3)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
4)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
5)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

上游可以发送无限个onNext, 下游也可以接收无限个onNext. 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件. 当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件. 上游可以不发送onComplete或onError. 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError

RxJava多线程选项

name 说明
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程

newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

创建一个普通的

private void create() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e(TAG, "emit 1");
                emitter.onNext(1);
                Log.e(TAG, "emit 2");
                emitter.onNext(2);
                Log.e(TAG, "emit 3");
                emitter.onNext(3);
                Log.e(TAG, "emit complete");
                emitter.onComplete();
                Log.e(TAG, "emit 4");
                emitter.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private Disposable mDisposable;
            private int i;

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "subscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.e(TAG, "onNext: " + value);
                i++;
                if (i == 2) {
                    Log.e(TAG, "dispose");
                    mDisposable.dispose();
                    Log.e(TAG, "isDisposed : " + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "complete");
            }
        });
    }

使用just时,just是将数据作为一个完整的对象一次性发射的,最后调用的fromArray map 是一个变形的过程eg 由△—>口

   private void just() {
           String[] students = {"jay", "tom"};
           String[] students22 = {"jay22", "tom22"};
           //------------------对两个数组进行合并后一次性发出------------------------
           Observable.just(students, students22).subscribe(new Consumer<String[]>() {
               @Override
               public void accept(String[] strings) throws Exception {
                   for (String s : strings) {
                       Log.e(TAG, "1=just===" + s);
                   }
               }
           });
           Observable observable1 = Observable.just("100", "2000").map(new Function<String, Object>() {
               @Override
               public Integer apply(String s) throws Exception {

                   return Integer.parseInt(s);
               }
           });
           observable1.subscribe(new Consumer<Integer>() {
               @Override
               public void accept(Integer integer) throws Exception {
                   Log.e(TAG, "1====" + integer);
               }
           });
           //---------------------------------------------------
           Observable<String> observable2 = Observable.just(students).map(new Function<String[], String>() {
               @Override
               public String apply(String[] strings) throws Exception {
                   String str = "";
                   for (String s : strings) {
                       str = str + s;
                   }
                   return str;
               }
           });
           observable2.subscribe(new Consumer<String>() {
               @Override
               public void accept(String s) throws Exception {
                   Log.e(TAG, "observable2==" + s);
               }
           });


           //==============================================
           Observable observable = Observable.just(students, students22).map(new Function<String[], String>() {
               @Override
               public String apply(String[] strings) throws Exception {
                   String str = "";
                   for (String s : strings) {
                       str = str + s;
                   }
                   return str;
               }
           });

           observable.subscribe(new Consumer<String>() {
               @Override
               public void accept(String strings) throws Exception {
                   Log.e(TAG, "222===" + strings);

               }
           });


       }

使用fromArray接收的数据源是逐个发射的

private void fromArray() {
        String[] students = {"jay", "tom"};
        Observable.fromArray(students).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e(TAG, s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

map 是一个变形的过程eg 由△—>口 由String —>Integer

    private void map() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("111111");
            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s);
            }
        })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer s) throws Exception {
                        Log.e(TAG, "map===" + s);
                    }
                });
    }

concatMap它和flatMap的作用几乎一模一样, 只是它的结果是严格按照上游发送的顺序来发送的, flatMap并不保证事件的顺序

private void flatMap() {
        // concatMap它和flatMap的作用几乎一模一样, 只是它的结果是严格按照上游发送的顺序来发送的, flatMap并不保证事件的顺序
        Disposable subscribe = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(7);
                emitter.onNext(8);
                emitter.onNext(9);
            }
        }).flatMap(new Function<Integer, ObservableSource<List<String>>>() {
            @Override
            public ObservableSource<List<String>> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                return Observable.just(list);


            }
        }).flatMap(new Function<List<String>, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(List<String> strings) throws Exception {
                String str = strings.toString();
                return Observable.just(str);
            }
        })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        mBtnStart.setText(s.toString());
                    }
                });
    }

Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

   private void zip() {
        Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for (int i = 0; ; i++) {
                    e.onNext(i);
//                    Thread.sleep(1000);
                }

            }
        }).subscribeOn(Schedulers.io());
        Observable observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("A");
                Thread.sleep(2000);
                e.onNext("B");
                Thread.sleep(2000);
                e.onNext("C");
                Thread.sleep(2000);
            }
        }).subscribeOn(Schedulers.io());
        Observable.zip(observable, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer o, String o2) throws Exception {
                return o + o2;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                mBtnStart.setText(s);
            }
        });


    }

上下流分配(借鉴Season_zlc

    我们把上游看成小日本, 把下游当作叶问, 当调用Subscription.request(1)时, 叶问就说我要打一个!
    然后小日本就拿出一个鬼子给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用request(10),
    叶问就又说我要打十个! 然后小日本又派出十个鬼子给叶问, 然后就在边上看热闹, 看叶问能不能打死十个鬼子,
    等叶问打死十个鬼子后再继续要鬼子接着打...

    private void fLowable() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 128; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                s.request(1);// 每次从水缸里去取一个
            }

            @Override
            public void onNext(Integer integer) {
                mBtnStart.setText("第" + integer + "个");
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

DOME下载 欢迎Start Star 我的GitHub

# Back