@Overridepublicvoidsubscribe(URLurl,NotifyListenerlistener){...// 移除 urlremoveFailedSubscribed(url,listener);try{// Sending a subscription request to the server side// 调用子类的订阅方法,这里以 ZookeeperRegistry 为例doSubscribe(url,listener);}catch(Exceptione){Throwablet=e;List<URL>urls=getCacheUrls(url);if(CollectionUtils.isNotEmpty(urls)){notify(url,listener,urls);}else{// If the startup detection is opened, the Exception is thrown directly.booleancheck=getUrl().getParameter(Constants.CHECK_KEY,true)&&url.getParameter(Constants.CHECK_KEY,true);booleanskipFailback=tinstanceofSkipFailbackWrapperException;// 检查 check 参数,如果为 true,表示第一次订阅服务要成功if(check||skipFailback){if(skipFailback){t=t.getCause();}thrownewIllegalStateException("Failed to subscribe "+url+", cause: "+t.getMessage(),t);}else{logger.error(REGISTRY_FAILED_NOTIFY_EVENT,"","","Failed to subscribe "+url+", waiting for retry, cause: "+t.getMessage(),t);}}// 记录失败的url, 稍后定时任务会再次订阅addFailedSubscribed(url,listener);}}
@OverridepublicvoiddoSubscribe(finalURLurl,finalNotifyListenerlistener){try{checkDestroyed();// 订阅所有接口,dubbo-admin 服务会使用,这个不分析if(ANY_VALUE.equals(url.getServiceInterface())){...}else{CountDownLatchlatch=newCountDownLatch(1);try{List<URL>urls=newArrayList<>();/*
Iterate over the category value in URL.
With default settings, the path variable can be when url is a consumer URL:
/dubbo/[service name]/providers,
/dubbo/[service name]/configurators
/dubbo/[service name]/routers
*/// 监听每一个路径for(Stringpath:toCategoriesPath(url)){ConcurrentMap<NotifyListener,ChildListener>listeners=ConcurrentHashMapUtils.computeIfAbsent(zkListeners,url,k->newConcurrentHashMap<>());// 这里把 listener 添加进去了,等 url 更改时,再执行回调函数ChildListenerzkListener=ConcurrentHashMapUtils.computeIfAbsent(listeners,listener,k->newRegistryChildListenerImpl(url,k,latch));if(zkListenerinstanceofRegistryChildListenerImpl){// latch 为了监听到 urls 时,通知主线程((RegistryChildListenerImpl)zkListener).setLatch(latch);}// 创建根路径,比如 /dubbo/${interfaceName}/consumerszkClient.create(path,false,true);// 获取所有的子路径,用于第一次初始化 urlsList<String>children=zkClient.addChildListener(path,zkListener);if(children!=null){// The invocation point that may cause 1-1.urls.addAll(toUrlsWithEmpty(url,path,children));}}// 执行回调函数notify(url,listener,urls);}finally{// tells the listener to run only after the sync notification of main thread finishes.latch.countDown();}}}catch(Throwablee){...}}