privatevoidopenServer(URLurl){checkDestroyed();Stringkey=url.getAddress();// 判断是否为 server 端booleanisServer=url.getParameter(IS_SERVER_KEY,true);if(isServer){// 延迟初始化ProtocolServerserver=serverMap.get(key);if(server==null){synchronized(this){server=serverMap.get(key);if(server==null){// 创建服务serverMap.put(key,createServer(url));return;}}}// server supports reset, use together with overrideserver.reset(url);}}
/**
 * Builds the {@code ClientsProvider} for the given URL.
 *
 * <p>When the {@code CONNECTIONS_KEY} parameter is 0 (its default here), the
 * result of {@code getSharedClient} is returned. Otherwise one client is
 * created through {@code initClient} per configured connection and the list
 * is wrapped in an {@code ExclusiveClientsProvider}.
 *
 * @param url the URL carrying the connection configuration
 * @return the provider giving access to the client(s) for this URL
 */
private ClientsProvider getClients(URL url) {
    // Read the configured number of connections; 0 means a single connection is shared.
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    // whether to share connection
    // if not configured, connection is shared, otherwise, one connection for one service
    if (connections == 0) {
        // NOTE(review): code is elided at this point in the excerpt; it may change `connections` - confirm against the full source.
        ...
        // Obtain the shared client; per the original note this eventually calls initClient.
        return getSharedClient(url, connections);
    }
    // Create multiple clients, one initClient call per index in [0, connections).
    List<ExchangeClient> clients = IntStream.range(0, connections)
            .mapToObj((i) -> initClient(url))
            .collect(Collectors.toList());
    return new ExclusiveClientsProvider(clients);
}
/**
 * Creates an {@code ExchangeClient} for the given URL.
 *
 * <p>The URL is rebuilt as a {@code ServiceConfigURL}, given the Dubbo codec
 * and a heartbeat value (only if absent), and re-attached to its original
 * scope model. Depending on {@code LAZY_CONNECT_KEY} (default {@code false})
 * the method returns either a {@code LazyConnectExchangeClient} or the result
 * of {@code Exchangers.connect}.
 *
 * @param url the URL describing the remote endpoint and client options
 * @return the created exchange client
 * @throws RpcException wrapping any {@code RemotingException} raised in the try block
 */
private ExchangeClient initClient(URL url) {
    // Client type name: CLIENT_KEY, falling back to SERVER_KEY, then DEFAULT_REMOTING_CLIENT.
    // The original note says netty is used - presumably the value of DEFAULT_REMOTING_CLIENT; confirm.
    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
    // NOTE(review): code is elided at this point in the excerpt; presumably it uses `str` - confirm against the full source.
    ...
    try {
        // Capture values from the incoming URL before it is replaced below.
        ScopeModel scopeModel = url.getScopeModel();
        int heartbeat = UrlUtils.getHeartbeat(url);
        // Replace InstanceAddressURL with ServiceConfigURL.
        url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());
        // Codec
        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // Heartbeat (only added when the URL does not already carry one)
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, Integer.toString(heartbeat));
        // Re-attach the scope model captured from the original URL.
        url = url.setScopeModel(scopeModel);
        // connection should be lazy
        return url.getParameter(LAZY_CONNECT_KEY, false)
                ? new LazyConnectExchangeClient(url, requestHandler)
                // Connect to the port and set requestHandler as the handler.
                : Exchangers.connect(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
}
/**
 * Handles an incoming message by invoking the matching invoker and returning
 * its result as a future.
 *
 * <p>The message is cast to {@code Invocation}. If the invocation is flagged
 * with {@code IS_CALLBACK_SERVICE_INVOKE} and its method name is not listed in
 * the invoker URL's {@code "methods"} parameter, a warning is logged and
 * {@code null} is returned instead of a future.
 *
 * @param channel the channel the message arrived on; its remote address is stored in the service context
 * @param message the received message, cast to {@code Invocation}
 * @return a future derived from the invocation result, or {@code null} when a callback method is not found
 * @throws RemotingException declared by the signature; the visible code does not throw it directly
 */
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    // NOTE(review): code is elided at this point in the excerpt; presumably it validates `message` before the cast - confirm.
    ...
    Invocation inv = (Invocation) message;
    // Obtain the invoker: use the one attached to the invocation, otherwise look it up via getInvoker.
    Invoker<?> invoker = inv.getInvoker() == null ? getInvoker(channel, inv) : inv.getInvoker();
    // switch TCCL
    // NOTE(review): the visible code does not restore the previous context class loader.
    if (invoker.getUrl().getServiceModel() != null) {
        Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
    }
    // For callback invocations, check whether the called method exists.
    if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null || !methodsStr.contains(",")) {
            // Null or single-entry value: compare directly (a null value never matches).
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            // Comma-separated list: look for an exact match among the entries.
            String[] methods = methodsStr.split(",");
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        if (!hasMethod) {
            // The exception is concatenated into the message string rather than passed as a separate argument.
            logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
    // Invoke the business interface; per the original note the returned Result is an AsyncRpcResult.
    Result result = invoker.invoke(inv);
    return result.thenApply(Function.identity());
}