// 同步引用public<T>Tget(ReferenceConfigBase<T>rc,booleancheck){Stringkey=generator.generateKey(rc);Class<?>type=rc.getInterfaceClass();booleansingleton=rc.getSingleton()==null||rc.getSingleton();Tproxy=null;// Check existing proxy of the same 'key' and 'type' first.if(singleton){// 单例对象,从缓存 referenceKeyMap 中获取proxy=get(key,(Class<T>)type);}else{logger.warn(CONFIG_API_WRONG_USE,"","","Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. "+"Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");}// 第一次获取if(proxy==null){// 添加到 referenceTypeMapList<ReferenceConfigBase<?>>referencesOfType=ConcurrentHashMapUtils.computeIfAbsent(referenceTypeMap,type,_t->Collections.synchronizedList(newArrayList<>()));referencesOfType.add(rc);// 添加到 referenceKeyMapList<ReferenceConfigBase<?>>referenceConfigList=ConcurrentHashMapUtils.computeIfAbsent(referenceKeyMap,key,_k->Collections.synchronizedList(newArrayList<>()));referenceConfigList.add(rc);// 获取代理对象proxy=rc.get(check);}returnproxy;}
ReferenceConfig#get 获取代理对象
源码位置: org.apache.dubbo.config.ReferenceConfig#get
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 获取代理对象publicTget(booleancheck){...if(ref==null){if(getScopeModel().isLifeCycleManagedExternally()){// prepare model for referencegetScopeModel().getDeployer().prepare();}else{// ensure start module, compatible with old api usage// 启动模块getScopeModel().getDeployer().start();}// 初始化init(check);}returnref;}
// 初始化protectedsynchronizedvoidinit(booleancheck){if(initialized&&ref!=null){return;}try{// 刷新配置if(!this.isRefreshed()){this.refresh();}//auto detect proxy typeStringproxyType=getProxy();if(StringUtils.isBlank(proxyType)&&DubboStub.class.isAssignableFrom(interfaceClass)){setProxy(CommonConstants.NATIVE_STUB);}// init serviceMetadatainitServiceMetadata(consumer);serviceMetadata.setServiceType(getServiceInterfaceClass());// TODO, uncomment this line once service key is unifiedserviceMetadata.generateServiceKey();// 添加配置,如 application, consumer, interface Map<String,String>referenceParameters=appendConfig();ModuleServiceRepositoryrepository=getScopeModel().getServiceRepository();ServiceDescriptorserviceDescriptor;if(CommonConstants.NATIVE_STUB.equals(getProxy())){serviceDescriptor=StubSuppliers.getServiceDescriptor(interfaceName);repository.registerService(serviceDescriptor);}else{serviceDescriptor=repository.registerService(interfaceClass);}// 创建 consumerModelconsumerModel=newConsumerModel(serviceMetadata.getServiceKey(),proxy,serviceDescriptor,getScopeModel(),serviceMetadata,createAsyncMethodInfo(),interfaceClassLoader);// Compatible with dependencies on ServiceModel#getReferenceConfig() , and will be removed in a future version.consumerModel.setConfig(this);// 注册 consumerModelrepository.registerConsumer(consumerModel);serviceMetadata.getAttachments().putAll(referenceParameters);// 创建代理对象,这个最重要ref=createProxy(referenceParameters);serviceMetadata.setTarget(ref);serviceMetadata.addAttribute(PROXY_CLASS_REF,ref);// 设置销毁回调函数consumerModel.setDestroyRunner(getDestroyRunner());consumerModel.setProxyObject(ref);consumerModel.initMethodModels();// 检查可用性,dubbo3 默认为 falseif(check){checkInvokerAvailable(0);}}catch(Throwablet){logAndCleanup(t);throwt;}// 标记已初始化initialized=true;}
// 创建代理对象privateTcreateProxy(Map<String,String>referenceParameters){urls.clear();// mesh mode 这一节不解析,以后会继续解析meshModeHandleUrl(referenceParameters);if(StringUtils.isNotEmpty(url)){// user specified URL, could be peer-to-peer address, or register center's address.// url 不为空, 说明是直连parseUrl(referenceParameters);}else{// if protocols not in jvm checkRegistry// 从注册中心来获取 urlsaggregateUrlFromRegistry(referenceParameters);}// 根据 urls 来创建 invokercreateInvoker();...// 发布服务定义MetadataUtils.publishServiceDefinition(consumerUrl,consumerModel.getServiceModel(),getApplicationModel());// create service proxy// 获取代理对象return(T)proxyFactory.getProxy(invoker,ProtocolUtils.isGeneric(generic));}
// 根据 urls 来创建 invoker// 一个 url 表示一种注册中心privatevoidcreateInvoker(){// 单注册中心if(urls.size()==1){URLcurUrl=urls.get(0);// 利用 SPI 机制生成对应的 invoker, 这时 url 是 registryUrl, 所以实现类就是 RegistryProtocolinvoker=protocolSPI.refer(interfaceClass,curUrl);// registry url, mesh-enable and unloadClusterRelated is true, not need Cluster.if(!UrlUtils.isRegistry(curUrl)&&!curUrl.getParameter(UNLOAD_CLUSTER_RELATED,false)){List<Invoker<?>>invokers=newArrayList<>();invokers.add(invoker);invoker=Cluster.getCluster(getScopeModel(),Cluster.DEFAULT).join(newStaticDirectory(curUrl,invokers),true);}}else{// 多注册中心List<Invoker<?>>invokers=newArrayList<>();URLregistryUrl=null;for(URLurl:urls){// For multi-registry scenarios, it is not checked whether each referInvoker is available.// Because this invoker may become available later.// 每个 url 都是创建一个 invokerinvokers.add(protocolSPI.refer(interfaceClass,url));if(UrlUtils.isRegistry(url)){// use last registry urlregistryUrl=url;}}...省略invokers聚合的代码}}
// 刷新 invokerprivatebooleanrefreshInvoker(MigrationStepstep,Floatthreshold,MigrationRulenewRule){if(step==null||threshold==null){thrownewIllegalStateException("Step or threshold of migration rule cannot be null");}MigrationSteporiginStep=currentStep;// 这里的 step 为 FORCE_APPLICATIONif((currentStep==null||currentStep!=step)||!currentThreshold.equals(threshold)){booleansuccess=true;switch(step){caseAPPLICATION_FIRST:migrationInvoker.migrateToApplicationFirstInvoker(newRule);break;caseFORCE_APPLICATION:// 强制使用服务级别引用success=migrationInvoker.migrateToForceApplicationInvoker(newRule);break;caseFORCE_INTERFACE:default:success=migrationInvoker.migrateToForceInterfaceInvoker(newRule);}...returnsuccess;}// ignore if step is same with previous, will continue override rule for MigrationInvokerreturntrue;}
// 回调 urls@Overridepublicsynchronizedvoidnotify(List<URL>instanceUrls){if(isDestroyed()){return;}// Set the context of the address notification thread.RpcServiceContext.getServiceContext().setConsumerUrl(getConsumerUrl());// 3.x added for extend URL address// 执行 AddressListener 回调ExtensionLoader<AddressListener>addressListenerExtensionLoader=getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);List<AddressListener>supportedListeners=addressListenerExtensionLoader.getActivateExtension(getUrl(),(String[])null);if(supportedListeners!=null&&!supportedListeners.isEmpty()){for(AddressListeneraddressListener:supportedListeners){instanceUrls=addressListener.notify(instanceUrls,getConsumerUrl(),this);}}// 刷新 invokerrefreshOverrideAndInvoker(instanceUrls);}