Dubbo(三):深入理解Dubbo原始碼之如何將服務釋出到註冊中心
一、前言
前面有說到Dubbo的服務發現機制,也就是SPI,那既然Dubbo內部實現了更加強大的服務發現機制,現在我們就來一起看看Dubbo在發現服務後需要做什麼才能將服務註冊到註冊中心中。
二、Dubbo服務註冊簡介
首先需要明白的是Dubbo是依賴於Spring容器的(至於為什麼在上篇部落格中有介紹),Dubbo服務註冊過程也是始於Spring容器釋出重新整理事件。而後Dubbo在接收到事件後,就會進行服務註冊,整個邏輯大致分為三個部分:
1、檢查引數,組裝URL:服務消費方是通過URL拿到服務提供者的,所以我們需要為服務提供者配置好對應的URL。
2、匯出服務到本地和遠端:這裡的本地指的是JVM,遠端指的是實現invoke,使得服務消費方能夠通過invoke呼叫到服務。
3、向註冊中心註冊服務:能夠讓服務消費方知道服務提供方提供了那個服務。
三、接收Spring容器重新整理事件
在簡介中我們提到Dubbo服務註冊是始於Spring容器釋出重新整理事件,那麼Dubbo是如何接收該事件的呢?
在我們平常編寫provider的介面實現類時,都會打上@Service註解,從而這個標註這個類屬於ServiceBean。在ServiceBean中有這樣一個方法onApplicationEvent。該方法會在收到 Spring 上下文重新整理事件後執行服務註冊操作
1 public void onApplicationEvent(ContextRefreshedEvent event) { 2 //是否已匯出 && 是不是已被取消匯出 3 if (!this.isExported() && !this.isUnexported()) { 4 if (logger.isInfoEnabled()) { 5 logger.info("The service ready on spring started. service: " + this.getInterface()); 6 } 7 8 this.export(); 9 } 10 11 }
注意這裡是2.7.3的Dubbo,接收Spring上下文重新整理事件已經不需要設定延遲匯出,而是在匯出的時候檢查配置再決定是否需要延時,所以只有兩個判斷。而在2.6.x版本的Dubbo存在著isDelay的判斷。這個是判斷服務是否延時匯出。這裡說個題外話2.6.x的版本是com.alibaba.dubbo的,而2.7.x是org.apache.dubbo的,而2.7.0也開始代表dubbo從Apache裡畢業了。
在這裡就是Dubbo服務匯出到註冊中心過程的起點。需要我們在服務介面實現類上打上@Service。ServiceBean是Dubbo與Spring 框架進行整合的關鍵,可以看做是兩個框架之間的橋樑。具有同樣作用的類還有ReferenceBean。
四、檢查配置引數以及URL裝配
1、檢查配置
在這一階段Dubbo需要檢查使用者的配置是否合理,或者為使用者補充預設配置。就是從重新整理事件開始,進入export()方法,原始碼解析如下:
1 public void export() { 2 super.export(); 3 this.publishExportEvent(); 4 } 5 6 //進入到ServiceConfig.class中的export。 7 8 public synchronized void export() { 9 //檢查並且更新配置 10 this.checkAndUpdateSubConfigs(); 11 //是否需要匯出 12 if (this.shouldExport()) { 13 //是否需要延時 14 if (this.shouldDelay()) { 15 DELAY_EXPORT_EXECUTOR.schedule(this::doExport, (long)this.getDelay(), TimeUnit.MILLISECONDS); 16 } else { 17 //立刻匯出 18 this.doExport(); 19 } 20 21 } 22 } 23 24 //進入checkAndUpdateSubConfigs。 25 26 public void checkAndUpdateSubConfigs() { 27 //檢查配置項包括provider是否存在,匯出埠是否可用,註冊中心是否可以連線等等 28 this.completeCompoundConfigs(); 29 this.startConfigCenter(); 30 this.checkDefault(); 31 this.checkProtocol(); 32 this.checkApplication(); 33 if (!this.isOnlyInJvm()) { 34 this.checkRegistry(); 35 } 36 //檢查介面內部方法是否不為空 37 this.refresh(); 38 this.checkMetadataReport(); 39 if (StringUtils.isEmpty(this.interfaceName)) { 40 throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!"); 41 } else { 42 if (this.ref instanceof GenericService) { 43 this.interfaceClass = GenericService.class; 44 if (StringUtils.isEmpty(this.generic)) { 45 this.generic = Boolean.TRUE.toString(); 46 } 47 } else { 48 try { 49 this.interfaceClass = Class.forName(this.interfaceName, true, Thread.currentThread().getContextClassLoader()); 50 } catch (ClassNotFoundException var5) { 51 throw new IllegalStateException(var5.getMessage(), var5); 52 } 53 54 this.checkInterfaceAndMethods(this.interfaceClass, this.methods); 55 this.checkRef(); 56 this.generic = Boolean.FALSE.toString(); 57 } 58 //是否需要匯出服務或者只是在本地執行測試 59 Class stubClass; 60 if (this.local != null) { 61 if ("true".equals(this.local)) { 62 this.local = this.interfaceName + "Local"; 63 } 64 65 try { 66 stubClass = ClassUtils.forNameWithThreadContextClassLoader(this.local); 67 } catch (ClassNotFoundException var4) { 68 throw new IllegalStateException(var4.getMessage(), var4); 69 } 70 71 if (!this.interfaceClass.isAssignableFrom(stubClass)) { 72 throw new IllegalStateException("The local implementation class " + stubClass.getName() + " not implement interface " + this.interfaceName); 73 } 74 } 75 76 if (this.stub != null) { 77 if ("true".equals(this.stub)) { 78 this.stub = this.interfaceName + "Stub"; 79 } 80 81 try { 82 stubClass = ClassUtils.forNameWithThreadContextClassLoader(this.stub); 83 } catch (ClassNotFoundException var3) { 84 throw new IllegalStateException(var3.getMessage(), var3); 85 } 86 87 if (!this.interfaceClass.isAssignableFrom(stubClass)) { 88 throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + this.interfaceName); 89 } 90 } 91 92 this.checkStubAndLocal(this.interfaceClass); 93 this.checkMock(this.interfaceClass); 94 } 95 }View Code
上面的原始碼分析可看出。export方法主要檢查的配置項有@Service標籤的類是否屬性合法。服務提供者是否存在,是否有對應的Application啟動,埠是否能連線,是否有對應的註冊中心等等一些配置,在檢查完這些配置後Dubbo會識別我們此次啟動服務是想在本地啟動進行一些除錯,還是將服務暴露給別人。不想暴露出去可以進行配置
1 <dubbo:provider export="false" />
2、URL裝配
在Dubbo中的URL一般包括以下欄位:protocol,host,port,path,parameters。在檢查配置後會進入到doExport中。
protocol:就是URL最前面的欄位,表示的是協議,一般是:dubbo thrift http zk
host.port:就是對應的IP地址和埠
path:介面名稱
parameters:引數鍵值對
1 protected synchronized void doExport() { 2 if (this.unexported) { 3 throw new IllegalStateException("The service " + this.interfaceClass.getName() + " has already unexported!"); 4 } else if (!this.exported) { 5 this.exported = true; 6 if (StringUtils.isEmpty(this.path)) { 7 this.path = this.interfaceName; 8 } 9 10 this.doExportUrls(); 11 } 12 } 13 14 //進入到doExportUrls 15 private void doExportUrls() { 16 //載入註冊中心連結 17 List<URL> registryURLs = this.loadRegistries(true); 18 //使用遍歷器遍歷protocols,並在每個協議下匯出服務 19 Iterator var2 = this.protocols.iterator(); 20 21 while(var2.hasNext()) { 22 ProtocolConfig protocolConfig = (ProtocolConfig)var2.next(); 23 String pathKey = URL.buildKey((String)this.getContextPath(protocolConfig).map((p) -> { 24 return p + "/" + this.path; 25 }).orElse(this.path), this.group, this.version); 26 ProviderModel providerModel = new ProviderModel(pathKey, this.ref, this.interfaceClass); 27 ApplicationModel.initProviderModel(pathKey, providerModel); 28 this.doExportUrlsFor1Protocol(protocolConfig, registryURLs); 29 } 30 31 } 32 33 //進入到載入註冊中心連結的方法 34 35 protected List<URL> loadRegistries(boolean provider) { 36 List<URL> registryList = new ArrayList(); 37 if (CollectionUtils.isNotEmpty(this.registries)) { 38 Iterator var3 = this.registries.iterator(); 39 //迴圈的從註冊連結串列中拿取地址及配置 40 label47: 41 while(true) { 42 RegistryConfig config; 43 String address; 44 do { 45 if (!var3.hasNext()) { 46 return registryList; 47 } 48 49 config = (RegistryConfig)var3.next(); 50 address = config.getAddress(); 51 //address為空就預設為0.0.0.0 52 if (StringUtils.isEmpty(address)) { 53 address = "0.0.0.0"; 54 } 55 } while("N/A".equalsIgnoreCase(address)); 56 57 Map<String, String> map = new HashMap(); 58 // 新增 ApplicationConfig 中的欄位資訊到 map 中 59 appendParameters(map, this.application); 60 // 新增 RegistryConfig 欄位資訊到 map 中 61 appendParameters(map, config); 62 // 新增 path,protocol 等資訊到 map 中 63 map.put("path", RegistryService.class.getName()); 64 appendRuntimeParameters(map); 65 if (!map.containsKey("protocol")) { 66 map.put("protocol", "dubbo"); 67 } 68 // 解析得到 URL 列表,address 可能包含多個註冊中心 ip, 69 // 因此解析得到的是一個 URL 列表 70 List<URL> urls = UrlUtils.parseURLs(address, map); 71 Iterator var8 = urls.iterator(); 72 73 while(true) { 74 URL url; 75 do { 76 if (!var8.hasNext()) { 77 continue label47; 78 } 79 80 url = (URL)var8.next(); 81 //// 將 URL 協議頭設定為 registry 82 url = URLBuilder.from(url).addParameter("registry", url.getProtocol()).setProtocol("registry").build(); 83 // 通過判斷條件,決定是否新增 url 到 registryList 中,條件如下: 84 // (服務提供者 && register = true 或 null) || (非服務提供者 && subscribe = true 或 null) 85 } while((!provider || !url.getParameter("register", true)) && (provider || !url.getParameter("subscribe", true))); 86 87 //新增url到registryList中 88 registryList.add(url); 89 } 90 } 91 } else { 92 return registryList; 93 } 94 }View Code
loadRegistries方法主要包含如下的邏輯:
1、構建引數對映集合,也就是 map
2、構建註冊中心連結列表
3、遍歷連結列表,並根據條件決定是否將其新增到 registryList 中
實際上因為Dubbo現如今支援很多註冊中心,所以對於一些註冊中心的URL也要進行遍歷構建。這裡是生成註冊中心的URL。還未生成Dubbo服務的URL。比如說使用的是Zookeeper註冊中心,可能從loadRegistries中拿到的就是:
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.7.3&pid=1528&qos.port=22222®istry=zookeeper×tamp=1530743640901
這種型別的URL,表示這是一個註冊協議,現在可以根據這個URL定位到註冊中心去了。服務介面是RegistryService,registry的型別為zookeeper。可是我們還未生成Dubbo服務提供方的URL所以接著看下面程式碼
然後進行到doExportUrlsFor1Protocol(裝配Dubbo服務的URL並且實行釋出)
1 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { 2 //首先是將一些資訊,比如版本、時間戳、方法名以及各種配置物件的欄位資訊放入到 map 中 3 //map 中的內容將作為 URL 的查詢字串。構建好 map 後,緊接著是獲取上下文路徑、主機名以及埠號等資訊。 4 //最後將 map 和主機名等資料傳給 URL 構造方法建立 URL 物件。需要注意的是,這裡出現的 URL 並非 java.net.URL,而是 com.alibaba.dubbo.common.URL。 5 String name = protocolConfig.getName(); 6 // 如果協議名為空,或空串,則將協議名變數設定為 dubbo 7 if (StringUtils.isEmpty(name)) { 8 name = "dubbo"; 9 } 10 11 Map<String, String> map = new HashMap(); 12 // 新增 side、版本、時間戳以及程序號等資訊到 map 中 13 map.put("side", "provider"); 14 appendRuntimeParameters(map); 15 // 通過反射將物件的欄位資訊新增到 map 中 16 appendParameters(map, this.metrics); 17 appendParameters(map, this.application); 18 appendParameters(map, this.module); 19 appendParameters(map, this.provider); 20 appendParameters(map, protocolConfig); 21 appendParameters(map, this); 22 String scope; 23 Iterator metadataReportService; 24 // methods 為 MethodConfig 集合,MethodConfig 中儲存了 <dubbo:method> 標籤的配置資訊 25 if (CollectionUtils.isNotEmpty(this.methods)) { 26 Iterator var5 = this.methods.iterator(); 27 //檢測 <dubbo:method> 標籤中的配置資訊,並將相關配置新增到 map 中 28 label166: 29 while(true) { 30 MethodConfig method; 31 List arguments; 32 do { 33 if (!var5.hasNext()) { 34 break label166; 35 } 36 37 method = (MethodConfig)var5.next(); 38 appendParameters(map, method, method.getName()); 39 String retryKey = method.getName() + ".retry"; 40 if (map.containsKey(retryKey)) { 41 scope = (String)map.remove(retryKey); 42 if ("false".equals(scope)) { 43 map.put(method.getName() + ".retries", "0"); 44 } 45 } 46 47 arguments = method.getArguments(); 48 } while(!CollectionUtils.isNotEmpty(arguments)); 49 50 metadataReportService = arguments.iterator(); 51 52 while(true) { 53 ArgumentConfig argument; 54 Method[] methods; 55 do { 56 do { 57 while(true) { 58 if (!metadataReportService.hasNext()) { 59 continue label166; 60 } 61 62 argument = (ArgumentConfig)metadataReportService.next(); 63 if (argument.getType() != null && argument.getType().length() > 0) { 64 methods = this.interfaceClass.getMethods(); 65 break; 66 } 67 68 if (argument.getIndex() == -1) { 69 throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); 70 } 71 72 appendParameters(map, argument, method.getName() + "." + argument.getIndex()); 73 } 74 } while(methods == null); 75 } while(methods.length <= 0); 76 77 for(int i = 0; i < methods.length; ++i) { 78 String methodName = methods[i].getName(); 79 if (methodName.equals(method.getName())) { 80 Class<?>[] argtypes = methods[i].getParameterTypes(); 81 if (argument.getIndex() != -1) { 82 if (!argtypes[argument.getIndex()].getName().equals(argument.getType())) { 83 throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); 84 } 85 86 appendParameters(map, argument, method.getName() + "." + argument.getIndex()); 87 } else { 88 for(int j = 0; j < argtypes.length; ++j) { 89 Class<?> argclazz = argtypes[j]; 90 if (argclazz.getName().equals(argument.getType())) { 91 appendParameters(map, argument, method.getName() + "." + j); 92 if (argument.getIndex() != -1 && argument.getIndex() != j) { 93 throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); 94 } 95 } 96 } 97 } 98 } 99 } 100 } 101 } 102 } 103 104 String host; 105 // 檢測 generic 是否為 "true",並根據檢測結果向 map 中新增不同的資訊 106 if (ProtocolUtils.isGeneric(this.generic)) { 107 map.put("generic", this.generic); 108 map.put("methods", "*"); 109 } else { 110 host = Version.getVersion(this.interfaceClass, this.version); 111 if (host != null && host.length() > 0) { 112 map.put("revision", host); 113 } 114 // 為介面生成包裹類 Wrapper,Wrapper 中包含了介面的詳細資訊,比如介面方法名陣列,欄位資訊等 115 String[] methods = Wrapper.getWrapper(this.interfaceClass).getMethodNames(); 116 if (methods.length == 0) { 117 logger.warn("No method found in service interface " + this.interfaceClass.getName()); 118 map.put("methods", "*"); 119 } else { 120 // 將逗號作為分隔符連線方法名,並將連線後的字串放入 map 中 121 map.put("methods", StringUtils.join(new HashSet(Arrays.asList(methods)), ",")); 122 } 123 } 124 // 新增 token 到 map 中 125 if (!ConfigUtils.isEmpty(this.token)) { 126 if (ConfigUtils.isDefault(this.token)) { 127 map.put("token", UUID.randomUUID().toString()); 128 } else { 129 map.put("token", this.token); 130 } 131 } 132 //獲取host和port 133 host = this.findConfigedHosts(protocolConfig, registryURLs, map); 134 Integer port = this.findConfigedPorts(protocolConfig, name, map); 135 // 獲取上下文路徑並且組裝URL 136 URL url = new URL(name, host, port, (String)this.getContextPath(protocolConfig).map((p) -> { 137 return p + "/" + this.path; 138 }).orElse(this.path), map); 139 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) { 140 // 載入 ConfiguratorFactory,並生成 Configurator 例項,然後通過例項配置 url,使用了前面提到的SPI機制 141 url = ((ConfiguratorFactory)ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol())).getConfigurator(url).configure(url); 142 } 143 //下面邏輯主要分三步 144 // 如果 scope = none,則什麼都不做 145 // scope != remote,匯出到本地 146 // scope != local,匯出到遠端 147 scope = url.getParameter("scope"); 148 if (!"none".equalsIgnoreCase(scope)) { 149 if (!"remote".equalsIgnoreCase(scope)) { 150 this.exportLocal(url); 151 } 152 153 if (!"local".equalsIgnoreCase(scope)) { 154 if (!this.isOnlyInJvm() && logger.isInfoEnabled()) { 155 logger.info("Export dubbo service " + this.interfaceClass.getName() + " to url " + url); 156 } 157 158 if (CollectionUtils.isNotEmpty(registryURLs)) { 159 metadataReportService = registryURLs.iterator(); 160 161 while(metadataReportService.hasNext()) { 162 URL registryURL = (URL)metadataReportService.next(); 163 if (!"injvm".equalsIgnoreCase(url.getProtocol())) { 164 url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); 165 URL monitorUrl = this.loadMonitor(registryURL); 166 if (monitorUrl != null) { 167 url = url.addParameterAndEncoded("monitor", monitorUrl.toFullString()); 168 } 169 170 if (logger.isInfoEnabled()) { 171 logger.info("Register dubbo service " + this.interfaceClass.getName() + " url " + url + " to registry " + registryURL); 172 } 173 174 String proxy = url.getParameter("proxy"); 175 if (StringUtils.isNotEmpty(proxy)) { 176 registryURL = registryURL.addParameter("proxy", proxy); 177 } 178 // 為服務提供類(ref)生成 Invoker 179 Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString())); 180 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 181 // 匯出服務,並生成 Exporter 182 Exporter<?> exporter = protocol.export(wrapperInvoker); 183 this.exporters.add(exporter); 184 } 185 } 186 // 不存在註冊中心,僅匯出服務 187 } else { 188 Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, url); 189 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 190 Exporter<?> exporter = protocol.export(wrapperInvoker); 191 this.exporters.add(exporter); 192 } 193 194 metadataReportService = null; 195 MetadataReportService metadataReportService; 196 if ((metadataReportService = this.getMetadataReportService()) != null) { 197 metadataReportService.publishProvider(url); 198 } 199 } 200 } 201 202 this.urls.add(url); 203 }View Code
上面的原始碼前半段是進行URL裝配,這個URL就是Dubbo服務的URL,大致如下:
dubbo://192.168.1.6:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.1.6&bind.port=20880&dubbo=2.7.3&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=5744&qos.port=22222&side=provider×tamp=1530746052546
這個URL表示它是一個dubbo協議(DubboProtocol),地址是當前伺服器的ip,埠是要暴露的服務的埠號,可以從dubbo:protocol配置,服務介面為dubbo:service配置釋出的介面。
後半段主要是判斷scope變數來決定是否將服務匯出遠端或者本地,匯出到本地實際上很簡單隻需要生成Invoker。當匯出到遠端就需要新增監視器還要生成invoker。監視器能讓Dubbo定時檢視註冊中心掛了沒。會丟擲指定異常,而invoker使得服務消費方能夠遠端呼叫到服務。並且還會進行註冊到註冊中心下面我們接著來看看服務的釋出。因為Invoker比較重要在消費者和提供者中都有,所以這個後面會單獨拿出來進行探討。
五、服務釋出本地與遠端
1、服務釋出到本地
1 private void exportLocal(URL url) { 2 //進行本地URL的構建 3 URL local = URLBuilder.from(url).setProtocol("injvm").setHost("127.0.0.1").setPort(0).build(); 4 //根據本地的URL來實現對應的Invoker 5 Exporter<?> exporter = protocol.export(PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, local)); 6 this.exporters.add(exporter); 7 logger.info("Export dubbo service " + this.interfaceClass.getName() + " to local registry url : " + local); 8 }View Code
可見釋出到本地是重新構建了protocol,injvm就是代表在本地的JVM裡,host與port都統一預設127.0.0.1:0。
2、服務釋出到遠端
1 public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException { 2 //獲取註冊中心的URL,比如:zookeeper://127.0.0.1:2181/...... 3 URL registryUrl = this.getRegistryUrl(originInvoker); 4 //獲取所有服務提供者的URL,比如:dubbo://192.168.1.6:20880/....... 5 URL providerUrl = this.getProviderUrl(originInvoker); 6 //獲取訂閱URL,比如:provider://192.168.1.6:20880/...... 7 URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(providerUrl); 8 //建立監聽器 9 RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker); 10 //向訂閱中心推送監聽器 11 this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 12 providerUrl = this.overrideUrlWithConfig(providerUrl, overrideSubscribeListener); 13 //匯出服務 14 RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker, providerUrl); 15 Registry registry = this.getRegistry(originInvoker); 16 //獲取已註冊的服務提供者的URL,比如dubbo://192.168.1.6:20880/....... 17 URL registeredProviderUrl = this.getRegisteredProviderUrl(providerUrl, registryUrl); 18 // 向服務提供者與消費者登錄檔中註冊服務提供者 19 ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); 20 // 獲取 register 引數 21 boolean register = registeredProviderUrl.getParameter("register", true); 22 // 根據 register 的值決定是否註冊服務 23 if (register) { 24 this.register(registryUrl, registeredProviderUrl); 25 providerInvokerWrapper.setReg(true); 26 } 27 // 向註冊中心進行訂閱 override 資料 28 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 29 exporter.setRegisterUrl(registeredProviderUrl); 30 exporter.setSubscribeUrl(overrideSubscribeUrl); 31 // 建立並返回 DestroyableExporter 32 return new RegistryProtocol.DestroyableExporter(exporter); 33 }View Code
上面的原始碼主要是根據前面生成的URL進行服務的釋出和註冊(註冊在下一節展開原始碼)。當執行到doLocalExport也就是釋出本地服務到遠端時候會呼叫 DubboProtocol 的 export 方法大致會經歷下面一些步驟來匯出服務
-
- 從Invoker獲取providerUrl,構建serviceKey(group/service:version:port),構建DubboExporter並以serviceKey為key放入本地map快取
- 處理url攜帶的本地存根和callback回撥
- 根據url開啟伺服器埠,暴露本地服務。先以url.getAddress為key查詢本地快取serverMap獲取ExchangeServer,如果不存在,則通過createServer建立。
- createServer方法,設定心跳時間,判斷url中的傳輸方式(key=server,對應Transporter服務)是否支援,設定codec=dubbo,最後根據url和ExchangeHandler物件繫結server返回,這裡的ExchangeHandler非常重要,它就是消費方呼叫時,底層通訊層回撥的Handler,從而獲取包含實際Service實現的Invoker執行器,它是定義在DubboProtocol類中的ExchangeHandlerAdapter內部類。
- 返回DubboExporter物件
到這裡大致的服務釋出圖如下:
六、服務註冊
服務註冊操作對於 Dubbo 來說不是必需的,通過服務直連的方式就可以繞過註冊中心。但通常我們不會這麼做,直連方式不利於服務治理,僅推薦在測試服務時使用。對於 Dubbo 來說,註冊中心雖不是必需,但卻是必要的。原始碼如下:
1 public void register(URL url) { 2 super.register(url); 3 failedRegistered.remove(url); 4 failedUnregistered.remove(url); 5 try { 6 // 模板方法,由子類實現 7 doRegister(url); 8 } catch (Exception e) { 9 Throwable t = e; 10 11 // 獲取 check 引數,若 check = true 將會直接丟擲異常 12 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) 13 && url.getParameter(Constants.CHECK_KEY, true) 14 && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); 15 boolean skipFailback = t instanceof SkipFailbackWrapperException; 16 if (check || skipFailback) { 17 if (skipFailback) { 18 t = t.getCause(); 19 } 20 throw new IllegalStateException("Failed to register"); 21 } else { 22 logger.error("Failed to register"); 23 } 24 25 // 記錄註冊失敗的連結 26 failedRegistered.add(url); 27 } 28 } 29 30 //進入doRegister方法 31 32 protected void doRegister(URL url) { 33 try { 34 // 通過 Zookeeper 客戶端建立節點,節點路徑由 toUrlPath 方法生成,路徑格式如下: 35 // /${group}/${serviceInterface}/providers/${url} 36 // 比如 37 // /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1...... 38 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 39 } catch (Throwable e) { 40 throw new RpcException("Failed to register..."); 41 } 42 } 43 44 //進入create方法 45 46 public void create(String path, boolean ephemeral) { 47 if (!ephemeral) { 48 // 如果要建立的節點型別非臨時節點,那麼這裡要檢測節點是否存在 49 if (checkExists(path)) { 50 return; 51 } 52 } 53 int i = path.lastIndexOf('/'); 54 if (i > 0) { 55 // 遞迴建立上一級路徑 56 create(path.substring(0, i), false); 57 } 58 59 // 根據 ephemeral 的值建立臨時或持久節點 60 if (ephemeral) { 61 createEphemeral(path); 62 } else { 63 createPersistent(path); 64 } 65 } 66 67 //進入createEphemeral 68 69 public void createEphemeral(String path) { 70 try { 71 // 通過 Curator 框架建立節點 72 client.create().withMode(CreateMode.EPHEMERAL).forPath(path); 73 } catch (NodeExistsException e) { 74 } catch (Exception e) { 75 throw new IllegalStateException(e.getMessage(), e); 76 } 77 }View Code
根據上面的方法,可以將當前服務對應的配置資訊(儲存在URL中的)註冊到註冊中心/dubbo/org.apache.dubbo.demo.DemoService/providers/ 。裡面直接使用了Curator進行建立節點(Curator是Netflix公司開源的一套zookeeper客戶端框架)
七、總結
到這裡Dubbo的服務註冊流程終於是解釋完。核心在於Dubbo使用規定好的URL+SPI進行尋找和發現服務,通過URL定位註冊中心,再通過將服務的URL釋出到註冊中心從而使得消費者可以知道服務的有哪些,裡面可以看見對於URL這種複雜的物件並且需要經常更改的,通常採用建造者模式。而2.7.3版本的Dubbo原始碼也使用了Java8以後的新特性Lambda表示式來構建隱式函式。而一整套流程下來可以在ZooInspector這個zk視覺化客戶端看見我們建立的節點,前提是註冊中心為zk。
&n