03 注销实例

nacos 基于 2.2.4 版本

注销实例的 curl

1
2
3
4
5
curl --location --request DELETE 'http://localhost:8848/nacos/v2/ns/instance' \
--header 'Content-Type: application/x-www-form-urlencoded' \
--data-urlencode 'serviceName=test' \
--data-urlencode 'ip=1.2.3.4' \
--data-urlencode 'port=80'

注销实例的主流程

源码位置: com.alibaba.nacos.naming.controllers.v2.InstanceControllerV2#deregister

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public Result<String> deregister(InstanceForm instanceForm) throws NacosException {
    // check param
    instanceForm.validate();
    checkWeight(instanceForm.getWeight());
    // build instance
    Instance instance = buildInstance(instanceForm);
    // 移除 instance
    instanceServiceV2.removeInstance(instanceForm.getNamespaceId(), buildCompositeServiceName(instanceForm), instance);
    // 发布 DeregisterInstanceTraceEvent 事件
    NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",
            false, DeregisterInstanceReason.REQUEST, instanceForm.getNamespaceId(), instanceForm.getGroupName(),
            instanceForm.getServiceName(), instance.getIp(), instance.getPort()));
    return Result.success("ok");
}

源码位置: com.alibaba.nacos.naming.core.InstanceOperatorClientImpl#removeInstance

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Override
public void removeInstance(String namespaceId, String serviceName, Instance instance) {
    // 判断 instance 是否已经注册过, 如果没有,则不用处理
    boolean ephemeral = instance.isEphemeral();
    String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
    if (!clientManager.contains(clientId)) {
        Loggers.SRV_LOG.warn("remove instance from non-exist client: {}", clientId);
        return;
    }
    Service service = getService(namespaceId, serviceName, ephemeral);
    // 注销实例,如果是临时实例,EphemeralClientOperationServiceImpl,如果是持久化实例,PersistentClientOperationServiceImpl
    clientOperationService.deregisterInstance(service, instance, clientId);
}

临时实例注销

源码位置: com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl#deregisterInstance

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void deregisterInstance(Service service, Instance instance, String clientId) {
    // 判断 service 是否存在
    if (!ServiceManager.getInstance().containSingleton(service)) {
        Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", service);
        return;
    }
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // 移除内存中的 instance 对象,这里会发布 ClientChangedEvent 事件,这个很重要
    InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
    client.setLastUpdatedTime();
    client.recalculateRevision();
    if (null != removedInstance) {
        // 发布 ClientDeregisterServiceEvent 事件
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
        // 发布 InstanceMetadataEvent 事件
        NotifyCenter.publishEvent(
                new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
    }
}

源码位置: com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientDataProcessor#syncToAllServer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// DistroClientDataProcessor 接受 ClientChangedEvent, 负责同步数据给其他节点
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

源码位置: com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// ClientServiceIndexesManager 会监听 ClientDeregisterServiceEvent 事件
private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        // 移除 service 的 clientId
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        removeSubscriberIndexes(service, clientId);
    }
}

private void removePublisherIndexes(Service service, String clientId) {
    publisherIndexes.computeIfPresent(service, (s, ids) -> {
        ids.remove(clientId);
        // 发布 ServiceChangedEvent 事件
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
        return ids.isEmpty() ? null : ids;
    });
}

源码位置: com.alibaba.nacos.naming.push.v2.NamingSubscriberServiceV2Impl#onEvent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// NamingSubscriberServiceV2Impl 会监听 ServiceChangedEvent 事件
@Override
public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        // If service changed, push to all subscribers.
        // 注销 instance, 必须推送给所有的订阅者
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
        MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
        // If service is subscribed by one client, only push this client.
        ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
        Service service = subscribedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                subscribedEvent.getClientId()));
    }
}

持久化实例注销

源码位置: com.alibaba.nacos.naming.core.v2.service.impl.PersistentClientOperationServiceImpl#deregisterInstance

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Override
public void deregisterInstance(Service service, Instance instance, String clientId) {
    final InstanceStoreRequest request = new InstanceStoreRequest();
    request.setService(service);
    request.setInstance(instance);
    request.setClientId(clientId);
    // 注意这里的 group,在构造函数中进行注册对应的 processor
    final WriteRequest writeRequest = WriteRequest.newBuilder().setGroup(group())
            .setData(ByteString.copyFrom(serializer.serialize(request))).setOperation(DataOperation.DELETE.name())
            .build();
    
    try {
        // 由 CPProtcol 写入请求到本地,然后同步到其他节点,最后应用状态机
        protocol.write(writeRequest);
        Loggers.RAFT.info("Client unregistered. service={}, clientId={}, instance={}", service, instance, clientId);
    } catch (Exception e) {
        throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
    }
}


// 构造函数中注册 requestProcessor, 这个可以分组的
public PersistentClientOperationServiceImpl(final PersistentIpPortClientManager clientManager) {
    this.clientManager = clientManager;
    this.protocol = ApplicationUtils.getBean(ProtocolManager.class).getCpProtocol();
    this.protocol.addRequestProcessors(Collections.singletonList(this));
}

// 处理状态机
@Override
public Response onApply(WriteRequest request) {
    final Lock lock = readLock;
    lock.lock();
    try {
        final InstanceStoreRequest instanceRequest = serializer.deserialize(request.getData().toByteArray());
        final DataOperation operation = DataOperation.valueOf(request.getOperation());
        switch (operation) {
            case ADD:
                onInstanceRegister(instanceRequest.service, instanceRequest.instance,
                        instanceRequest.getClientId());
                break;
            case DELETE:
                // 注销实例
                onInstanceDeregister(instanceRequest.service, instanceRequest.getClientId());
                break;
            case CHANGE:
                if (instanceAndServiceExist(instanceRequest)) {
                    onInstanceRegister(instanceRequest.service, instanceRequest.instance,
                            instanceRequest.getClientId());
                }
                break;
            default:
                return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + operation)
                        .build();
        }
        return Response.newBuilder().setSuccess(true).build();
    } catch (Exception e) {
        Loggers.RAFT.warn("Persistent client operation failed. ", e);
        return Response.newBuilder().setSuccess(false)
                .setErrMsg("Persistent client operation failed. " + e.getMessage()).build();
    } finally {
        lock.unlock();
    }
}

// 注销实例, 这里的逻辑和临时实例注销的逻辑是一样的,所以就不继续解析了
private void onInstanceDeregister(Service service, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    Client client = clientManager.getClient(clientId);
    if (client == null) {
        Loggers.RAFT.warn("client not exist onInstanceDeregister, clientId : {} ", clientId);
        return;
    }
    // 移除内存的 instance,发布 ClientChangedEvent 事件
    client.removeServiceInstance(singleton);
    client.setLastUpdatedTime();
    if (client.getAllPublishedService().isEmpty()) {
        clientManager.clientDisconnected(clientId);
    }
    // 发布 ClientDeregisterServiceEvent 事件       
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
}

测试类

com.alibaba.nacos.test.naming.CPInstancesAPI_ITCase#registerInstance_ephemeral_true

0%