资讯专栏INFORMATION COLUMN

Dubbo 源码分析 - 服务导出

刘玉平 / 3258人阅读

摘要:支持两种服务导出方式,分别延迟导出和立即导出。本文打算分析服务延迟导出过程,因此不会分析方法。服务导出之前,要进行对一系列的配置进行检查,以及生成。返回时,表示需要延迟导出。赛程预告,下一站是服务导出的前置工作。

1.服务导出过程

本篇文章,我们来研究一下 Dubbo 导出服务的过程。Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑。整个逻辑大致可分为三个部分,第一是前置工作,主要用于检查参数,组装 URL。第二是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。第三是向注册中心注册服务,用于服务发现。本篇文章将会对这三个部分代码进行详细的分析,在分析之前,我们先来了解一下服务的导出过程。

Dubbo 支持两种服务导出方式,分别延迟导出和立即导出。延迟导出的入口是 ServiceBean 的 afterPropertiesSet 方法,立即导出的入口是 ServiceBean 的 onApplicationEvent 方法。本文打算分析服务延迟导出过程,因此不会分析 afterPropertiesSet 方法。下面从 onApplicationEvent 方法说起,该方法收到 Spring 容器的刷新事件后,会调用 export 方法执行服务导出操作。服务导出之前,要进行对一系列的配置进行检查,以及生成 URL。准备工作做完,随后开始导出服务。首先导出到本地,然后再导出到远程。导出到本地就是将服务导出到 JVM 中,此过程比较简单。导出到远程的过程则要复杂的多,以 dubbo 协议为例,DubboProtocol 类的 export 方法将会被调用。该方法主要用于创建 Exporter 和 ExchangeServer。ExchangeServer 本身并不具备通信能力,需要借助更底层的 Server 实现通信功能。因此,在创建 ExchangeServer 实例时,需要先创建 NettyServer 或者 MinaServer 实例,并将实例作为参数传给 ExchangeServer 实现类的构造方法。ExchangeServer 实例创建完成后,导出服务到远程的过程也就接近尾声了。服务导出结束后,服务消费者即可通过直联的方式消费服务。当然,一般我们不会使用直联的方式消费服务。所以,在服务导出结束后,紧接着要做的事情是向注册中心注册服务。此时,客户端即可从注册中心发现服务。

以上就是 Dubbo 服务导出的过程,比较复杂。下面开始分析源码,从源码的角度展现整个过程。

2.源码分析

一场 Dubbo 源码分析的马拉松比赛即将开始,现在我们站在赛道的起点进行热身准备。本次比赛的起点位置位于 ServiceBean 的 onApplicationEvent 方法处。好了,发令枪响了,我将和一些朋友从 onApplicationEvent 方法处出发,探索 Dubbo 服务导出的全过程。下面我们来看一下 onApplicationEvent 方法的源码。

</>复制代码

  1. public void onApplicationEvent(ContextRefreshedEvent event) {
  2. // 是否有延迟导出 && 是否已导出 && 是不是已被取消导出
  3. if (isDelay() && !isExported() && !isUnexported()) {
  4. // 导出服务
  5. export();
  6. }
  7. }

onApplicationEvent 是一个事件响应方法,该方法会在收到 Spring 上下文刷新事件后执行。这个方法首先会根据条件决定是否导出服务,比如有些服务设置了延时导出,那么此时就不应该在此处导出。还有一些服务已经被导出了,或者当前服务被取消导出了,此时也不能再次导出相关服务。注意这里的 isDelay 方法,这个方法字面意思是“是否延迟导出服务”,返回 true 表示延迟导出,false 表示不延迟导出。但是该方法真实意思却并非如此,当方法返回 true 时,表示无需延迟导出。返回 false 时,表示需要延迟导出。与字面意思恰恰相反,让人觉得很奇怪。下面我们来看一下这个方法的逻辑。

</>复制代码

  1. // -☆- ServiceBean
  2. private boolean isDelay() {
  3. // 获取 delay
  4. Integer delay = getDelay();
  5. ProviderConfig provider = getProvider();
  6. if (delay == null && provider != null) {
  7. // 如果前面获取的 delay 为空,这里继续获取
  8. delay = provider.getDelay();
  9. }
  10. // 判断 delay 是否为空,或者等于 -1
  11. return supportedApplicationListener && (delay == null || delay == -1);
  12. }

暂时忽略 supportedApplicationListener 这个条件,当 delay 为空,或者等于-1时,该方法返回 true,而不是 false。这个方法的返回值让人有点困惑,因此我重构了该方法的代码,并给 Dubbo 提了一个 Pull Request,最终这个 PR 被合到了 Dubbo 主分支中。详细请参见 Dubbo #2686。

现在解释一下 supportedApplicationListener 变量含义,该变量用于表示当前的 Spring 容器是否支持 ApplicationListener,这个值初始为 false。在 Spring 容器将自己设置到 ServiceBean 中时,ServiceBean 的 setApplicationContext 方法会检测 Spring 容器是否支持 ApplicationListener。若支持,则将 supportedApplicationListener 置为 true。代码就不分析了,大家自行查阅了解。

ServiceBean 是 Dubbo 与 Spring 框架进行整合的关键,可以看做是两个框架之间的桥梁。具有同样作用的类还有 ReferenceBean。ServiceBean 实现了 Spring 的一些拓展接口,有 FactoryBean、ApplicationContextAware、ApplicationListener、DisposableBean 和 BeanNameAware。这些接口我在 Spring 源码分析系列文章中介绍过,大家可以参考一下,这里就不赘述了。

现在我们知道了 Dubbo 服务导出过程的起点。那么接下来,我们快马加鞭,继续进行比赛。赛程预告,下一站是“服务导出的前置工作”。

2.1 前置工作

前置工作主要包含两个部分,分别是配置检查,以及 URL 装配。在导出服务之前,Dubbo 需要检查用户的配置是否合理,或者为用户补充缺省配置。配置检查完成后,接下来需要根据这些配置组装 URL。在 Dubbo 中,URL 的作用十分重要。Dubbo 使用 URL 作为配置载体,所有的拓展点都是通过 URL 获取配置。这一点,官方文档中有所说明。

</>复制代码

  1. 采用 URL 作为配置信息的统一格式,所有扩展点都通过传递 URL 携带配置信息。

接下来,我们先来分析配置检查部分的源码,随后再来分析 URL 组装部分的源码。

2.1.1 检查配置

本节我们接着前面的源码向下分析,前面说过 onApplicationEvent 方法在经过一些判断后,会决定是否调用 export 方法导出服务。那么下面我们从 export 方法开始进行分析,如下:

</>复制代码

  1. public synchronized void export() {
  2. if (provider != null) {
  3. // 获取 export 和 delay 配置
  4. if (export == null) {
  5. export = provider.getExport();
  6. }
  7. if (delay == null) {
  8. delay = provider.getDelay();
  9. }
  10. }
  11. // 如果 exportfalse,则不导出服务
  12. if (export != null && !export) {
  13. return;
  14. }
  15. if (delay != null && delay > 0) { // delay > 0,延时导出服务
  16. delayExportExecutor.schedule(new Runnable() {
  17. @Override
  18. public void run() {
  19. doExport();
  20. }
  21. }, delay, TimeUnit.MILLISECONDS);
  22. } else { // 立即导出服务
  23. doExport();
  24. }
  25. }

export 对两个配置进行了检查,并配置执行相应的动作。首先是 export,这个配置决定了是否导出服务。有时候我们只是想本地启动服务进行一些调试工作,这个时候我们并不希望把本地启动的服务暴露出去给别人调用。此时,我们就可以通过配置 export 禁止服务导出,比如:

</>复制代码

delay 见名知意了,用于延迟导出服务。下面,我们继续分析源码,这次要分析的是 doExport 方法。

</>复制代码

  1. protected synchronized void doExport() {
  2. if (unexported) {
  3. throw new IllegalStateException("Already unexported!");
  4. }
  5. if (exported) {
  6. return;
  7. }
  8. exported = true;
  9. // 检测 interfaceName 是否合法
  10. if (interfaceName == null || interfaceName.length() == 0) {
  11. throw new IllegalStateException("interface not allow null!");
  12. }
  13. // 检测 provider 是否为空,为空则新建一个,并通过系统变量为其初始化
  14. checkDefault();
  15. // 下面几个 if 语句用于检测 provider、application 等核心配置类对象是否为空,
  16. // 若为空,则尝试从其他配置类对象中获取相应的实例。
  17. if (provider != null) {
  18. if (application == null) {
  19. application = provider.getApplication();
  20. }
  21. if (module == null) {
  22. module = provider.getModule();
  23. }
  24. if (registries == null) {...}
  25. if (monitor == null) {...}
  26. if (protocols == null) {...}
  27. }
  28. if (module != null) {
  29. if (registries == null) {
  30. registries = module.getRegistries();
  31. }
  32. if (monitor == null) {...}
  33. }
  34. if (application != null) {
  35. if (registries == null) {
  36. registries = application.getRegistries();
  37. }
  38. if (monitor == null) {...}
  39. }
  40. // 检测 ref 是否泛化服务类型
  41. if (ref instanceof GenericService) {
  42. // 设置 interfaceClass 为 GenericService.class
  43. interfaceClass = GenericService.class;
  44. if (StringUtils.isEmpty(generic)) {
  45. // 设置 generic = "true"
  46. generic = Boolean.TRUE.toString();
  47. }
  48. } else { // ref 非 GenericService 类型
  49. try {
  50. interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
  51. .getContextClassLoader());
  52. } catch (ClassNotFoundException e) {
  53. throw new IllegalStateException(e.getMessage(), e);
  54. }
  55. // 对 interfaceClass,以及 必要字段进行检查
  56. checkInterfaceAndMethods(interfaceClass, methods);
  57. // 对 ref 合法性进行检测
  58. checkRef();
  59. // 设置 generic = "false"
  60. generic = Boolean.FALSE.toString();
  61. }
  62. // local 属性 Dubbo 官方文档中没有说明,不过 local 和 stub 在功能应该是一致的,用于配置本地存根
  63. if (local != null) {
  64. if ("true".equals(local)) {
  65. local = interfaceName + "Local";
  66. }
  67. Class localClass;
  68. try {
  69. // 获取本地存根类
  70. localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
  71. } catch (ClassNotFoundException e) {
  72. throw new IllegalStateException(e.getMessage(), e);
  73. }
  74. // 检测本地存根类是否可赋值给接口类,若不可赋值则会抛出异常,提醒使用者本地存根类类型不合法
  75. if (!interfaceClass.isAssignableFrom(localClass)) {
  76. throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
  77. }
  78. }
  79. // stub 和 local 均用于配置本地存根
  80. if (stub != null) {
  81. // 此处的代码和上一个 if 分支的代码基本一致,这里省略了
  82. }
  83. // 检测各种对象是否为空,为空则新建,或者抛出异常
  84. checkApplication();
  85. checkRegistry();
  86. checkProtocol();
  87. appendProperties(this);
  88. checkStubAndMock(interfaceClass);
  89. if (path == null || path.length() == 0) {
  90. path = interfaceName;
  91. }
  92. // 导出服务
  93. doExportUrls();
  94. // ProviderModel 表示服务提供者模型,此对象中存储了和服务提供者相关的信息。
  95. // 比如服务的配置信息,服务实例等。每个被导出的服务对应一个 ProviderModel。
  96. // ApplicationModel 持有所有的 ProviderModel。
  97. ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
  98. ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
  99. }

以上就是配置检查的相关分析,代码比较多,需要大家耐心看一下。下面对配置检查的逻辑进行简单的总结,如下:

检测 标签的 interface 属性合法性,不合法则抛出异常

检测 ProviderConfig、ApplicationConfig 等核心配置类对象是否为空,若为空,则尝试从其他配置类对象中获取相应的实例。

检测并处理泛化服务和普通服务类

检测本地存根配置,并进行相应的处理

对 ApplicationConfig、RegistryConfig 等配置类进行检测,为空则尝试创建,若无法创建则抛出异常

配置检查并非本文重点,因此我不打算对 doExport 方法所调用的方法进行分析(doExportUrls 方法除外)。在这些方法中,除了 appendProperties 方法稍微复杂一些,其他方法都还好。因此,大家可自行进行分析。好了,其他的就不多说了,继续向下分析。

2.1.2 多协议多注册中心导出服务

Dubbo 允许我们使用不同的协议导出服务,也允许我们向多个注册中心注册服务。Dubbo 在 doExportUrls 方法中对多协议,多注册中心进行了支持。相关代码如下:

</>复制代码

  1. private void doExportUrls() {
  2. // 加载注册中心链接
  3. List registryURLs = loadRegistries(true);
  4. // 遍历 protocols,导出每个服务
  5. for (ProtocolConfig protocolConfig : protocols) {
  6. doExportUrlsFor1Protocol(protocolConfig, registryURLs);
  7. }
  8. }

上面代码比较简单,首先是通过 loadRegistries 加载注册中心链接,然后再遍历 ProtocolConfig 集合导出每个服务。并在导出服务的过程中,将服务注册到注册中心处。下面,我们先来看一下 loadRegistries 方法的逻辑。

</>复制代码

  1. protected List loadRegistries(boolean provider) {
  2. // 检测是否存在注册中心配置类,不存在则抛出异常
  3. checkRegistry();
  4. List registryList = new ArrayList();
  5. if (registries != null && !registries.isEmpty()) {
  6. for (RegistryConfig config : registries) {
  7. String address = config.getAddress();
  8. if (address == null || address.length() == 0) {
  9. // 若 address 为空,则将其设为 0.0.0.0
  10. address = Constants.ANYHOST_VALUE;
  11. }
  12. // 从系统属性中加载注册中心地址
  13. String sysaddress = System.getProperty("dubbo.registry.address");
  14. if (sysaddress != null && sysaddress.length() > 0) {
  15. address = sysaddress;
  16. }
  17. // 判断 address 是否合法
  18. if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
  19. Map map = new HashMap();
  20. // 添加 ApplicationConfig 中的字段信息到 map 中
  21. appendParameters(map, application);
  22. // 添加 RegistryConfig 字段信息到 map 中
  23. appendParameters(map, config);
  24. map.put("path", RegistryService.class.getName());
  25. map.put("dubbo", Version.getProtocolVersion());
  26. map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
  27. if (ConfigUtils.getPid() > 0) {
  28. map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
  29. }
  30. if (!map.containsKey("protocol")) {
  31. if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
  32. map.put("protocol", "remote");
  33. } else {
  34. map.put("protocol", "dubbo");
  35. }
  36. }
  37. // 解析得到 URL 列表,address 可能包含多个注册中心 ip,
  38. // 因此解析得到的是一个 URL 列表
  39. List urls = UrlUtils.parseURLs(address, map);
  40. for (URL url : urls) {
  41. url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
  42. // 将 URL 协议头设置为 registry
  43. url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
  44. // 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:
  45. // (服务提供者 && register = truenull)
  46. // || (非服务提供者 && subscribe = truenull)
  47. if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
  48. || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
  49. registryList.add(url);
  50. }
  51. }
  52. }
  53. }
  54. }
  55. return registryList;
  56. }

上面代码不是很复杂,包含如下逻辑:

检测是否存在注册中心配置类,不存在则抛出异常

构建参数映射集合,也就是 map

构建注册中心链接列表

遍历链接列表,并根据条件决定是否将其添加到 registryList 中

关于多协议多注册中心导出服务就先分析到这,代码不是很多,就不过多叙述了。接下来分析 URL 组装过程。

2.1.3 组装 URL

配置检查完毕后,紧接着要做的事情是根据配置,以及其他一些信息组装 URL。前面说过,URL 是 Dubbo 配置的载体,通过 URL 可让 Dubbo 的各种配置在各个模块之间传递。URL 之于 Dubbo,犹如水之于鱼,非常重要。大家在阅读 Dubbo 服务导出相关源码的过程中,要注意 URL 内容的变化。既然 URL 如此重要,那么下面我们来了解一下 URL 组装的过程。

</>复制代码

  1. private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
  2. String name = protocolConfig.getName();
  3. // 如果协议名为空,或空串,则将协议名变量设置为 dubbo
  4. if (name == null || name.length() == 0) {
  5. name = "dubbo";
  6. }
  7. Map map = new HashMap();
  8. // 添加 side、版本、时间戳以及进程号等信息到 map 中
  9. map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
  10. map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
  11. map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
  12. if (ConfigUtils.getPid() > 0) {
  13. map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
  14. }
  15. // 通过反射将对象的字段信息到 map 中
  16. appendParameters(map, application);
  17. appendParameters(map, module);
  18. appendParameters(map, provider, Constants.DEFAULT_KEY);
  19. appendParameters(map, protocolConfig);
  20. appendParameters(map, this);
  21. // methods 为 MethodConfig 集合,MethodConfig 中存储了 标签的配置信息
  22. if (methods != null && !methods.isEmpty()) {
  23. // 这段代码用于添加 Callback 配置到 map 中,代码太长,待会多带带分析
  24. }
  25. // 检测 generic 是否为 "true",并根据检测结果向 map 中添加不同的信息
  26. if (ProtocolUtils.isGeneric(generic)) {
  27. map.put(Constants.GENERIC_KEY, generic);
  28. map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
  29. } else {
  30. String revision = Version.getVersion(interfaceClass, version);
  31. if (revision != null && revision.length() > 0) {
  32. map.put("revision", revision);
  33. }
  34. // 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等
  35. String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
  36. // 添加方法名到 map 中,如果包含多个方法名,则用逗号隔开,比如 method = init,destroy
  37. if (methods.length == 0) {
  38. logger.warn("NO method found in service interface ...");
  39. map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
  40. } else {
  41. // 将逗号作为分隔符连接方法名,并将连接后的字符串放入 map 中
  42. map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet(Arrays.asList(methods)), ","));
  43. }
  44. }
  45. // 添加 token 到 map 中
  46. if (!ConfigUtils.isEmpty(token)) {
  47. if (ConfigUtils.isDefault(token)) {
  48. map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
  49. } else {
  50. map.put(Constants.TOKEN_KEY, token);
  51. }
  52. }
  53. // 判断协议名是否为 injvm
  54. if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
  55. protocolConfig.setRegister(false);
  56. map.put("notify", "false");
  57. }
  58. // 获取上下文路径
  59. String contextPath = protocolConfig.getContextpath();
  60. if ((contextPath == null || contextPath.length() == 0) && provider != null) {
  61. contextPath = provider.getContextpath();
  62. }
  63. // 获取 host 和 port
  64. String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
  65. Integer port = this.findConfigedPorts(protocolConfig, name, map);
  66. // 组装 URL
  67. URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
  68. // 省略无关代码
  69. }

上面的代码首先是将一些信息,比如版本、时间戳、方法名以及各种配置对象的字段信息放入到 map 中,map 中的内容将作为 URL 的查询字符串。构建好 map 后,紧接着是获取上下文路径、主机名以及端口号等信息。最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象。需要注意的是,这里出现的 URL 并非 java.net.URL,而是 com.alibaba.dubbo.common.URL。

上面省略了一段代码,这里简单分析一下。这段代码用于检测 标签中的配置信息,并将相关配置添加到 map 中。代码如下:

</>复制代码

  1. private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
  2. // ...
  3. // methods 为 MethodConfig 集合,MethodConfig 中存储了 标签的配置信息
  4. if (methods != null && !methods.isEmpty()) {
  5. for (MethodConfig method : methods) {
  6. // 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。
  7. // 比如存储 对应的 MethodConfig,
  8. // 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}
  9. appendParameters(map, method, method.getName());
  10. String retryKey = method.getName() + ".retry";
  11. if (map.containsKey(retryKey)) {
  12. String retryValue = map.remove(retryKey);
  13. // 检测 MethodConfig retry 是否为 false,若是,则设置重试次数为0
  14. if ("false".equals(retryValue)) {
  15. map.put(method.getName() + ".retries", "0");
  16. }
  17. }
  18. // 获取 ArgumentConfig 列表
  19. List arguments = method.getArguments();
  20. if (arguments != null && !arguments.isEmpty()) {
  21. for (ArgumentConfig argument : arguments) {
  22. // 检测 type 属性是否为空,或者空串(分支1 ⭐️)
  23. if (argument.getType() != null && argument.getType().length() > 0) {
  24. Method[] methods = interfaceClass.getMethods();
  25. if (methods != null && methods.length > 0) {
  26. for (int i = 0; i < methods.length; i++) {
  27. String methodName = methods[i].getName();
  28. // 比对方法名,查找目标方法
  29. if (methodName.equals(method.getName())) {
  30. Class[] argtypes = methods[i].getParameterTypes();
  31. if (argument.getIndex() != -1) {
  32. // 检测 ArgumentConfig 中的 type 属性与方法参数列表
  33. // 中的参数名称是否一致,不一致则抛出异常(分支2 ⭐️)
  34. if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
  35. // 添加 ArgumentConfig 字段信息到 map 中,
  36. // 键前缀 = 方法名.index,比如:
  37. // map = {"sayHello.3": true}
  38. appendParameters(map, argument, method.getName() + "." + argument.getIndex());
  39. } else {
  40. throw new IllegalArgumentException("argument config error: ...");
  41. }
  42. } else { // 分支3 ⭐️
  43. for (int j = 0; j < argtypes.length; j++) {
  44. Class argclazz = argtypes[j];
  45. // 从参数类型列表中查找类型名称为 argument.type 的参数
  46. if (argclazz.getName().equals(argument.getType())) {
  47. appendParameters(map, argument, method.getName() + "." + j);
  48. if (argument.getIndex() != -1 && argument.getIndex() != j) {
  49. throw new IllegalArgumentException("argument config error: ...");
  50. }
  51. }
  52. }
  53. }
  54. }
  55. }
  56. }
  57. // 用户未配置 type 属性,但配置了 index 属性,且 index != -1
  58. } else if (argument.getIndex() != -1) { // 分支4 ⭐️
  59. // 添加 ArgumentConfig 字段信息到 map 中
  60. appendParameters(map, argument, method.getName() + "." + argument.getIndex());
  61. } else {
  62. throw new IllegalArgumentException("argument config must set index or type");
  63. }
  64. }
  65. }
  66. }
  67. }
  68. // ...
  69. }

上面这段代码 for 循环和 if else 分支嵌套太多,导致层次太深,不利于阅读,需要耐心看一下。大家在看这段代码时,注意把几个重要的条件分支找出来。只要理解了这几个分支的意图,就可以弄懂这段代码。我在上面代码中用⭐️符号标识出了4个重要的分支,下面用伪代码解释一下这几个分支的含义。

</>复制代码

  1. // 获取 ArgumentConfig 列表
  2. for (遍历 ArgumentConfig 列表) {
  3. if (type 不为 null,也不为空串) { // 分支1
  4. 1. 通过反射获取 interfaceClass 的方法列表
  5. for (遍历方法列表) {
  6. 1. 比对方法名,查找目标方法
  7. 2. 通过反射获取目标方法的参数类型数组 argtypes
  8. if (index != -1) { // 分支2
  9. 1. 从 argtypes 数组中获取下标 index 处的元素 argType
  10. 2. 检测 argType 的名称与 ArgumentConfig 中的 type 属性是否一致
  11. 3. 添加 ArgumentConfig 字段信息到 map 中,或抛出异常
  12. } else { // 分支3
  13. 1. 遍历参数类型数组 argtypes,查找 argument.type 类型的参数
  14. 2. 添加 ArgumentConfig 字段信息到 map
  15. }
  16. }
  17. } else if (index != -1) { // 分支4
  18. 1. 添加 ArgumentConfig 字段信息到 map
  19. }
  20. }

在本节分析的源码中,appendParameters 这个方法出现的次数比较多,该方法用于将对象字段信息添加到 map 中。实现上则是通过反射获取目标对象的 getter 方法,并调用该方法获取属性值。然后再通过 getter 方法名解析出属性名,比如从方法名 getName 中可解析出属性 name。如果用户传入了属性名前缀,此时需要将属性名加入前缀内容。最后将 <属性名,属性值> 键值对存入到 map 中就行了。限于篇幅原因,这里就不分析 appendParameters 方法的源码了,大家请自行分析。

2.2 导出 Dubbo 服务

前置工作做完,接下来就可以进行服务导出工作。服务导出,分为导出到本地 (JVM),和导出到远程。在深入分析服务导出源码前,我们先来从宏观层面上看一下服务导出逻辑。如下:

</>复制代码

  1. private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
  2. // 省略无关代码
  3. if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
  4. .hasExtension(url.getProtocol())) {
  5. // 加载 ConfiguratorFactory,并生成 Configurator 配置 url
  6. url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
  7. .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
  8. }
  9. String scope = url.getParameter(Constants.SCOPE_KEY);
  10. // 如果 scope = none,则什么都不做
  11. if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
  12. // scope != remote,导出到本地
  13. if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
  14. exportLocal(url);
  15. }
  16. // scope != local,导出到远程
  17. if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
  18. if (registryURLs != null && !registryURLs.isEmpty()) {
  19. for (URL registryURL : registryURLs) {
  20. url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
  21. // 加载监视器链接
  22. URL monitorUrl = loadMonitor(registryURL);
  23. if (monitorUrl != null) {
  24. // 将监视器链接作为参数添加到 url 中
  25. url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
  26. }
  27. String proxy = url.getParameter(Constants.PROXY_KEY);
  28. if (StringUtils.isNotEmpty(proxy)) {
  29. registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
  30. }
  31. // 为服务提供类(ref)生成 Invoker
  32. Invoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
  33. // DelegateProviderMetaDataInvoker 仅用于持有 Invoker 和 ServiceConfig
  34. DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
  35. // 导出服务,并生成 Exporter
  36. Exporter exporter = protocol.export(wrapperInvoker);
  37. exporters.add(exporter);
  38. }
  39. } else { // 不存在注册中心,仅导出服务
  40. Invoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
  41. DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
  42. Exporter exporter = protocol.export(wrapperInvoker);
  43. exporters.add(exporter);
  44. }
  45. }
  46. }
  47. this.urls.add(url);
  48. }

上面代码根据 url 中的 scope 参数决定服务导出方式,分别如下:

scope = none,不导出服务

scope != remote,导出到本地

scope != local,导出到远程

不管是导出到本地,还是远程。进行服务导出之前,均需要先创建 Invoker。这是一个很重要的步骤,因此接下来我会先分析 Invoker 的创建过程。

2.2.1 Invoker 创建过程

在 Dubbo 中,Invoker 是一个非常重要的模型。在服务提供端,以及服务引用端均会出现 Invoker。Dubbo 官方文档中对 Invoker 进行了说明,这里引用一下。

</>复制代码

  1. Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。

既然 Invoker 如此重要,那么我们很有必要搞清楚 Invoker 的用途。Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。下面我们到 JavassistProxyFactory 代码中,探索 Invoker 的创建过程。如下:

</>复制代码

  1. -- JavassistProxyFactory
  2. public Invoker getInvoker(T proxy, Class type, URL url) {
  3. // 为目标类创建 Wrapper
  4. final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf("$") < 0 ? proxy.getClass() : type);
  5. // 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
  6. return new AbstractProxyInvoker(proxy, type, url) {
  7. @Override
  8. protected Object doInvoke(T proxy, String methodName,
  9. Class[] parameterTypes,
  10. Object[] arguments) throws Throwable {
  11. // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
  12. return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
  13. }
  14. };
  15. }

如上,JavassistProxyFactory 创建了一个继承自 AbstractProxyInvoker 类的匿名对象,并覆写了抽象方法 doInvoke。覆写后的 doInvoke 逻辑比较简单,仅是将调用请求转发给了 Wrapper 类的 invokeMethod 方法。Wrapper 用于“包裹”目标类,Wrapper 是一个抽象类,仅可通过 getWrapper(Class) 方法创建子类。在创建 Wrapper 子类的过程中,子类代码生成逻辑会对 getWrapper 方法传入的 Class 对象进行解析,拿到诸如类方法,类成员变量等信息。以及生成 invokeMethod 方法代码,和其他一些方法代码。代码生成完毕后,通过 Javassist 生成 Class 对象,最后再通过反射创建 Wrapper 实例。相关的代码如下:

</>复制代码

  1. public static Wrapper getWrapper(Class c) {
  2. while (ClassGenerator.isDynamicClass(c))
  3. c = c.getSuperclass();
  4. if (c == Object.class)
  5. return OBJECT_WRAPPER;
  6. // 访存
  7. Wrapper ret = WRAPPER_MAP.get(c);
  8. if (ret == null) {
  9. // 缓存未命中,创建 Wrapper
  10. ret = makeWrapper(c);
  11. // 写入缓存
  12. WRAPPER_MAP.put(c, ret);
  13. }
  14. return ret;
  15. }

getWrapper 方法只是包含了一些缓存操作逻辑,非重点。下面我们重点关注 makeWrapper 方法。

</>复制代码

  1. private static Wrapper makeWrapper(Class c) {
  2. // 检测 c 是否为私有类型,若是则抛出异常
  3. if (c.isPrimitive())
  4. throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
  5. String name = c.getName();
  6. ClassLoader cl = ClassHelper.getClassLoader(c);
  7. // c1 用于存储 setPropertyValue 方法代码
  8. StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
  9. // c2 用于存储 getPropertyValue 方法代码
  10. StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
  11. // c3 用于存储 invokeMethod 方法代码
  12. StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
  13. // 生成类型转换代码及异常捕捉代码,比如:
  14. // DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }
  15. c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
  16. c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
  17. c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
  18. // pts 用于存储成员变量名和类型
  19. Map> pts = new HashMap>();
  20. // ms 用于存储方法描述信息(可理解为方法签名)及 Method 实例
  21. Map ms = new LinkedHashMap();
  22. // mns 为方法名列表
  23. List mns = new ArrayList();
  24. // dmns 用于存储定义在当前类中的方法的名称
  25. List dmns = new ArrayList();
  26. // --------------------------------✨ 分割线1 ✨-------------------------------------
  27. // 获取 public 访问级别的字段,并为所有字段生成条件判断语句
  28. for (Field f : c.getFields()) {
  29. String fn = f.getName();
  30. Class ft = f.getType();
  31. if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()))
  32. // 忽略关键字 static 或 transient 修饰的变量
  33. continue;
  34. // 生成条件判断及赋值语句,比如:
  35. // if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
  36. // if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
  37. c1.append(" if( $2.equals("").append(fn).append("") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
  38. // 生成条件判断及返回语句,比如:
  39. // if( $2.equals("name") ) { return ($w)w.name; }
  40. c2.append(" if( $2.equals("").append(fn).append("") ){ return ($w)w.").append(fn).append("; }");
  41. // 存储 <字段名, 字段类型> 键值对到 pts 中
  42. pts.put(fn, ft);
  43. }
  44. // --------------------------------✨ 分割线2 ✨-------------------------------------
  45. Method[] methods = c.getMethods();
  46. // 检测 c 中是否包含在当前类中声明的方法
  47. boolean hasMethod = hasMethods(methods);
  48. if (hasMethod) {
  49. c3.append(" try{");
  50. }
  51. for (Method m : methods) {
  52. if (m.getDeclaringClass() == Object.class)
  53. // 忽略 Object 中定义的方法
  54. continue;
  55. String mn = m.getName();
  56. // 生成方法名判断语句,示例如下:
  57. // if ( "sayHello".equals( $2 )
  58. c3.append(" if( "").append(mn).append("".equals( $2 ) ");
  59. int len = m.getParameterTypes().length;
  60. // 生成运行时传入参数的数量与方法的参数列表长度判断语句,示例如下:
  61. // && $3.length == 2
  62. c3.append(" && ").append(" $3.length == ").append(len);
  63. boolean override = false;
  64. for (Method m2 : methods) {
  65. // 检测方法是否存在重载情况,条件为:方法对象不同 && 方法名相同
  66. if (m != m2 && m.getName().equals(m2.getName())) {
  67. override = true;
  68. break;
  69. }
  70. }
  71. // 对重载方法进行处理,考虑下面的方法:
  72. // 1. void sayHello(Integer, String)
  73. // 2. void sayHello(Integer, Integer)
  74. // 方法名相同,参数列表长度也相同,因此不能仅通过这两项判断两个方法是否相等。
  75. // 需要进一步判断方法的参数类型
  76. if (override) {
  77. if (len > 0) {
  78. for (int l = 0; l < len; l++) {
  79. // && $3[0].getName().equals("java.lang.Integer")
  80. // && $3[1].getName().equals("java.lang.String")
  81. c3.append(" && ").append(" $3[").append(l).append("].getName().equals("")
  82. .append(m.getParameterTypes()[l].getName()).append("")");
  83. }
  84. }
  85. }
  86. // 添加 ) {,完成方法判断语句,此时生成的方法可能如下(已格式化):
  87. // if ("sayHello".equals($2)
  88. // && $3.length == 2
  89. // && $3[0].getName().equals("java.lang.Integer")
  90. // && $3[1].getName().equals("java.lang.String")) {
  91. c3.append(" ) { ");
  92. // 根据返回值类型生成目标方法调用语句
  93. if (m.getReturnType() == Void.TYPE)
  94. // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;
  95. c3.append(" w.").append(mn).append("(").append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
  96. else
  97. // return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
  98. c3.append(" return ($w)w.").append(mn).append("(").append(args(m.getParameterTypes(), "$4")).append(");");
  99. // 添加 }, 当前”方法判断条件“代码生成完毕,示例代码如下(已格式化):
  100. // if ("sayHello".equals($2)
  101. // && $3.length == 2
  102. // && $3[0].getName().equals("java.lang.Integer")
  103. // && $3[1].getName().equals("java.lang.String")) {
  104. //
  105. // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
  106. // return null;
  107. // }
  108. c3.append(" }");
  109. // 添加方法名到 mns 集合中
  110. mns.add(mn);
  111. // 检测当前方法是否在 c 中被声明的
  112. if (m.getDeclaringClass() == c)
  113. // 若是,则将当前方法名添加到 dmns 中
  114. dmns.add(mn);
  115. ms.put(ReflectUtils.getDesc(m), m);
  116. }
  117. if (hasMethod) {
  118. // 添加异常捕捉语句
  119. c3.append(" } catch(Throwable e) { ");
  120. c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");
  121. c3.append(" }");
  122. }
  123. // 添加 NoSuchMethodException 异常抛出代码
  124. c3.append(" throw new " + NoSuchMethodException.class.getName() + "("Not found method ""+$2+"" in class " + c.getName() + "."); }");
  125. // --------------------------------✨ 分割线3 ✨-------------------------------------
  126. Matcher matcher;
  127. // 处理 get/set 方法
  128. for (Map.Entry entry : ms.entrySet()) {
  129. String md = entry.getKey();
  130. Method method = (Method) entry.getValue();
  131. // 匹配以 get 开头的方法
  132. if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
  133. // 获取属性名
  134. String pn = propertyName(matcher.group(1));
  135. // 生成属性判断以及返回语句,示例如下:
  136. // if( $2.equals("name") ) { return ($w).w.getName(); }
  137. c2.append(" if( $2.equals("").append(pn).append("") ){ return ($w)w.").append(method.getName()).append("(); }");
  138. pts.put(pn, method.getReturnType());
  139. // 匹配以 is/has/can 开头的方法
  140. } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
  141. String pn = propertyName(matcher.group(1));
  142. // 生成属性判断以及返回语句,示例如下:
  143. // if( $2.equals("dream") ) { return ($w).w.hasDream(); }
  144. c2.append(" if( $2.equals("").append(pn).append("") ){ return ($w)w.").append(method.getName()).append("(); }");
  145. pts.put(pn, method.getReturnType());
  146. // 匹配以 set 开头的方法
  147. } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
  148. Class pt = method.getParameterTypes()[0];
  149. String pn = propertyName(matcher.group(1));
  150. // 生成属性判断以及 setter 调用语句,示例如下:
  151. // if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
  152. c1.append(" if( $2.equals("").append(pn).append("") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
  153. pts.put(pn, pt);
  154. }
  155. }
  156. // 添加 NoSuchPropertyException 异常抛出代码
  157. c1.append(" throw new " + NoSuchPropertyException.class.getName() + "("Not found property ""+$2+"" filed or setter method in class " + c.getName() + "."); }");
  158. c2.append(" throw new " + NoSuchPropertyException.class.getName() + "("Not found property ""+$2+"" filed or setter method in class " + c.getName() + "."); }");
  159. // --------------------------------✨ 分割线4 ✨-------------------------------------
  160. long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
  161. // 创建类生成器
  162. ClassGenerator cc = ClassGenerator.newInstance(cl);
  163. // 设置类名及超类
  164. cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
  165. cc.setSuperClass(Wrapper.class);
  166. // 添加默认构造方法
  167. cc.addDefaultConstructor();
  168. // 添加字段
  169. cc.addField("public static String[] pns;");
  170. cc.addField("public static " + Map.class.getName() + " pts;");
  171. cc.addField("public static String[] mns;");
  172. cc.addField("public static String[] dmns;");
  173. for (int i = 0, len = ms.size(); i < len; i++)
  174. cc.addField("public static Class[] mts" + i + ";");
  175. // 添加方法代码
  176. cc.addMethod("public String[] getPropertyNames(){ return pns; }");
  177. cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
  178. cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
  179. cc.addMethod("public String[] getMethodNames(){ return mns; }");
  180. cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
  181. cc.addMethod(c1.toString());
  182. cc.addMethod(c2.toString());
  183. cc.addMethod(c3.toString());
  184. try {
  185. // 生成类
  186. Class wc = cc.toClass();
  187. // 设置字段值
  188. wc.getField("pts").set(null, pts);
  189. wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
  190. wc.getField("mns").set(null, mns.toArray(new String[0]));
  191. wc.getField("dmns").set(null, dmns.toArray(new String[0]));
  192. int ix = 0;
  193. for (Method m : ms.values())
  194. wc.getField("mts" + ix++).set(null, m.getParameterTypes());
  195. // 创建 Wrapper 实例
  196. return (Wrapper) wc.newInstance();
  197. } catch (RuntimeException e) {
  198. throw e;
  199. } catch (Throwable e) {
  200. throw new RuntimeException(e.getMessage(), e);
  201. } finally {
  202. cc.release();
  203. ms.clear();
  204. mns.clear();
  205. dmns.clear();
  206. }
  207. }

上面代码很长,大家耐心看一下。我在上面代码中做了大量的注释,并按功能对代码进行了分块,以帮助大家理解代码逻辑。下面对这段代码进行讲解。首先我们把目光移到分割线1之上的代码,这段代码主要用于进行一些初始化操作。比如创建 c1、c2、c3 以及 pts、ms、mns 等变量,以及向 c1、c2、c3 中添加方法定义和类型类型转换代码。接下来是分割线1到分割线2之间的代码,这段代码用于为 public 级别的字段生成条件判断取值与赋值代码。这段代码不是很难看懂,就不多说了。继续向下看,分割线2和分隔线3之间的代码用于为定义在当前类中的方法生成判断语句,和方法调用语句。因为需要对方法重载进行校验,因此到这这段代码看起来有点复杂。不过耐心开一下,也不是很难理解。接下来是分割线3和分隔线4之间的代码,这段代码用于处理 getter、setter 以及以 is/has/can 开头的方法。处理方式是通过正则表达式获取方法类型(get/set/is/...),以及属性名。之后为属性名生成判断语句,然后为方法生成调用语句。最后我们再来看一下分隔线4以下的代码,这段代码通过 ClassGenerator 为刚刚生成的代码构建 Class 类,并通过反射创建对象。ClassGenerator 是 Dubbo 自己封装的,该类的核心是 toClass() 的重载方法 toClass(ClassLoader, ProtectionDomain),该方法通过 javassist 构建 Class。这里就不分析 toClass 方法了,大家请自行分析。

阅读 Wrapper 类代码需要对 javassist 框架有所了解。关于 javassist,大家如果不熟悉,请自行查阅资料,本节不打算介绍 javassist 相关内容。

好了,关于 Wrapper 类生成过程就分析到这。如果大家看的不是很明白,可以多带带为 Wrapper 创建单元测试,然后单步调试。并将生成的代码拷贝出来,格式化后再进行观察和理解。好了,本节先到这。

2.2.2 导出服务到本地

本节我们来看一下服务导出相关的代码,按照代码执行顺序,本节先来分析导出服务到本地的过程。相关代码如下:

</>复制代码

  1. private void exportLocal(URL url) {
  2. // 如果 URL 的协议头等于 injvm,说明已经导出到本地了,无需再次导出
  3. if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
  4. URL local = URL.valueOf(url.toFullString())
  5. .setProtocol(Constants.LOCAL_PROTOCOL) // 设置协议头为 injvm
  6. .setHost(LOCALHOST)
  7. .setPort(0);
  8. ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
  9. // 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
  10. Exporter exporter = protocol.export(
  11. proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
  12. exporters.add(exporter);
  13. }
  14. }

exportLocal 方法比较简单,首先根据 URL 协议头决定是否导出服务。若需导出,则创建一个新的 URL 并将协议头、主机名以及端口设置成新的值。然后创建 Invoker,并调用 InjvmProtocol 的 export 方法导出服务。下面我们来看一下 InjvmProtocol 的 export 方法都做了哪些事情。

</>复制代码

  1. public Exporter export(Invoker invoker) throws RpcException {
  2. // 创建 InjvmExporter
  3. return new InjvmExporter(invoker, invoker.getUrl().getServiceKey(), exporterMap);
  4. }

如上,InjvmProtocol 的 export 方法仅创建了一个 InjvmExporter,无其他逻辑。到此导出服务到本地就分析完了,接下来,我们继续分析导出服务到远程的过程。

2.2.3 导出服务到远程

与导出服务到本地相比,导出服务到远程的过程要复杂不少,其包含了服务导出与服务注册两个过程。这两个过程涉及到了大量的调用,因此比较复杂。不过不管再难,我们都要看一下,万一看懂了呢。按照代码执行顺序,本节先来分析服务导出逻辑,服务注册逻辑将在下一节进行分析。下面开始分析,我们把目光移动到 RegistryProtocol 的 export 方法上。

</>复制代码

  1. public Exporter export(final Invoker originInvoker) throws RpcException {
  2. // 导出服务
  3. final ExporterChangeableWrapper exporter = doLocalExport(originInvoker);
  4. // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
  5. // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
  6. URL registryUrl = getRegistryUrl(originInvoker);
  7. // 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
  8. final Registry registry = getRegistry(originInvoker);
  9. // 获取已注册的服务提供者 URL,比如:
  10. // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
  11. final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
  12. // 获取 register 参数
  13. boolean register = registeredProviderUrl.getParameter("register", true);
  14. // 向服务提供者与消费者注册表中注册服务提供者
  15. ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
  16. // 根据 register 的值决定是否注册服务
  17. if (register) {
  18. // 向注册中心注册服务
  19. register(registryUrl, registeredProviderUrl);
  20. ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
  21. }
  22. // 获取订阅 URL,比如:
  23. // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
  24. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
  25. // 创建监听器
  26. final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
  27. overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
  28. // 向注册中心进行订阅 override 数据
  29. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
  30. // 创建并返回 DestroyableExporter
  31. return new DestroyableExporter(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
  32. }

上面代码看起来比较复杂,主要做如下一些操作:

调用 doLocalExport 导出服务

向注册中心注册服务

向注册中心进行订阅 override 数据

创建并返回 DestroyableExporter

在以上操作中,除了创建并返回 DestroyableExporter 没啥难度外,其他几步操作都不是很简单。这其中,导出服务和注册服务是本章要重点分析的逻辑。 订阅 override 数据这个是非重点内容,后面会简单介绍一下。下面开始本节的分析,先来分析 doLocalExport 方法的逻辑,如下:

</>复制代码

  1. private ExporterChangeableWrapper doLocalExport(final Invoker originInvoker) {
  2. String key = getCacheKey(originInvoker);
  3. // 访问缓存
  4. ExporterChangeableWrapper exporter = (ExporterChangeableWrapper) bounds.get(key);
  5. if (exporter == null) {
  6. synchronized (bounds) {
  7. exporter = (ExporterChangeableWrapper) bounds.get(key);
  8. if (exporter == null) {
  9. // 创建 Invoker 为委托类对象
  10. final Invoker invokerDelegete = new InvokerDelegete(originInvoker, getProviderUrl(originInvoker));
  11. // 调用 protocol 的 export 方法导出服务
  12. exporter = new ExporterChangeableWrapper((Exporter) protocol.export(invokerDelegete), originInvoker);
  13. // 写缓存
  14. bounds.put(key, exporter);
  15. }
  16. }
  17. }
  18. return exporter;
  19. }

上面的代码是典型的双重检查,这个大家应该都知道。接下来,我们把重点放在 Protocol 的 export 方法上。假设运行时协议为 dubbo,此处的 protocol 会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法。我们目光转移到 DubboProtocol 的 export 方法上,相关分析如下:

</>复制代码

  1. public Exporter export(Invoker invoker) throws RpcException {
  2. URL url = invoker.getUrl();
  3. // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
  4. // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
  5. String key = serviceKey(url);
  6. // 创建 DubboExporter
  7. DubboExporter exporter = new DubboExporter(invoker, key, exporterMap);
  8. // 将 键值对放入缓存中
  9. exporterMap.put(key, exporter);
  10. // 以下代码应该和本地存根有关,代码不难看懂,但具体用途暂时不清楚,先忽略
  11. Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
  12. Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
  13. if (isStubSupportEvent && !isCallbackservice) {
  14. String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
  15. if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
  16. // 省略日志打印代码
  17. } else {
  18. stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
  19. }
  20. }
  21. // 启动服务器
  22. openServer(url);
  23. // 优化序列化
  24. optimizeSerialization(url);
  25. return exporter;
  26. }

如上,我们重点关注 DubboExporter 的创建以及 openServer 方法,其他逻辑看不懂也没关系,不影响理解服务导出过程。另外,DubboExporter 的代码比较简单,就不分析了。下面分析 openServer 方法。

</>复制代码

  1. private void openServer(URL url) {
  2. // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
  3. String key = url.getAddress();
  4. boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
  5. if (isServer) {
  6. // 访问缓存
  7. ExchangeServer server = serverMap.get(key);
  8. if (server == null) {
  9. // 创建服务器实例
  10. serverMap.put(key, createServer(url));
  11. } else {
  12. // 服务器已创建,则根据 url 中的配置重置服务器
  13. server.reset(url);
  14. }
  15. }
  16. }

如上,在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。考虑到篇幅问题,关于服务器实例重置的代码就不分析了。接下来分析服务器实例的创建过程。如下:

</>复制代码

  1. private ExchangeServer createServer(URL url) {
  2. url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
  3. // 添加心跳检测配置到 url 中
  4. url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
  5. // 获取 server 参数,默认为 netty
  6. String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
  7. // 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
  8. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
  9. throw new RpcException("Unsupported server type: " + str + ", url: " + url);
  10. // 添加编码解码器参数
  11. url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
  12. ExchangeServer server;
  13. try {
  14. // 创建 ExchangeServer
  15. server = Exchangers.bind(url, requestHandler);
  16. } catch (RemotingException e) {
  17. throw new RpcException("Fail to start server...");
  18. }
  19. // 获取 client 参数,可指定 netty,mina
  20. str = url.getParameter(Constants.CLIENT_KEY);
  21. if (str != null && str.length() > 0) {
  22. // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
  23. Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
  24. // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
  25. // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
  26. if (!supportedTypes.contains(str)) {
  27. throw new RpcException("Unsupported client type...");
  28. }
  29. }
  30. return server;
  31. }

如上,createServer 包含三个核心的操作。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。两次检测操作所对应的代码比较直白了,无需多说。但创建服务器的操作目前还不是很清晰,我们继续往下看。

</>复制代码

  1. public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
  2. if (url == null) {
  3. throw new IllegalArgumentException("url == null");
  4. }
  5. if (handler == null) {
  6. throw new IllegalArgumentException("handler == null");
  7. }
  8. url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
  9. // 获取 Exchanger,默认为 HeaderExchanger。
  10. // 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
  11. return getExchanger(url).bind(url, handler);
  12. }

上面代码比较简单,就不多说了。下面看一下 HeaderExchanger 的 bind 方法。

</>复制代码

  1. public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
  2. // 创建 HeaderExchangeServer 实例,该方法包含了多步操作,本别如下:
  3. // 1. new HeaderExchangeHandler(handler)
  4. // 2. new DecodeHandler(new HeaderExchangeHandler(handler))
  5. // 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
  6. return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
  7. }

HeaderExchanger 的 bind 方法包含的逻辑比较多,但目前我们仅需关心 Transporters 的 bind 方法逻辑即可。该方法的代码如下:

</>复制代码

  1. public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
  2. if (url == null) {
  3. throw new IllegalArgumentException("url == null");
  4. }
  5. if (handlers == null || handlers.length == 0) {
  6. throw new IllegalArgumentException("handlers == null");
  7. }
  8. ChannelHandler handler;
  9. if (handlers.length == 1) {
  10. handler = handlers[0];
  11. } else {
  12. // 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
  13. handler = new ChannelHandlerDispatcher(handlers);
  14. }
  15. // 获取自适应 Transporter 实例,并调用实例方法
  16. return getTransporter().bind(url, handler);
  17. }

如上,getTransporter() 方法获取的 Transporter 是在运行时动态创建的,类名为 Transporter$Adaptive,也就是自适应拓展类。我在[上一篇文章](http://www.tianxiaobo.com/2018/10/13/Dubbo-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90-%E8%87%AA%E9%80%82%E5%BA%94%E6%8B%93%E5%B1%95%E5%8E%9F%E7%90%86/)中详细分析了自适应拓展类的生成过程,对自适应拓展类不了解的同学可以参考我之前的文章,这里不再赘述。Transporter$Adaptive 会在运行时根据传入的 URL 参数决定加载什么类型的 Transporter,默认为 NettyTransporter。下面我们继续跟下去,这次分析的是 NettyTransporter 的 bind 方法。

</>复制代码

  1. public Server bind(URL url, ChannelHandler listener) throws RemotingException {
  2. // 创建 NettyServer
  3. return new NettyServer(url, listener);
  4. }

这里仅有一句创建 NettyServer 的代码,没啥好讲的,我们继续向下看。

</>复制代码

  1. public class NettyServer extends AbstractServer implements Server {
  2. public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
  3. // 调用父类构造方法
  4. super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
  5. }
  6. }
  7. public abstract class AbstractServer extends AbstractEndpoint implements Server {
  8. public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
  9. // 调用父类构造方法,这里就不用跟进去了,没什么复杂逻辑
  10. super(url, handler);
  11. localAddress = getUrl().toInetSocketAddress();
  12. // 获取 ip 和端口
  13. String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
  14. int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
  15. if (ur

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/71977.html

相关文章

  • Dubbo 2.7.1 踩坑记

    摘要:面试题服务提供者能实现失效踢出是什么原理高频题服务宕机的时候,该节点由于是持久节点会永远存在,而且当服务再次重启的时候会将重新注册一个新节点。 Dubbo 2.7 版本增加新特性,新系统开始使用 Dubbo 2.7.1 尝鲜新功能。使用过程中不慎踩到这个版本的 Bug。 系统架构 Spring Boot 2.14-Release + Dubbo 2.7.1 现象 Dubbo 服务者启动...

    wudengzan 评论0 收藏0
  • dubbo源码解析(四十四)服务暴露过程

    摘要:服务暴露过程目标从源码的角度分析服务暴露过程。导出服务,包含暴露服务到本地,和暴露服务到远程两个过程。其中服务暴露的第八步已经没有了。将泛化调用版本号或者等信息加入获得服务暴露地址和端口号,利用内数据组装成。 dubbo服务暴露过程 目标:从源码的角度分析服务暴露过程。 前言 本来这一篇一个写异步化改造的内容,但是最近我一直在想,某一部分的优化改造该怎么去撰写才能更加的让读者理解。我觉...

    light 评论0 收藏0
  • Dubbo 源码分析 - 集群容错之 Cluster

    摘要:集群用途是将多个服务提供者合并为一个,并将这个暴露给服务消费者。比如发请求,接受服务提供者返回的数据等。如果包含,表明对应的服务提供者可能因网络原因未能成功提供服务。如果不包含,此时还需要进行可用性检测,比如检测服务提供者网络连通性等。 1.简介 为了避免单点故障,现在的应用至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多台服务器。这样,同一环境下的服务提供者数量会大于1...

    denson 评论0 收藏0
  • Dubbo 源码分析 - 集群容错之 Router

    摘要:源码分析条件路由规则有两个条件组成,分别用于对服务消费者和提供者进行匹配。如果服务提供者匹配条件为空,表示对某些服务消费者禁用服务。此时第六次循环分隔符,,。第二个和第三个参数来自方法的参数列表,这两个参数分别为服务提供者和服务消费者。 1. 简介 上一篇文章分析了集群容错的第一部分 -- 服务目录 Directory。服务目录在刷新 Invoker 列表的过程中,会通过 Router...

    jcc 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<