nacos 基于 grpc 的长连接来实现 client 和 server 的通信。
在有多个 server 端时,最初开始 client 的连接会均匀分布在 server 端,当重新上线 server 时,这时候 client 的连接会偏移到其他 server 端,这样会造成 server 端请求负载不均匀。
// GrpcClient 发送 ConnectionSetupRequest 请求,建立连接@OverridepublicConnectionconnectToServer(ServerInfoserverInfo){try{...intport=serverInfo.getServerPort()+rpcPortOffset();ManagedChannelmanagedChannel=createNewManagedChannel(serverInfo.getServerIp(),port);RequestGrpc.RequestFutureStubnewChannelStubTemp=createNewChannelStub(managedChannel);if(newChannelStubTemp!=null){// 检查连接Responseresponse=serverCheck(serverInfo.getServerIp(),port,newChannelStubTemp);if(response==null||!(responseinstanceofServerCheckResponse)){shuntDownChannel(managedChannel);returnnull;}BiRequestStreamGrpc.BiRequestStreamStubbiRequestStreamStub=BiRequestStreamGrpc.newStub(newChannelStubTemp.getChannel());GrpcConnectiongrpcConn=newGrpcConnection(serverInfo,grpcExecutor);grpcConn.setConnectionId(((ServerCheckResponse)response).getConnectionId());//create stream request and bind connection event to this connection.StreamObserver<Payload>payloadStreamObserver=bindRequestStream(biRequestStreamStub,grpcConn);// stream observer to send response to servergrpcConn.setPayloadStreamObserver(payloadStreamObserver);grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);grpcConn.setChannel(managedChannel);//send a setup request.// 发送 ConnectionSetupRequest 请求,建立连接ConnectionSetupRequestconSetupRequest=newConnectionSetupRequest();...grpcConn.sendRequest(conSetupRequest);//wait to register connection setupThread.sleep(100L);returngrpcConn;}returnnull;}catch(Exceptione){LOGGER.error("[{}]Fail to connect to server!,error={}",GrpcClient.this.getName(),e);}returnnull;}
// GrpcBiStreamRequestAcceptor 处理 client 请求@OverridepublicvoidonNext(Payloadpayload){...// 处理 ConnectionSetupRequest 请求if(parseObjinstanceofConnectionSetupRequest){ConnectionSetupRequestsetUpRequest=(ConnectionSetupRequest)parseObj;Map<String,String>labels=setUpRequest.getLabels();StringappName="-";if(labels!=null&&labels.containsKey(Constants.APPNAME)){appName=labels.get(Constants.APPNAME);}ConnectionMetametaInfo=newConnectionMeta(connectionId,payload.getMetadata().getClientIp(),remoteIp,remotePort,localPort,ConnectionType.GRPC.getType(),setUpRequest.getClientVersion(),appName,setUpRequest.getLabels());metaInfo.setTenant(setUpRequest.getTenant());Connectionconnection=newGrpcConnection(metaInfo,responseObserver,GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());connection.setAbilities(setUpRequest.getAbilities());booleanrejectSdkOnStarting=metaInfo.isSdkSource()&&!ApplicationUtils.isStarted();// 注册 connectionId 和 connectionif(rejectSdkOnStarting||!connectionManager.register(connectionId,connection)){//Not register to the connection manager if current server is over limit or server is starting.try{Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}",connectionId,rejectSdkOnStarting?" server is not started":" server is over limited.");connection.request(newConnectResetRequest(),3000L);connection.close();}catch(Exceptione){//Do nothing.if(connectionManager.traced(clientIp)){Loggers.REMOTE_DIGEST.warn("[{}]Send connect reset request error,error={}",connectionId,e);}}}...}}