21 Client 订阅配置

nacos 基于 2.2.4 版本

这里的 client 是指 nacos SDK,也就是模块 nacos-client.

添加订阅者

源码位置: com.alibaba.nacos.client.config.NacosConfigService#addListener

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 添加监听器
@Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
    worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}

// 添加监听器 
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
        throws NacosException {
    group = blank2defaultGroup(group);
    String tenant = agent.getTenant();
    // 添加到 CacheData 里面,对同一个 dataId, group, tenant 可能有多个 listener
    CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    synchronized (cache) {
        for (Listener listener : listeners) {
            cache.addListener(listener);
        }
        // 不删除
        cache.setDiscard(false);
        // 和服务器不一致
        cache.setConsistentWithServer(false);
        // 通知监听配置
        agent.notifyListenConfig();
    }
    
}

通知监听配置

源码位置: com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient#notifyListenConfig

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 通知监听配置
@Override
public void notifyListenConfig() {
    // 向队列 listenExecutebell 添加一个通知
    listenExecutebell.offer(bellItem);
}

// 客户端启动会调用这个方法
@Override
public void startInternal() {
    executor.schedule(() -> {
        while (!executor.isShutdown() && !executor.isTerminated()) {
            try {
                // 获取通知, 最大间隔时间为 5 秒
                listenExecutebell.poll(5L, TimeUnit.SECONDS);
                if (executor.isShutdown() || executor.isTerminated()) {
                    continue;
                }
                // 执行配置监听
                executeConfigListen();
            } catch (Throwable e) {
                LOGGER.error("[rpc listen execute] [rpc listen] exception", e);
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException interruptedException) {
                    //ignore
                }
                notifyListenConfig();
            }
        }
    }, 0L, TimeUnit.MILLISECONDS);
    
}

源码位置: ``

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// 执行配置监听
@Override
public void executeConfigListen() {
    Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
    Map<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16);
    long now = System.currentTimeMillis();
    // 每隔一段时间都需要全同步配置
    boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
    // 遍历 cacheMap, 这个 map 都是要监听的配置
    for (CacheData cache : cacheMap.get().values()) {
        
        synchronized (cache) {
            
            //check local listeners consistent.
            // 判断是否和服务端一致,不一致,需要刷新配置
            if (cache.isConsistentWithServer()) {
                // 检查配置 md5 值, 不一致就推送给订阅者
                cache.checkListenerMd5();
                if (!needAllSync) {
                    continue;
                }
            }
            
            // 不是删除的配置,
            if (!cache.isDiscard()) {
                //get listen  config
                if (!cache.isUseLocalConfigInfo()) {
                    List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
                    if (cacheDatas == null) {
                        cacheDatas = new LinkedList<>();
                        listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                    }
                    // 添加要监听的配置
                    cacheDatas.add(cache);
                }
            } else if (cache.isDiscard() && CollectionUtils.isEmpty(cache.getListeners())) {
                // 是删除的配置,并且订阅者是空 
                if (!cache.isUseLocalConfigInfo()) {
                    List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
                    if (cacheDatas == null) {
                        cacheDatas = new LinkedList<>();
                        removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                    }
                    // 添加要删除的订阅者
                    cacheDatas.add(cache);
                }
            }
        }
        
    }
    
    //execute check listen ,return true if has change keys.
    // 拉取配置,检查是否改变
    boolean hasChangedKeys = checkListenCache(listenCachesMap);
    
    //execute check remove listen.
    // 删除监听
    checkRemoveListenCache(removeListenCachesMap);
    
    // 记录全同步的时间
    if (needAllSync) {
        lastAllSyncTime = now;
    }
    //If has changed keys,notify re sync md5.
    // 有配置改变,重新再运行一次, 推送配置给订阅者
    if (hasChangedKeys) {
        notifyListenConfig();
    }
    
}

源码位置: com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient#checkListenCache

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
// 拉取配置,检查是否改变
private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) {
    
    final AtomicBoolean hasChangedKeys = new AtomicBoolean(false);
    if (!listenCachesMap.isEmpty()) {
        List<Future> listenFutures = new ArrayList<>();
        // 遍历 listenCachesMap, 每一个 taskId, 有一个线程负责拉取配置
        for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
            String taskId = entry.getKey();
            ExecutorService executorService = ensureSyncExecutor(taskId);
            Future future = executorService.submit(() -> {
                List<CacheData> listenCaches = entry.getValue();
                //reset notify change flag.
                // 重置
                for (CacheData cacheData : listenCaches) {
                    cacheData.getReceiveNotifyChanged().set(false);
                }
                // 构建 ConfigBatchListenRequest 请求,里面有 md5 值
                ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
                configChangeListenRequest.setListen(true);
                try {
                    RpcClient rpcClient = ensureRpcClient(taskId);
                    // 请求服务端,如果 md5 值不一样,就会返回
                    ConfigChangeBatchListenResponse listenResponse = (ConfigChangeBatchListenResponse) requestProxy(
                            rpcClient, configChangeListenRequest);
                    if (listenResponse != null && listenResponse.isSuccess()) {
                        
                        // 表示是否拉取过配置
                        Set<String> changeKeys = new HashSet<String>();
                        
                        List<ConfigChangeBatchListenResponse.ConfigContext> changedConfigs = listenResponse.getChangedConfigs();
                        //handle changed keys,notify listener
                        if (!CollectionUtils.isEmpty(changedConfigs)) {
                            hasChangedKeys.set(true);
                            // 遍历改变的配置
                            for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : changedConfigs) {
                                String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(),
                                        changeConfig.getGroup(), changeConfig.getTenant());
                                changeKeys.add(changeKey);
                                boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
                                // 刷新配置,通知订阅者
                                refreshContentAndCheck(changeKey, !isInitializing);
                            }
                            
                        }
                        
                        // 在刷新配置时,如果配置有变动,就会执行这个逻辑
                        for (CacheData cacheData : listenCaches) {
                            if (cacheData.getReceiveNotifyChanged().get()) {
                                String changeKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
                                        cacheData.getTenant());
                                // 判断配置是否已经拉取了
                                if (!changeKeys.contains(changeKey)) {
                                    boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
                                    // 刷新配置,通知订阅者
                                    refreshContentAndCheck(changeKey, !isInitializing);
                                }
                            }
                        }
                        
                        //handler content configs
                        for (CacheData cacheData : listenCaches) {
                            cacheData.setInitializing(false);
                            String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
                                    cacheData.getTenant());
                            if (!changeKeys.contains(groupKey)) {
                                synchronized (cacheData) {
                                    // 设置和服务端一致
                                    if (!cacheData.getReceiveNotifyChanged().get()) {
                                        cacheData.setConsistentWithServer(true);
                                    }
                                }
                            }
                        }
                        
                    }
                } catch (Throwable e) {
                    LOGGER.error("Execute listen config change error ", e);
                    try {
                        Thread.sleep(50L);
                    } catch (InterruptedException interruptedException) {
                        //ignore
                    }
                    notifyListenConfig();
                }
            });
            listenFutures.add(future);
            
        }
        // 等待所有线程执行完毕
        for (Future future : listenFutures) {
            try {
                future.get();
            } catch (Throwable throwable) {
                LOGGER.error("Async listen config change error ", throwable);
            }
        }
        
    }
    return hasChangedKeys.get();
}

刷新配置,通知订阅者

源码位置: com.alibaba.nacos.client.config.impl.ClientWorker#refreshContentAndCheck

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 刷新配置,通知订阅者
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
    try {
        // 获取配置
        ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
                notify);
        cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
        cacheData.setContent(response.getContent());
        if (null != response.getConfigType()) {
            cacheData.setType(response.getConfigType());
        }
        if (notify) {
            LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
                    ContentUtils.truncateContent(response.getContent()), response.getConfigType());
        }
        // 检查 md5 值
        cacheData.checkListenerMd5();
    } catch (Exception e) {
        LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
                cacheData.group, cacheData.tenant, e);
    }
}

// 检查 md5 值
void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        // 不一致,通知订阅者
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
        }
    }
}

rpcClient 初始化

源码位置: com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient#ensureRpcClient

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
private RpcClient ensureRpcClient(String taskId) throws NacosException {
    synchronized (ClientWorker.this) {
        Map<String, String> labels = getLabels();
        Map<String, String> newLabels = new HashMap<>(labels);
        newLabels.put("taskId", taskId);
        RpcClient rpcClient = RpcClientFactory.createClient(uuid + "_config-" + taskId, getConnectionType(),
                newLabels, RpcClientTlsConfig.properties(this.properties));
        if (rpcClient.isWaitInitiated()) {
            // 初始化 rpcClient
            initRpcClientHandler(rpcClient);
            rpcClient.setTenant(getTenant());
            rpcClient.clientAbilities(initAbilities());
            // 启动
            rpcClient.start();
        }
        
        return rpcClient;
    }
    
}

源码位置: com.alibaba.nacos.client.config.impl.ClientWorker.ConfigRpcTransportClient#initRpcClientHandler

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
private void initRpcClientHandler(final RpcClient rpcClientInner) {
    /*
     * Register Config Change /Config ReSync Handler
     */
    // 注册配置通知的 requestHandler
    rpcClientInner.registerServerRequestHandler((request) -> {
        if (request instanceof ConfigChangeNotifyRequest) {
            ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
            LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",
                    rpcClientInner.getName(), configChangeNotifyRequest.getDataId(),
                    configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
            String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(),
                    configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
            
            CacheData cacheData = cacheMap.get().get(groupKey);
            if (cacheData != null) {
                synchronized (cacheData) {
                    cacheData.getReceiveNotifyChanged().set(true);
                    cacheData.setConsistentWithServer(false);
                    notifyListenConfig();
                }
                
            }
            return new ConfigChangeNotifyResponse();
        }
        return null;
    });
    
    // ClientConfigMetricRequest
    rpcClientInner.registerServerRequestHandler((request) -> {
        if (request instanceof ClientConfigMetricRequest) {
            ClientConfigMetricResponse response = new ClientConfigMetricResponse();
            response.setMetrics(getMetrics(((ClientConfigMetricRequest) request).getMetricsKeys()));
            return response;
        }
        return null;
    });
    
    // 连接事件
    rpcClientInner.registerConnectionListener(new ConnectionEventListener() {
        
        @Override
        public void onConnected() {
            LOGGER.info("[{}] Connected,notify listen context...", rpcClientInner.getName());
            notifyListenConfig();
        }
        
        @Override
        public void onDisConnect() {
            String taskId = rpcClientInner.getLabels().get("taskId");
            LOGGER.info("[{}] DisConnected,clear listen context...", rpcClientInner.getName());
            Collection<CacheData> values = cacheMap.get().values();
            
            for (CacheData cacheData : values) {
                if (StringUtils.isNotBlank(taskId)) {
                    if (Integer.valueOf(taskId).equals(cacheData.getTaskId())) {
                        cacheData.setConsistentWithServer(false);
                    }
                } else {
                    cacheData.setConsistentWithServer(false);
                }
            }
        }
        
    });
    
    // 挑选下一个地址
    rpcClientInner.serverListFactory(new ServerListFactory() {
        @Override
        public String genNextServer() {
            return ConfigRpcTransportClient.super.serverListManager.getNextServerAddr();
            
        }
        
        @Override
        public String getCurrentServer() {
            return ConfigRpcTransportClient.super.serverListManager.getCurrentServerAddr();
            
        }
        
        @Override
        public List<String> getServerList() {
            return ConfigRpcTransportClient.super.serverListManager.getServerUrls();
            
        }
    });
    
    // 地址变动的监听器
    subscriber = new Subscriber() {
        @Override
        public void onEvent(Event event) {
            rpcClientInner.onServerListChange();
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return ServerlistChangeEvent.class;
        }
    };
    NotifyCenter.registerSubscriber(subscriber);
}
0%