nacos 基于 2.2.4 版本
在 nacos
中,手动创建 service
,更新 service
,删除 service
,更新 instance
,都是通过 raft
协议来实现的,所以来简单介绍下。
service 元数据同步
源码位置: com.alibaba.nacos.naming.core.ServiceOperatorV2Impl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| // 下面这些方法最终都会调用 metadataOperateService 的 updateServiceMetadata 或者 deleteServiceMetadata
// 创建 service
public void create(Service service, ServiceMetadata metadata) throws NacosException {
// 检查 service
if (ServiceManager.getInstance().containSingleton(service)) {
throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.SERVICE_ALREADY_EXIST,
String.format("specified service %s already exists!", service.getGroupedServiceName()));
}
// raft 协议更新服务元数据
metadataOperateService.updateServiceMetadata(service, metadata);
}
// 更新 service
@Override
public void update(Service service, ServiceMetadata metadata) throws NacosException {
// 检查 service
if (!ServiceManager.getInstance().containSingleton(service)) {
throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.SERVICE_NOT_EXIST,
String.format("service %s not found!", service.getGroupedServiceName()));
}
// raft 协议更新服务元数据
metadataOperateService.updateServiceMetadata(service, metadata);
}
// 删除 service
@Override
public void delete(String namespaceId, String serviceName) throws NacosException {
Service service = getServiceFromGroupedServiceName(namespaceId, serviceName, true);
// raft 协议删除服务元数据
delete(service);
}
|
源码位置: com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService#updateServiceMetadata
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| // 更新服务元数据
public void updateServiceMetadata(Service service, ServiceMetadata serviceMetadata) {
MetadataOperation<ServiceMetadata> operation = buildMetadataOperation(service);
operation.setMetadata(serviceMetadata);
// 构建 WriteRequest, 这里的 group 是 naming_service_metadata
WriteRequest operationLog = WriteRequest.newBuilder().setGroup(Constants.SERVICE_METADATA)
.setOperation(DataOperation.CHANGE.name()).setData(ByteString.copyFrom(serializer.serialize(operation)))
.build();
// 提交元数据
submitMetadataOperation(operationLog);
}
// 删除服务元数据
public void delete(Service service) throws NacosException {
// 检查 service
if (!ServiceManager.getInstance().containSingleton(service)) {
throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.SERVICE_NOT_EXIST,
String.format("service %s not found!", service.getGroupedServiceName()));
}
// 删除 service,必须先注销所有的 instance
if (!serviceStorage.getPushData(service).getHosts().isEmpty()) {
throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.SERVICE_DELETE_FAILURE,
"Service " + service.getGroupedServiceName()
+ " is not empty, can't be delete. Please unregister instance first");
}
// 删除服务元数据
metadataOperateService.deleteServiceMetadata(service);
}
|
源码位置: com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService#submitMetadataOperation
1
2
3
4
5
6
7
8
9
10
11
12
| // 提交元数据, 使用 raft 协议, 最后会被 ServiceMetadataProcessor#onApply 处理
private void submitMetadataOperation(WriteRequest operationLog) {
try {
Response response = cpProtocol.write(operationLog);
if (!response.getSuccess()) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR,
"do metadata operation failed " + response.getErrMsg());
}
} catch (Exception e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, "do metadata operation failed", e);
}
}
|
源码位置: com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadataProcessor#onApply
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| @Override
public Response onApply(WriteRequest request) {
readLock.lock();
try {
MetadataOperation<ServiceMetadata> op = serializer.deserialize(request.getData().toByteArray(), processType);
switch (DataOperation.valueOf(request.getOperation())) {
case ADD:
addClusterMetadataToService(op);
break;
case CHANGE:
updateServiceMetadata(op);
break;
case DELETE:
deleteServiceMetadata(op);
break;
default:
return Response.newBuilder().setSuccess(false)
.setErrMsg("Unsupported operation " + request.getOperation()).build();
}
return Response.newBuilder().setSuccess(true).build();
} catch (Exception e) {
Loggers.RAFT.error("onApply {} service metadata operation failed. ", request.getOperation(), e);
String errorMessage = null == e.getMessage() ? e.getClass().getName() : e.getMessage();
return Response.newBuilder().setSuccess(false).setErrMsg(errorMessage).build();
} finally {
readLock.unlock();
}
}
|
instance 元数据同步
源码位置: com.alibaba.nacos.naming.core.InstanceOperatorClientImpl#updateInstance
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| // 更新实例元数据
@Override
public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
Service service = getService(namespaceId, serviceName, instance.isEphemeral());
// 检查 service
if (!ServiceManager.getInstance().containSingleton(service)) {
throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.INSTANCE_ERROR,
"service not found, namespace: " + namespaceId + ", service: " + service);
}
String metadataId = InstancePublishInfo
.genMetadataId(instance.getIp(), instance.getPort(), instance.getClusterName());
// 更新实例元数据
metadataOperateService.updateInstanceMetadata(service, metadataId, buildMetadata(instance));
}
|
源码位置: com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService#updateInstanceMetadata
1
2
3
4
5
6
7
8
9
10
11
12
| // 更新实例元数据
public void updateInstanceMetadata(Service service, String metadataId, InstanceMetadata instanceMetadata) {
MetadataOperation<InstanceMetadata> operation = buildMetadataOperation(service);
operation.setTag(metadataId);
operation.setMetadata(instanceMetadata);
// 构造 WriteRequest 请求,注意这里的 group 为 naming_instance_metadata
WriteRequest operationLog = WriteRequest.newBuilder().setGroup(Constants.INSTANCE_METADATA)
.setOperation(DataOperation.CHANGE.name()).setData(ByteString.copyFrom(serializer.serialize(operation)))
.build();
// 提交元数据请求
submitMetadataOperation(operationLog);
}
|
源码位置: com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService#submitMetadataOperation
1
2
3
4
5
6
7
8
9
10
11
12
13
| // 提交元数据请求
private void submitMetadataOperation(WriteRequest operationLog) {
try {
// raft 提交请求,会被 InstanceMetadataProcessor#onApply 来处理
Response response = cpProtocol.write(operationLog);
if (!response.getSuccess()) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR,
"do metadata operation failed " + response.getErrMsg());
}
} catch (Exception e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, "do metadata operation failed", e);
}
}
|
源码位置: com.alibaba.nacos.naming.core.v2.metadata.InstanceMetadataProcessor#onApply
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| @Override
public Response onApply(WriteRequest request) {
readLock.lock();
try {
MetadataOperation<InstanceMetadata> op = serializer.deserialize(request.getData().toByteArray(), processType);
switch (DataOperation.valueOf(request.getOperation())) {
case ADD:
case CHANGE:
updateInstanceMetadata(op);
break;
case DELETE:
deleteInstanceMetadata(op);
break;
default:
return Response.newBuilder().setSuccess(false)
.setErrMsg("Unsupported operation " + request.getOperation()).build();
}
return Response.newBuilder().setSuccess(true).build();
} catch (Exception e) {
Loggers.RAFT.error("onApply {} instance metadata operation failed. ", request.getOperation(), e);
String errorMessage = null == e.getMessage() ? e.getClass().getName() : e.getMessage();
return Response.newBuilder().setSuccess(false).setErrMsg(errorMessage).build();
} finally {
readLock.unlock();
}
}
|
测试类
com.alibaba.nacos.test.naming.CPInstancesAPI_ITCase#createService