// 处理请求publicResult<ServiceInfo>list(@RequestParam(value="namespaceId",defaultValue=Constants.DEFAULT_NAMESPACE_ID)StringnamespaceId,@RequestParam(value="groupName",defaultValue=Constants.DEFAULT_GROUP)StringgroupName,@RequestParam("serviceName")StringserviceName,@RequestParam(value="clusterName",defaultValue=StringUtils.EMPTY)StringclusterName,@RequestParam(value="ip",defaultValue=StringUtils.EMPTY)Stringip,@RequestParam(value="port",defaultValue="0")Integerport,@RequestParam(value="healthyOnly",defaultValue="false")BooleanhealthyOnly,@RequestParam(value="app",defaultValue=StringUtils.EMPTY)Stringapp,@RequestHeader(value=HttpHeaderConsts.USER_AGENT_HEADER,required=false)StringuserAgent,@RequestHeader(value=HttpHeaderConsts.CLIENT_VERSION_HEADER,required=false)StringclientVersion){if(StringUtils.isEmpty(userAgent)){userAgent=StringUtils.defaultIfEmpty(clientVersion,StringUtils.EMPTY);}StringcompositeServiceName=NamingUtils.getGroupedName(serviceName,groupName);// 根据 ip 和 port 来进行 udp 推送Subscribersubscriber=newSubscriber(ip+":"+port,userAgent,app,ip,namespaceId,compositeServiceName,port,clusterName);// 获取所有的实例returnResult.success(instanceServiceV2.listInstance(namespaceId,compositeServiceName,subscriber,clusterName,healthyOnly));}
// ClientServiceIndexesManager 监听 ClientSubscribeServiceEvent 事件privatevoidhandleClientOperation(ClientOperationEventevent){Serviceservice=event.getService();StringclientId=event.getClientId();if(eventinstanceofClientOperationEvent.ClientRegisterServiceEvent){addPublisherIndexes(service,clientId);}elseif(eventinstanceofClientOperationEvent.ClientDeregisterServiceEvent){removePublisherIndexes(service,clientId);}elseif(eventinstanceofClientOperationEvent.ClientSubscribeServiceEvent){// 添加 service 对应的 clientId, 表示这个 service 变动之后,需要推送给这个 clientIdaddSubscriberIndexes(service,clientId);}elseif(eventinstanceofClientOperationEvent.ClientUnsubscribeServiceEvent){removeSubscriberIndexes(service,clientId);}}// 添加 service 对应的 clientId, 然后发布 ServiceSubscribedEvent 事件,这里传入了 clientIdprivatevoidaddSubscriberIndexes(Serviceservice,StringclientId){subscriberIndexes.computeIfAbsent(service,key->newConcurrentHashSet<>());// Fix #5404, Only first time add need notify event.if(subscriberIndexes.get(service).add(clientId)){NotifyCenter.publishEvent(newServiceEvent.ServiceSubscribedEvent(service,clientId));}}
// NamingSubscriberServiceV2Impl 监听 ServiceSubscribedEvent 事件@OverridepublicvoidonEvent(Eventevent){...}elseif(eventinstanceofServiceEvent.ServiceSubscribedEvent){// If service is subscribed by one client, only push this client.ServiceEvent.ServiceSubscribedEventsubscribedEvent=(ServiceEvent.ServiceSubscribedEvent)event;Serviceservice=subscribedEvent.getService();// 添加延时任务来推送实例数据, 接下来看看 delayTaskEngine 是如何处理 task delayTaskEngine.addTask(service,newPushDelayTask(service,PushConfig.getInstance().getPushTaskDelay(),subscribedEvent.getClientId()));}}
// 执行 push 任务publicvoidrun(){try{PushDataWrapperwrapper=generatePushData();ClientManagerclientManager=delayTaskEngine.getClientManager();// 获取所有推送的 clientIdfor(Stringeach:getTargetClientIds()){Clientclient=clientManager.getClient(each);if(null==client){// means this client has disconnectcontinue;}// 获取 service 对应的 subscriberSubscribersubscriber=client.getSubscriber(service);// skip if nullif(subscriber==null){continue;}// 执行具体的 push, 接下来看看是如何获取对应的 pushExecutordelayTaskEngine.getPushExecutor().doPushWithCallback(each,subscriber,wrapper,newServicePushCallback(each,subscriber,wrapper.getOriginalData(),delayTask.isPushToAll()));}}catch(Exceptione){Loggers.PUSH.error("Push task for service"+service.getGroupedServiceName()+" execute failed ",e);delayTaskEngine.addTask(service,newPushDelayTask(service,1000L));}}