前言
Rxjava介绍
引入
compile 'io.reactivex:rxjava:1.1.6'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
使用场景
废话讲的比较多,开始具体场景
场景一:延迟处理
Observable
.timer(2, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//TODO
}
});
上看我们用到的是Rxjava不完整回调,而Rxjava定义来三中不完整回到
- observable.subscribe(onNextAction);
- observable.subscribe(onNextAction, onErrorAction);
- observable.subscribe(onNextAction, onErrorAction, onCompleteAction);
特别要注意的是,如果你无法把握Rxjava事件的订阅消费流程,最好是使用完整的回调
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
之所以在这里强调这个,是因为我在使用的过程,有事场景需要处理结果,而我直接使用的observable.subscribe(onNextAction)的不完整调用,而忽略了可能出现的异常情况,或者其他不确定的情况而导致报错闪退,具体错误
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:422)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:818)
场景二:短信定时器
现在的应用基本上都是手机注册+第三方登录,所有手机短信验证是必不可少的,而发送短信的过程我们要使用定时器来处理短信接收过程以及发送失败的情况,Rxjava之前我们使用的是Handler+Timer+TimerTask,这情方式的麻烦程度就不用我多说了,下面我们来看看Rxjava的是怎么实现的。。。
直接撸代码:
subscription = Observable.interval(0, 1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<Long, Integer>() {
@Override
public Integer call(Long increaseTime) {
return countTime - increaseTime.intValue();
}
})
.take(30)
.doOnSubscribe(new Action0() {
@Override
public void call() {
LogUtils.e("================doOnSubscribe");
timeText.setVisibility(View.VISIBLE);
failLayout.setVisibility(View.GONE);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
LogUtils.e("================doOnUnsubscribe");
stopSubScribe();
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
timeText.setVisibility(View.GONE);
failLayout.setVisibility(View.VISIBLE);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
LogUtils.e("================"+integer);
timeText.setText(Html.fromHtml(String.format(getResources().getString(R.string.verify_code_time), "<font color=\"#3d9bf5\">" + integer + "秒</font>")));
}
});
private void stopSubScribe() {
if (subscription != null) {
subscription.unsubscribe();
}
}
场景三:多接口调用
有些功能无法一个接口能实现了,比如我们上传一张图片到七牛,首先我们获取七牛token,上后根据token上传图片到七牛云服务器得到图片url,最后把图片url提交到我们自己的数据库中,这个场景中我们相当于执行了三次操作,那我们怎样使用rxjava去实现呢?
直接上代码
Subscription subscription = qiniuRep.getQiniuToken()
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(final String token) {
return qiniuRep.uploadImageToQiniu(token, path);
}
})
.flatMap(new Func1<String, Observable<UserEntity>>() {
@Override
public Observable<UserEntity> call(String avater) {
return userRep.updateUserInfo(UserManager.getToken(), UserManager.getUid(), nick_name, gender, birth, area, describe, avater);
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
现在是不是看起来非常清晰,从上到下,一步步执行。代码也非常清晰,就不做过多的描述了。
未完待续。。。