摘要:我们可以看下,它把,也就是取消请求的情况处理的挺好。这段代码是给增加一个的事件。也就是请求完成的时候,会自动对进行一个终止,也就是的行为。
概述
前文回顾: 谜之RxJava (三)—— 线程切换
今天来介绍下和RxJava搭配使用的好基友,就是我们的Retrofit啦,Retrofit使用动态代理的机制,为我们提供了一个简要的使用方法来获取网络上的资料,现在更新到2.0.0-beta2了(因为是beta,我也碰到一些坑,期待作者发布下一个版本)。
Retrofit不算是一个网络库,它应该算是封装了okhttp,然后为我们提供了一个友好的接口的一个工具库吧。
我们今天着重来介绍下RxJavaCallAdapterFactory这个类。用过的朋友们都知道,它是用来把Retrofit转成RxJava可用的适配类。
RxJavaCallAdapterFactoryOK,首先看下声明
public final class RxJavaCallAdapterFactory implements CallAdapter.Factory
CallAdapter.Factory是Retrofit这个库中的接口,用来给我们自定义去解析我们自己想要的类型用的。
举个栗子:
@GET("/aaa") ObservablegetQuestionNewestList();
这么一个接口,retrofit本身是无法识别Observable
看下它的实现。
@Override public CallAdapter> get(Type returnType, Annotation[] annotations, Retrofit retrofit) { Class> rawType = Utils.getRawType(returnType); boolean isSingle = "rx.Single".equals(rawType.getCanonicalName()); if (rawType != Observable.class && !isSingle) { return null; } if (!(returnType instanceof ParameterizedType)) { String name = isSingle ? "Single" : "Observable"; throw new IllegalStateException(name + " return type must be parameterized" + " as " + name + "or " + name + " extends Foo>"); } CallAdapter > callAdapter = getCallAdapter(returnType); if (isSingle) { // Add Single-converter wrapper from a separate class. This defers classloading such that // regular Observable operation can be leveraged without relying on this unstable RxJava API. return SingleHelper.makeSingle(callAdapter); } return callAdapter; }
这里代码的意思就是说,如果你返回的不是Observable
如果是的话,那我再来详细看下模板类是哪个,也就是getCallAdapter接口干的事。
private CallAdapter> getCallAdapter(Type returnType) { Type observableType = Utils.getSingleParameterUpperBound((ParameterizedType) returnType); Class> rawObservableType = Utils.getRawType(observableType); if (rawObservableType == Response.class) { if (!(observableType instanceof ParameterizedType)) { throw new IllegalStateException("Response must be parameterized" + " as Response or Response extends Foo>"); } Type responseType = Utils.getSingleParameterUpperBound((ParameterizedType) observableType); return new ResponseCallAdapter(responseType); } if (rawObservableType == Result.class) { if (!(observableType instanceof ParameterizedType)) { throw new IllegalStateException("Result must be parameterized" + " as Result or Result extends Foo>"); } Type responseType = Utils.getSingleParameterUpperBound((ParameterizedType) observableType); return new ResultCallAdapter(responseType); } return new SimpleCallAdapter(observableType); }
这里告诉我们,除了Observable
OK,我们就不看别的,直捣黄龙,看SimpleCallAdapter!
SimpleCallAdapter 创建Observable的类static final class SimpleCallAdapter implements CallAdapter> { private final Type responseType; SimpleCallAdapter(Type responseType) { this.responseType = responseType; } @Override public Type responseType() { return responseType; } @Override public Observable adapt(Call call) { return Observable.create(new CallOnSubscribe<>(call)) // .flatMap(new Func1 , Observable >() { @Override public Observable call(Response response) { if (response.isSuccess()) { return Observable.just(response.body()); } return Observable.error(new HttpException(response)); } }); } }
这里总算看见我们熟悉的Observable接口咯,原来是自己定义了一个OnSubscribe,然后把Response通过flatMap转为我们想要的对象了。 这里同时也判断请求是否成功,进入Observable的工作流里了。
好,我们最终可以看下CallOnSubscribe干了啥
static final class CallOnSubscribeimplements Observable.OnSubscribe > { private final Call originalCall; private CallOnSubscribe(Call originalCall) { this.originalCall = originalCall; } @Override public void call(final Subscriber super Response > subscriber) { // Since Call is a one-shot type, clone it for each new subscriber. final Call call = originalCall.clone(); // Attempt to cancel the call if it is still in-flight on unsubscription. subscriber.add(Subscriptions.create(new Action0() { @Override public void call() { call.cancel(); } })); if (subscriber.isUnsubscribed()) { return; } try { Response response = call.execute(); if (!subscriber.isUnsubscribed()) { subscriber.onNext(response); } } catch (Throwable t) { Exceptions.throwIfFatal(t); if (!subscriber.isUnsubscribed()) { subscriber.onError(t); } return; } if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } } }
这里其实蛮简单的,call是retrofit对okhttp的一个代理,是一个同步网络请求,在这里就是典型的对网络进行数据请求,完了放到subscriber的onNext里,完成网络请求。我们可以看下,它把unsubscribe,也就是取消请求的情况处理的挺好。
subscriber.add(Subscriptions.create(new Action0() { @Override public void call() { call.cancel(); } }));
这段代码是给subscribe增加一个unsubscribe的事件。 也就是请求完成的时候,会自动对call进行一个终止,也就是http的close行为。
前方高能今天被坑到这里很久,我们对API调用了observeOn(MainThread)之后,线程会跑在主线程上,包括onComplete也是,unsubscribe也在主线程,然后如果这时候调用call.cancel会导致NetworkOnMainThreadException,所以一定要在retrofit的API调用ExampleAPI.subscribeOn(io).observeOn(MainThread)之后加一句unsubscribeOn(io)。
完整的就是ExampleAPI.subscribeOn(io).observeOn(MainThread).unsubscribeOn(io)。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/12472.html
摘要:谜之二一个异步库最迷人的是什么答案就是把异步序列写到一个工作流里和的如出一辙。综上所述,如果我们需要我们的在一个别的线程上运行的时候,只需要在后面跟一个即可。 【谜之RxJava (二) —— Magic Lift】 Rxjava -- 一个异步库 RxJava最迷人的是什么?答案就是把异步序列写到一个工作流里!和javascript的Promise/A如出一辙。OK,在java中做异...
摘要:最近在界,最火的大概就是了。先来上个最最简单的,经典的。这段代码产生的最终结果就是在里会出现。原来只是为了帮我们处理好异常,以及防止工作流的重复。这是最最基本的工作流,让我们认识到他是怎么工作的。谜之二欢迎关注我以及 最近在Android界,最火的framework大概就是RxJava了。扔物线大大之前写了一篇文章 《给 Android 开发者的 RxJava 详解》,在我学习RxJa...
摘要:操作,操作是在线程上,因为在他们之前的切换了线程。特别注意那一段,对于操作和操作是无效的再简单点总结就是的调用切换之前的线程。 开头 之前我们分析过subscribeOn这个函数,现在我们来看下subscribeOn和observeOn这两个函数到底有什么异同。 用过rxjava的旁友都知道,subscribeOn和observeOn都是用来切换线程用的,可是我什么时候用subscri...
摘要:回顾上一篇文章讲了和之间的关系。我们知道,的具体工作都是在中完成的。这样就生成了一个代理的,最后我们最外层的对象对我们代理的进行了调用。。也就是此处的就是被包裹后的对象。迷之三线程切换欢迎关注我以及 回顾 上一篇文章 讲了Observable、OnSubscribe和Subscriber之间的关系。 我们知道,Observable的具体工作都是在OnSubscribe中完成的。从这个类...
摘要:在更新版本后,这个接口进行了一个重构,变换方式从一个比较难理解的递归嵌套的上做一个改成了从角度上进行了一层封装。从类型来说,脱离了的概念,变身成了。而在老接口中,变换前的是通过传进来的。 在RxJava更新版本后,OperatorSubscribeOn这个接口进行了一个重构,变换方式从一个比较难理解的递归嵌套的Observable上做一个Operator改成了从OnSubscribe角...
阅读 2332·2021-09-30 09:46
阅读 2306·2021-09-26 09:46
阅读 2040·2021-09-24 09:48
阅读 1725·2021-09-23 11:20
阅读 1483·2019-08-30 15:54
阅读 2254·2019-08-30 15:52
阅读 1768·2019-08-29 15:33
阅读 2940·2019-08-28 17:56