1. 程式人生 > >Dubbo原始碼分析(三):Dubbo之服務端(Service)

Dubbo原始碼分析(三):Dubbo之服務端(Service)

    

    如上圖所示的Dubbo的暴露服務的過程,不難看出它也和消費者端很像,也需要一個像reference的物件來維護service關聯的所有物件及其屬性,這裡的reference就是provider。由於ServiceBean實現了



InitializingBean介面,所有在Spring例項化這個bean後會呼叫介面方法afterPropertiesSet:

  1. public void afterPropertiesSet() throws Exception {

  2. //如果沒有配置provider

  3. if (getProvider() == null) {

  4. //獲取IOC容器裡的所有provider

  5. Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);

  6. if (providerConfigMap != null && providerConfigMap.size() > 0) {

  7. Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);

  8. if ((protocolConfigMap == null || protocolConfigMap.size() == 0)

  9. && providerConfigMap.size() > 1) { // 相容舊版本

  10. List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();

  11. for (ProviderConfig config : providerConfigMap.values()) {

  12. if (config.isDefault() != null && config.isDefault().booleanValue()) {

  13. providerConfigs.add(config);

  14. }

  15. }

  16. //關聯所有providers

  17. if (providerConfigs.size() > 0) {

  18. setProviders(providerConfigs);

  19. }

  20. } else {

  21. ProviderConfig providerConfig = null;

  22. for (ProviderConfig config : providerConfigMap.values()) {

  23. if (config.isDefault() == null || config.isDefault().booleanValue()) {

  24. if (providerConfig != null) {

  25. throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);

  26. }

  27. providerConfig = config;

  28. }

  29. }

  30. if (providerConfig != null) {

  31. setProvider(providerConfig);

  32. }

  33. }

  34. }

  35. }

  36. //如果沒有配置application,且沒有配置provider

  37. if (getApplication() == null

  38. && (getProvider() == null || getProvider().getApplication() == null)) {

  39. //獲取所有applications

  40. Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);

  41. if (applicationConfigMap != null && applicationConfigMap.size() > 0) {

  42. ApplicationConfig applicationConfig = null;

  43. for (ApplicationConfig config : applicationConfigMap.values()) {

  44. if (config.isDefault() == null || config.isDefault().booleanValue()) {

  45. if (applicationConfig != null) {

  46. throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);

  47. }

  48. applicationConfig = config;

  49. }

  50. }

  51. //關聯application

  52. if (applicationConfig != null) {

  53. setApplication(applicationConfig);

  54. }

  55. }

  56. }

  57. //如果沒有配置module,且沒有配置provider

  58. if (getModule() == null

  59. && (getProvider() == null || getProvider().getModule() == null)) {

  60. Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);

  61. if (moduleConfigMap != null && moduleConfigMap.size() > 0) {

  62. ModuleConfig moduleConfig = null;

  63. for (ModuleConfig config : moduleConfigMap.values()) {

  64. if (config.isDefault() == null || config.isDefault().booleanValue()) {

  65. if (moduleConfig != null) {

  66. throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);

  67. }

  68. moduleConfig = config;

  69. }

  70. }

  71. //關聯module

  72. if (moduleConfig != null) {

  73. setModule(moduleConfig);

  74. }

  75. }

  76. }

  77. //如果沒有配置registries,且沒有配置provider

  78. if ((getRegistries() == null || getRegistries().size() == 0)

  79. && (getProvider() == null || getProvider().getRegistries() == null || getProvider().getRegistries().size() == 0)

  80. && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {

  81. Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);

  82. if (registryConfigMap != null && registryConfigMap.size() > 0) {

  83. List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();

  84. for (RegistryConfig config : registryConfigMap.values()) {

  85. if (config.isDefault() == null || config.isDefault().booleanValue()) {

  86. registryConfigs.add(config);

  87. }

  88. }

  89. //關聯registries

  90. if (registryConfigs != null && registryConfigs.size() > 0) {

  91. super.setRegistries(registryConfigs);

  92. }

  93. }

  94. }

  95. //如果沒有配置monitor,且沒有配置provider

  96. if (getMonitor() == null

  97. && (getProvider() == null || getProvider().getMonitor() == null)

  98. && (getApplication() == null || getApplication().getMonitor() == null)) {

  99. Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);

  100. if (monitorConfigMap != null && monitorConfigMap.size() > 0) {

  101. MonitorConfig monitorConfig = null;

  102. for (MonitorConfig config : monitorConfigMap.values()) {

  103. if (config.isDefault() == null || config.isDefault().booleanValue()) {

  104. if (monitorConfig != null) {

  105. throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);

  106. }

  107. monitorConfig = config;

  108. }

  109. }

  110. //關聯monitor

  111. if (monitorConfig != null) {

  112. setMonitor(monitorConfig);

  113. }

  114. }

  115. }

  116. //如果沒有配置protocol,且沒有配置provider

  117. if ((getProtocols() == null || getProtocols().size() == 0)

  118. && (getProvider() == null || getProvider().getProtocols() == null || getProvider().getProtocols().size() == 0)) {

  119. Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);

  120. if (protocolConfigMap != null && protocolConfigMap.size() > 0) {

  121. List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();

  122. for (ProtocolConfig config : protocolConfigMap.values()) {

  123. if (config.isDefault() == null || config.isDefault().booleanValue()) {

  124. protocolConfigs.add(config);

  125. }

  126. }

  127. //關聯protocol

  128. if (protocolConfigs != null && protocolConfigs.size() > 0) {

  129. super.setProtocols(protocolConfigs);

  130. }

  131. }

  132. }

  133. //如果沒有配置path

  134. if (getPath() == null || getPath().length() == 0) {

  135. if (beanName != null && beanName.length() > 0

  136. && getInterface() != null && getInterface().length() > 0

  137. && beanName.startsWith(getInterface())) {

  138. setPath(beanName);

  139. }

  140. }

  141. //暴露provider

  142. if (! isDelay()) {

  143. export();

  144. }

  145. }

      Dubbo在確認了所有相關物件都配置後呼叫export方法開始暴露過程:

  1. public synchronized void export() {

  2. //如果provider沒有配置

  3. if (provider != null) {

  4. //如果exporter沒有配置使用provider所關聯的exporter

  5. if (export == null) {

  6. export = provider.getExport();

  7. }

  8. //如果delay(延遲暴露)沒有配置,獲取provider的delay

  9. if (delay == null) {

  10. delay = provider.getDelay();

  11. }

  12. }

  13. //如果不需要暴露介面則直接返回

  14. if (export != null && ! export.booleanValue()) {

  15. return;

  16. }

  17. //如果延遲暴露的時間(毫秒級)是存在的,開啟執行緒並等待delay毫秒後開始暴露介面,否則直接執行暴露介面過程

  18. if (delay != null && delay > 0) {

  19. Thread thread = new Thread(new Runnable() {

  20. public void run() {

  21. try {

  22. Thread.sleep(delay);

  23. } catch (Throwable e) {

  24. }

  25. doExport();

  26. }

  27. });

  28. thread.setDaemon(true);

  29. thread.setName("DelayExportServiceThread");

  30. thread.start();

  31. } else {

  32. doExport();

  33. }

  34. }

  1. protected synchronized void doExport() {

  2. //如果不需要暴露介面則丟擲異常

  3. if (unexported) {

  4. throw new IllegalStateException("Already unexported!");

  5. }

  6. //如果已經暴露則不需要重複暴露

  7. if (exported) {

  8. return;

  9. }

  10. exported = true;

  11. //如果interfaceName沒配置(這樣dubbo就無法找到需要暴露的service物件)則丟擲異常

  12. if (interfaceName == null || interfaceName.length() == 0) {

  13. throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");

  14. }

  15. checkDefault();

  16. //provider已經配置的情況下,如果application、module、registries、monitor、protocol中有未配置的均可以從provider獲取

  17. if (provider != null) {

  18. if (application == null) {

  19. application = provider.getApplication();

  20. }

  21. if (module == null) {

  22. module = provider.getModule();

  23. }

  24. if (registries == null) {

  25. registries = provider.getRegistries();

  26. }

  27. if (monitor == null) {

  28. monitor = provider.getMonitor();

  29. }

  30. if (protocols == null) {

  31. protocols = provider.getProtocols();

  32. }

  33. }

  34. if (module != null) {

  35. if (registries == null) {

  36. registries = module.getRegistries();

  37. }

  38. if (monitor == null) {

  39. monitor = module.getMonitor();

  40. }

  41. }

  42. if (application != null) {

  43. if (registries == null) {

  44. registries = application.getRegistries();

  45. }

  46. if (monitor == null) {

  47. monitor = application.getMonitor();

  48. }

  49. }

  50. if (ref instanceof GenericService) {

  51. interfaceClass = GenericService.class;

  52. if (StringUtils.isEmpty(generic)) {

  53. generic = Boolean.TRUE.toString();

  54. }

  55. } else {

  56. try {

  57. interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()

  58. .getContextClassLoader());

  59. } catch (ClassNotFoundException e) {

  60. throw new IllegalStateException(e.getMessage(), e);

  61. }

  62. checkInterfaceAndMethods(interfaceClass, methods);

  63. checkRef();

  64. generic = Boolean.FALSE.toString();

  65. }

  66. //如果是本地服務

  67. if(local !=null){

  68. //如果是本地服務在interfaceName屬性後面加上Local

  69. if(local=="true"){

  70. local=interfaceName+"Local";

  71. }

  72. Class<?> localClass;

  73. try {

  74. //載入service

  75. localClass = ClassHelper.forNameWithThreadContextClassLoader(local);

  76. } catch (ClassNotFoundException e) {

  77. throw new IllegalStateException(e.getMessage(), e);

  78. }

  79. if(!interfaceClass.isAssignableFrom(localClass)){

  80. throw new IllegalStateException("The local implemention class " + localClass.getName() + " not implement interface " + interfaceName);

  81. }

  82. }

  83. //如果是遠端服務

  84. if(stub !=null){

  85. if(stub=="true"){

  86. stub=interfaceName+"Stub";

  87. }

  88. Class<?> stubClass;

  89. try {

  90. //載入service

  91. stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);

  92. } catch (ClassNotFoundException e) {

  93. throw new IllegalStateException(e.getMessage(), e);

  94. }

  95. if(!interfaceClass.isAssignableFrom(stubClass)){

  96. throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + interfaceName);

  97. }

  98. }

  99. //檢查application

  100. checkApplication();

  101. //檢查registries

  102. checkRegistry();

  103. //檢查protocol

  104. checkProtocol();

  105. //將所有這些物件的屬性關聯到provider

  106. appendProperties(this);

  107. checkStubAndMock(interfaceClass);

  108. if (path == null || path.length() == 0) {

  109. path = interfaceName;

  110. }

  111. //暴露地址

  112. doExportUrls();

  113. }

  1. private void doExportUrls() {

  2. //將註冊的所有url匹配上對應的協議在服務端暴露出來

  3. List<URL> registryURLs = loadRegistries(true);

  4. for (ProtocolConfig protocolConfig : protocols) {

  5. doExportUrlsFor1Protocol(protocolConfig, registryURLs);

  6. }

  7. }

  1. private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {

  2. //如果沒配置protocol則預設使用dubbo協議

  3. String name = protocolConfig.getName();

  4. if (name == null || name.length() == 0) {

  5. name = "dubbo";

  6. }

  7. //獲取主機地址

  8. String host = protocolConfig.getHost();

  9. if (provider != null && (host == null || host.length() == 0)) {

  10. host = provider.getHost();

  11. }

  12. boolean anyhost = false;

  13. if (NetUtils.isInvalidLocalHost(host)) {

  14. anyhost = true;

  15. try {

  16. host = InetAddress.getLocalHost().getHostAddress();

  17. } catch (UnknownHostException e) {

  18. logger.warn(e.getMessage(), e);

  19. }

  20. if (NetUtils.isInvalidLocalHost(host)) {

  21. if (registryURLs != null && registryURLs.size() > 0) {

  22. for (URL registryURL : registryURLs) {

  23. try {

  24. //建立socket,連線到註冊中心

  25. Socket socket = new Socket();

  26. try {

  27. SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());

  28. socket.connect(addr, 1000);

  29. //獲取服務所在主機地址

  30. host = socket.getLocalAddress().getHostAddress();

  31. break;

  32. } finally {

  33. try {

  34. socket.close();

  35. } catch (Throwable e) {}

  36. }

  37. } catch (Exception e) {

  38. logger.warn(e.getMessage(), e);

  39. }

  40. }

  41. }

  42. if (NetUtils.isInvalidLocalHost(host)) {

  43. host = NetUtils.getLocalHost();

  44. }

  45. }

  46. }

  47. //獲取協議介面號

  48. Integer port = protocolConfig.getPort();

  49. if (provider != null && (port == null || port == 0)) {

  50. port = provider.getPort();

  51. }

  52. final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();

  53. if (port == null || port == 0) {

  54. port = defaultPort;

  55. }

  56. if (port == null || port <= 0) {

  57. port = getRandomPort(name);

  58. if (port == null || port < 0) {

  59. port = NetUtils.getAvailablePort(defaultPort);

  60. putRandomPort(name, port);

  61. }

  62. logger.warn("Use random available port(" + port + ") for protocol " + name);

  63. }

  64. //獲取application、module、provider、protocol、exporter、registries、monitor所有屬性

  65. Map<String, String> map = new HashMap<String, String>();

  66. if (anyhost) {

  67. map.put(Constants.ANYHOST_KEY, "true");

  68. }

  69. map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);

  70. map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());

  71. map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));

  72. if (ConfigUtils.getPid() > 0) {

  73. map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));

  74. }

  75. appendParameters(map, application);

  76. appendParameters(map, module);

  77. appendParameters(map, provider, Constants.DEFAULT_KEY);

  78. appendParameters(map, protocolConfig);

  79. appendParameters(map, this);

  80. if (methods != null && methods.size() > 0) {

  81. for (MethodConfig method : methods) {

  82. appendParameters(map, method, method.getName());

  83. String retryKey = method.getName() + ".retry";

  84. if (map.containsKey(retryKey)) {

  85. String retryValue = map.remove(retryKey);

  86. if ("false".equals(retryValue)) {

  87. map.put(method.getName() + ".retries", "0");

  88. }

  89. }

  90. List<ArgumentConfig> arguments = method.getArguments();

  91. if (arguments != null && arguments.size() > 0) {

  92. for (ArgumentConfig argument : arguments) {

  93. //型別自動轉換.

  94. if(argument.getType() != null && argument.getType().length() >0){

  95. Method[] methods = interfaceClass.getMethods();

  96. //遍歷所有方法

  97. if(methods != null && methods.length > 0){

  98. for (int i = 0; i < methods.length; i++) {

  99. String methodName = methods[i].getName();

  100. //匹配方法名稱,獲取方法簽名.

  101. if(methodName.equals(method.getName())){

  102. Class<?>[] argtypes = methods[i].getParameterTypes();

  103. //一個方法中單個callback

  104. if (argument.getIndex() != -1 ){

  105. if (argtypes[argument.getIndex()].getName().equals(argument.getType())){

  106. appendParameters(map, argument, method.getName() + "." + argument.getIndex());

  107. }else {

  108. throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());

  109. }

  110. } else {

  111. //一個方法中多個callback

  112. for (int j = 0 ;j<argtypes.length ;j++) {

  113. Class<?> argclazz = argtypes[j];

  114. if (argclazz.getName().equals(argument.getType())){

  115. appendParameters(map, argument, method.getName() + "." + j);

  116. if (argument.getIndex() != -1 && argument.getIndex() != j){

  117. throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());

  118. }

  119. }

  120. }

  121. }

  122. }

  123. }

  124. }

  125. }else if(argument.getIndex() != -1){

  126. appendParameters(map, argument, method.getName() + "." + argument.getIndex());

  127. }else {

  128. throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");

  129. }

  130. }

  131. }

  132. } // end of methods for

  133. }

  134. if (ProtocolUtils.isGeneric(generic)) {

  135. map.put("generic", generic);

  136. map.put("methods", Constants.ANY_VALUE);

  137. } else {

  138. String revision = Version.getVersion(interfaceClass, version);

  139. if (revision != null && revision.length() > 0) {

  140. map.put("revision", revision);

  141. }

  142. String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();

  143. if(methods.length == 0) {

  144. logger.warn("NO method found in service interface " + interfaceClass.getName());

  145. map.put("methods", Constants.ANY_VALUE);

  146. }

  147. else {

  148. map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));

  149. }

  150. }

  151. if (! ConfigUtils.isEmpty(token)) {

  152. if (ConfigUtils.isDefault(token)) {

  153. map.put("token", UUID.randomUUID().toString());

  154. } else {

  155. map.put("token", token);

  156. }

  157. }

  158. if ("injvm".equals(protocolConfig.getName())) {

  159. protocolConfig.setRegister(false);

  160. map.put("notify", "false");

  161. }

  162. // 匯出服務

  163. String contextPath = protocolConfig.getContextpath();

  164. if ((contextPath == null || contextPath.length() == 0) && provider != null) {

  165. contextPath = provider.getContextpath();

  166. }

  167. //建立服務所在url

  168. URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

  169. if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)

  170. .hasExtension(url.getProtocol())) {

  171. url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)

  172. .getExtension(url.getProtocol()).getConfigurator(url).configure(url);

  173. }

  174. String scope = url.getParameter(Constants.SCOPE_KEY);

  175. //配置為none不暴露

  176. if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

  177. //配置不是remote的情況下做本地暴露 (配置為remote,則表示只暴露遠端服務)

  178. if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {

  179. //暴露的地址是localhost所以遠端無法訪問

  180. exportLocal(url);

  181. }

  182. //如果配置不是local則暴露為遠端服務.(配置為local,則表示只暴露遠端服務)

  183. if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){

  184. if (logger.isInfoEnabled()) {

  185. logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);

  186. }

  187. if (registryURLs != null && registryURLs.size() > 0

  188. && url.getParameter("register", true)) {

  189. for (URL registryURL : registryURLs) {

  190. url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));

  191. URL monitorUrl = loadMonitor(registryURL);

  192. if (monitorUrl != null) {

  193. url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());

  194. }

  195. if (logger.isInfoEnabled()) {

  196. logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);

  197. }

  198. //獲取invoker

  199. Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

  200. //根據協議將invoker暴露成exporter,具體過程是建立一個ExchangeServer,它會繫結一個ServerSocket到配置埠

  201. Exporter<?> exporter = protocol.export(invoker);

  202. //將建立的exporter放進連結串列便於管理

  203. exporters.add(exporter);

  204. }

  205. } else {

  206. Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

  207. Exporter<?> exporter = protocol.export(invoker);

  208. exporters.add(exporter);

  209. }

  210. }

  211. }

  212. this.urls.add(url);

  213. }

      整個暴露服務的流程就是首先檢查配置是否完整然後獲取協議和埠資訊,通知registries自己已經註冊可以提供服務最後建立Server繫結埠。這樣使用者就可以從配置的資訊連線到Server並和Server通訊遠端呼叫方法了。