02 注册实例

nacos 基于 2.2.4 版本

注册实例的 curl

1
2
3
4
5
curl --location 'http://localhost:8848/nacos/v2/ns/instance' \
--header 'Content-Type: application/x-www-form-urlencoded' \
--data-urlencode 'serviceName=test' \
--data-urlencode 'ip=1.2.3.4' \
--data-urlencode 'port=80'

注册实例的主流程

源码位置: com.alibaba.nacos.naming.controllers.v2.InstanceControllerV2#register

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public Result<String> register(InstanceForm instanceForm) throws NacosException {
    // check param
    instanceForm.validate();
    checkWeight(instanceForm.getWeight());
    // build instance
    Instance instance = buildInstance(instanceForm);
    // 注册实例
    instanceServiceV2.registerInstance(instanceForm.getNamespaceId(), buildCompositeServiceName(instanceForm), instance);
    // 发布 traceEvent
    NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
            false, instanceForm.getNamespaceId(), instanceForm.getGroupName(), instanceForm.getServiceName(),
            instance.getIp(), instance.getPort()));
    return Result.success("ok");
}

源码位置: com.alibaba.nacos.naming.core.InstanceOperatorClientImpl#registerInstance

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    
    boolean ephemeral = instance.isEphemeral();
    String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
    // 创建 client
    createIpPortClientIfAbsent(clientId);
    // 构建 service 对象,在 nacos2.0 中,临时属性在 service 上, instance 的临时属性已经没有了
    Service service = getService(namespaceId, serviceName, ephemeral);
    // 具体实现类负责注册,如果是临时实例,EphemeralClientOperationServiceImpl,如果是持久化实例,PersistentClientOperationServiceImpl
    clientOperationService.registerInstance(service, instance, clientId);
}

临时实例注册

源码位置: com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl#registerInstance

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);

    // 获得单例的 service,如果没有就会注册
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (!singleton.isEphemeral()) {
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                String.format("Current service %s is persistent service, can't register ephemeral instance.",
                        singleton.getGroupedServiceName()));
    }
    // 获取 client,并检查 client
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // InstancePublishInfo 就是 nacos 内部实例
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    // 添加 service 和 instance,这里会发布 ClientChangedEvent 事件,非常重要
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();
    client.recalculateRevision();
    // 发布 ClientRegisterServiceEvent 事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    // 发布 InstanceMetadataEvent 事件
    NotifyCenter
            .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}

源码位置: com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientDataProcessor#syncToAllServer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// DistroClientDataProcessor 会监听 ClientChangedEvent 事件
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // 同步到其他节点
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

源码位置: com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ClientServiceIndexesManager 会监听 ClientRegisterServiceEvent 事件
private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        // 添加 client 的 publishIndex
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        removeSubscriberIndexes(service, clientId);
    }
}
private void addPublisherIndexes(Service service, String clientId) {
    // service 和 clientId 是一对多的关系
    publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
    publisherIndexes.get(service).add(clientId);
    // 发布 ServiceChangedEvent 事件
    NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}

源码位置: com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl#onEvent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// NamingSubscriberServiceV2Impl 监听 ServiceChangedEvent
public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        // If service changed, push to all subscribers.
        // service 下的 instance 改变之后,要推送给所有的订阅者
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
        MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
        // If service is subscribed by one client, only push this client.
        ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
        Service service = subscribedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                subscribedEvent.getClientId()));
    }
}

持久化实例注册

源码位置: com.alibaba.nacos.naming.core.v2.service.impl.PersistentClientOperationServiceImpl#registerInstance

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
    // 和临时实例注册一样,获取单例的  service
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (singleton.isEphemeral()) {
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                String.format("Current service %s is ephemeral service, can't register persistent instance.",
                        singleton.getGroupedServiceName()));
    }
    // 包装为 writeRequest 对象
    final InstanceStoreRequest request = new InstanceStoreRequest();
    request.setService(service);
    request.setInstance(instance);
    request.setClientId(clientId);
    // 这里设置了 group,在构造函数中会初始化 group 的 RequestProcessor 
    final WriteRequest writeRequest = WriteRequest.newBuilder().setGroup(group())
            .setData(ByteString.copyFrom(serializer.serialize(request))).setOperation(DataOperation.ADD.name())
            .build();
    
    try {
        // CPProtocol 负责写请求,同步到其他的节点,然后应用状态机
        protocol.write(writeRequest);
        Loggers.RAFT.info("Client registered. service={}, clientId={}, instance={}", service, instance, clientId);
    } catch (Exception e) {
        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
    }
}

// 构造函数中,初始化话了 
public PersistentClientOperationServiceImpl(final PersistentIpPortClientManager clientManager) {
    this.clientManager = clientManager;
    this.protocol = ApplicationUtils.getBean(ProtocolManager.class).getCpProtocol();
    // 自己负责来处理 apply WriteRequest
    this.protocol.addRequestProcessors(Collections.singletonList(this));
}

// 应用 raft 的状态机
// protocol.write(writeRequest) 之后, 就会回调这个方法
@Override
public Response onApply(WriteRequest request) {
    final Lock lock = readLock;
    lock.lock();
    try {
        final InstanceStoreRequest instanceRequest = serializer.deserialize(request.getData().toByteArray());
        final DataOperation operation = DataOperation.valueOf(request.getOperation());
        switch (operation) {
            case ADD:
                // 处理实例注册
                onInstanceRegister(instanceRequest.service, instanceRequest.instance,
                        instanceRequest.getClientId());
                break;
            case DELETE:
                onInstanceDeregister(instanceRequest.service, instanceRequest.getClientId());
                break;
            case CHANGE:
                if (instanceAndServiceExist(instanceRequest)) {
                    onInstanceRegister(instanceRequest.service, instanceRequest.instance,
                            instanceRequest.getClientId());
                }
                break;
            default:
                return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + operation)
                        .build();
        }
        return Response.newBuilder().setSuccess(true).build();
    } catch (Exception e) {
        Loggers.RAFT.warn("Persistent client operation failed. ", e);
        return Response.newBuilder().setSuccess(false)
                .setErrMsg("Persistent client operation failed. " + e.getMessage()).build();
    } finally {
        lock.unlock();
    }
}

// 处理实例注册, 基本和临时实例注册一样, 后面就不重复分析了
private void onInstanceRegister(Service service, Instance instance, String clientId) {
    // 获取 service 和 client
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (!clientManager.contains(clientId)) {
        clientManager.clientConnected(clientId, new ClientAttributes());
    }
    Client client = clientManager.getClient(clientId);
    InstancePublishInfo instancePublishInfo = getPublishInfo(instance);
    // 添加 service 和 instance,发布 ClientChangedEvent 事件
    client.addServiceInstance(singleton, instancePublishInfo);
    client.setLastUpdatedTime();
    // 发布 ClientRegisterServiceEvent 事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
}

测试类

com.alibaba.nacos.test.naming.CPInstancesAPI_ITCase#registerInstance_ephemeral_true

0%