nacos 基于 2.2.4 版本
这里的 client
是指 nacos SDK
,也就是模块 nacos-client
.
订阅服务的主流程
源码位置: com.alibaba.nacos.client.naming.NacosNamingService#subscribe
1
2
3
4
5
6
7
8
9
10
11
12
13
// NacosNamingService 订阅服务
@Override
public void subscribe ( String serviceName , String groupName , List < String > clusters , EventListener listener )
throws NacosException {
if ( null == listener ) {
return ;
}
String clusterString = StringUtils . join ( clusters , "," );
// 监听服务改变的回调函数,changeNotifier 订阅了 InstancesChangeEvent 事件
changeNotifier . registerListener ( groupName , serviceName , clusterString , listener );
// clientProxy 的实现类为 NamingClientProxyDelegate
clientProxy . subscribe ( serviceName , groupName , clusterString );
}
源码位置: com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#subscribe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// NamingClientProxyDelegate 订阅服务
@Override
public ServiceInfo subscribe ( String serviceName , String groupName , String clusters ) throws NacosException {
NAMING_LOGGER . info ( "[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} " , serviceName , groupName , clusters );
String serviceNameWithGroup = NamingUtils . getGroupedName ( serviceName , groupName );
String serviceKey = ServiceInfo . getKey ( serviceNameWithGroup , clusters );
// 注册 UpdateTask, 发送 http 请求来全量更新, 这个后面说
serviceInfoUpdateService . scheduleUpdateIfAbsent ( serviceName , groupName , clusters );
ServiceInfo result = serviceInfoHolder . getServiceInfoMap (). get ( serviceKey );
if ( null == result || ! isSubscribed ( serviceName , groupName , clusters )) {
// grpc 订阅服务, 返回 serviceInfo
result = grpcClientProxy . subscribe ( serviceName , groupName , clusters );
}
// 处理 serviceInfo, 发布 InstancesChangeEvent 事件
serviceInfoHolder . processServiceInfo ( result );
return result ;
}
源码位置: com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#subscribe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// grpc 订阅服务
@Override
public ServiceInfo subscribe ( String serviceName , String groupName , String clusters ) throws NacosException {
if ( NAMING_LOGGER . isDebugEnabled ()) {
NAMING_LOGGER . debug ( "[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} " , serviceName , groupName , clusters );
}
// 标记服务要订阅,在 redoService 的定时任务中重新订阅
redoService . cacheSubscriberForRedo ( serviceName , groupName , clusters );
// 订阅服务
return doSubscribe ( serviceName , groupName , clusters );
}
// redoService.cacheSubscriberForRedo
public void cacheSubscriberForRedo ( String serviceName , String groupName , String cluster ) {
String key = ServiceInfo . getKey ( NamingUtils . getGroupedName ( serviceName , groupName ), cluster );
SubscriberRedoData redoData = SubscriberRedoData . build ( serviceName , groupName , cluster );
synchronized ( subscribes ) {
// 标记订阅
subscribes . put ( key , redoData );
}
}
// 订阅服务
public ServiceInfo doSubscribe ( String serviceName , String groupName , String clusters ) throws NacosException {
SubscribeServiceRequest request = new SubscribeServiceRequest ( namespaceId , groupName , serviceName , clusters ,
true );
// 发送 SubscribeServiceRequest 请求,会被 SubscribeServiceRequestHandler 处理
SubscribeServiceResponse response = requestToServer ( request , SubscribeServiceResponse . class );
// 标记服务已订阅
redoService . subscriberRegistered ( serviceName , groupName , clusters );
return response . getServiceInfo ();
}
源码位置: com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler#handle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// SubscribeServiceRequestHandler 处理请求
@Override
@Secured ( action = ActionTypes . READ )
public SubscribeServiceResponse handle ( SubscribeServiceRequest request , RequestMeta meta ) throws NacosException {
String namespaceId = request . getNamespace ();
String serviceName = request . getServiceName ();
String groupName = request . getGroupName ();
String app = request . getHeader ( "app" , "unknown" );
String groupedServiceName = NamingUtils . getGroupedName ( serviceName , groupName );
Service service = Service . newService ( namespaceId , groupName , serviceName , true );
// 把订阅的信息包装为 Subscriber 对象
Subscriber subscriber = new Subscriber ( meta . getClientIp (), meta . getClientVersion (), app , meta . getClientIp (),
namespaceId , groupedServiceName , 0 , request . getClusters ());
// 第一次订阅要返回对应的 serviceInfo
ServiceInfo serviceInfo = ServiceUtil . selectInstancesWithHealthyProtection ( serviceStorage . getData ( service ),
metadataManager . getServiceMetadata ( service ). orElse ( null ), subscriber . getCluster (), false ,
true , subscriber . getIp ());
if ( request . isSubscribe ()) {
// 订阅服务, 这个逻辑在【订阅服务】中分析过了
clientOperationService . subscribeService ( service , subscriber , meta . getConnectionId ());
NotifyCenter . publishEvent ( new SubscribeServiceTraceEvent ( System . currentTimeMillis (),
meta . getClientIp (), service . getNamespace (), service . getGroup (), service . getName ()));
} else {
// 取消订阅服务
clientOperationService . unsubscribeService ( service , subscriber , meta . getConnectionId ());
NotifyCenter . publishEvent ( new UnsubscribeServiceTraceEvent ( System . currentTimeMillis (),
meta . getClientIp (), service . getNamespace (), service . getGroup (), service . getName ()));
}
return new SubscribeServiceResponse ( ResponseCode . SUCCESS . getCode (), "success" , serviceInfo );
}
grpc 订阅处理
源码位置: com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#start
1
2
3
4
5
6
7
8
9
10
11
12
// start 方法在 NamingGrpcClientProxy 构造函数中调用
private void start ( ServerListFactory serverListFactory , ServiceInfoHolder serviceInfoHolder ) throws NacosException {
// serverListFactory 来选择服务
rpcClient . serverListFactory ( serverListFactory );
// 监听 connectionEvent 事件
rpcClient . registerConnectionListener ( redoService );
// client 处理 server 请求, 重点看 NamingPushRequestHandler
rpcClient . registerServerRequestHandler ( new NamingPushRequestHandler ( serviceInfoHolder ));
rpcClient . start ();
// 注册事件订阅
NotifyCenter . registerSubscriber ( this );
}
源码位置: com.alibaba.nacos.client.naming.remote.gprc.NamingPushRequestHandler#requestReply
1
2
3
4
5
6
7
8
9
10
11
12
// client 处理 server 请求
@Override
public Response requestReply ( Request request ) {
if ( request instanceof NotifySubscriberRequest ) {
// 服务实例变动了,服务端推送 serviceInfo
NotifySubscriberRequest notifyRequest = ( NotifySubscriberRequest ) request ;
// 处理 serviceInfo, 发布 InstancesChangeEvent 事件
serviceInfoHolder . processServiceInfo ( notifyRequest . getServiceInfo ());
return new NotifySubscriberResponse ();
}
return null ;
}
源码位置: com.alibaba.nacos.client.naming.event.InstancesChangeNotifier#onEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// InstancesChangeNotifier 监听 InstancesChangeEvent 事件
@Override
public void onEvent ( InstancesChangeEvent event ) {
String key = ServiceInfo
. getKey ( NamingUtils . getGroupedName ( event . getServiceName (), event . getGroupName ()), event . getClusters ());
ConcurrentHashSet < EventListener > eventListeners = listenerMap . get ( key );
if ( CollectionUtils . isEmpty ( eventListeners )) {
return ;
}
for ( final EventListener listener : eventListeners ) {
// 遍历回调函数
final com . alibaba . nacos . api . naming . listener . Event namingEvent = transferToNamingEvent ( event );
if ( listener instanceof AbstractEventListener && (( AbstractEventListener ) listener ). getExecutor () != null ) {
(( AbstractEventListener ) listener ). getExecutor (). execute (() -> listener . onEvent ( namingEvent ));
} else {
listener . onEvent ( namingEvent );
}
}
}
UpdateTask 全量更新
源码位置: com.alibaba.nacos.client.naming.core.ServiceInfoUpdateService.UpdateTask#run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// UpdateTask 定时拉取全量的 instances
@Override
public void run () {
long delayTime = DEFAULT_DELAY ;
try {
// 判断是否订阅服务
if ( ! changeNotifier . isSubscribed ( groupName , serviceName , clusters ) && ! futureMap . containsKey (
serviceKey )) {
NAMING_LOGGER . info ( "update task is stopped, service:{}, clusters:{}" , groupedServiceName , clusters );
isCancel = true ;
return ;
}
ServiceInfo serviceObj = serviceInfoHolder . getServiceInfoMap (). get ( serviceKey );
// 第一次拉取
if ( serviceObj == null ) {
serviceObj = namingClientProxy . queryInstancesOfService ( serviceName , groupName , clusters , 0 , false );
serviceInfoHolder . processServiceInfo ( serviceObj );
lastRefTime = serviceObj . getLastRefTime ();
return ;
}
// 判断过期时间,然后再拉取
if ( serviceObj . getLastRefTime () <= lastRefTime ) {
serviceObj = namingClientProxy . queryInstancesOfService ( serviceName , groupName , clusters , 0 , false );
serviceInfoHolder . processServiceInfo ( serviceObj );
}
lastRefTime = serviceObj . getLastRefTime ();
if ( CollectionUtils . isEmpty ( serviceObj . getHosts ())) {
incFailCount ();
return ;
}
// TODO multiple time can be configured.
// 更新延时时间
delayTime = serviceObj . getCacheMillis () * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE ;
resetFailCount ();
} catch ( NacosException e ) {
handleNacosException ( e );
} catch ( Throwable e ) {
handleUnknownException ( e );
} finally {
if ( ! isCancel ) {
// 下一次拉取任务
executor . schedule ( this , Math . min ( delayTime << failCount , DEFAULT_DELAY * 60 ),
TimeUnit . MILLISECONDS );
}
}
}
测试类
com.alibaba.nacos.test.naming.SubscribeCluster_ITCase#subscribeAdd