@Overridepublicvoidinit(RaftConfigconfig){// 判断是否已经初始化if(initialized.compareAndSet(false,true)){this.raftConfig=config;// 这里是一个空方法NotifyCenter.registerToSharePublisher(RaftEvent.class);// raftServer 初始化this.raftServer.init(this.raftConfig);// raftServer 启动this.raftServer.start();// There is only one consumer to ensure that the internal consumption// is sequential and there is no concurrent competition// 监听 RaftEvent 事件NotifyCenter.registerSubscriber(newSubscriber<RaftEvent>(){@OverridepublicvoidonEvent(RaftEventevent){Loggers.RAFT.info("This Raft event changes : {}",event);finalStringgroupId=event.getGroupId();Map<String,Map<String,Object>>value=newHashMap<>();Map<String,Object>properties=newHashMap<>();finalStringleader=event.getLeader();finalLongterm=event.getTerm();finalList<String>raftClusterInfo=event.getRaftClusterInfo();finalStringerrMsg=event.getErrMsg();// Leader information needs to be selectively updated. If it is valid data,// the information in the protocol metadata is updated.MapUtil.putIfValNoEmpty(properties,MetadataKey.LEADER_META_DATA,leader);MapUtil.putIfValNoNull(properties,MetadataKey.TERM_META_DATA,term);MapUtil.putIfValNoEmpty(properties,MetadataKey.RAFT_GROUP_MEMBER,raftClusterInfo);MapUtil.putIfValNoEmpty(properties,MetadataKey.ERR_MSG,errMsg);value.put(groupId,properties);// 保存元数据metaData.load(value);// The metadata information is injected into the metadata information of the node// 会发布 MembersChangeEvent 事件injectProtocolMetaData(metaData);}@OverridepublicClass<?extendsEvent>subscribeType(){returnRaftEvent.class;}});}}
synchronizedvoidstart(){// 判断是否已经启动if(!isStarted){Loggers.RAFT.info("========= The raft protocol is starting... =========");try{// init raft group node// 初始化 raft 的节点列表com.alipay.sofa.jraft.NodeManagerraftNodeManager=com.alipay.sofa.jraft.NodeManager.getInstance();for(Stringaddress:raftConfig.getMembers()){PeerIdpeerId=PeerId.parsePeer(address);conf.addPeer(peerId);raftNodeManager.addAddress(peerId.getEndpoint());}// 设置节点配置nodeOptions.setInitialConf(conf);// 创建 rpcServer 和自定义请求处理器, 这个 server 在多个 raft group 中是共享的rpcServer=JRaftUtils.initRpcServer(this,localPeerId);// rpcServer 初始化if(!this.rpcServer.init(null)){Loggers.RAFT.error("Fail to init [BaseRpcServer].");thrownewRuntimeException("Fail to init [BaseRpcServer].");}// Initialize multi raft group service frameworkisStarted=true;// 创建 raftGroupcreateMultiRaftGroup(processors);Loggers.RAFT.info("========= The raft protocol start finished... =========");}catch(Exceptione){Loggers.RAFT.error("raft protocol start failure, cause: ",e);thrownewJRaftException(e);}}}
// 创建 raftGroup// 从这个方法可以看出// 每个 groupName 都会对应一个 processor,一个 NacosStateMachine,一个 Node。synchronizedvoidcreateMultiRaftGroup(Collection<RequestProcessor4CP>processors){// There is no reason why the LogProcessor cannot be processed because of the synchronizationif(!this.isStarted){// 添加 processorthis.processors.addAll(processors);return;}// raft 日志存储路径finalStringparentPath=Paths.get(EnvUtil.getNacosHome(),"data/protocol/raft").toString();// 遍历 processorsfor(RequestProcessor4CPprocessor:processors){finalStringgroupName=processor.group();// 判断 group 是否重复if(multiRaftGroup.containsKey(groupName)){thrownewDuplicateRaftGroupException(groupName);}// Ensure that each Raft Group has its own configuration and NodeOptionsConfigurationconfiguration=conf.copy();NodeOptionscopy=nodeOptions.copy();// 初始化目录JRaftUtils.initDirectory(parentPath,groupName,copy);// Here, the LogProcessor is passed into StateMachine, and when the StateMachine// triggers onApply, the onApply of the LogProcessor is actually called// raft 的状态机NacosStateMachinemachine=newNacosStateMachine(this,processor);// 设置状态机和配置copy.setFsm(machine);copy.setInitialConf(configuration);// Set snapshot interval, default 1800 secondsintdoSnapshotInterval=ConvertUtils.toInt(raftConfig.getVal(RaftSysConstants.RAFT_SNAPSHOT_INTERVAL_SECS),RaftSysConstants.DEFAULT_RAFT_SNAPSHOT_INTERVAL_SECS);// If the business module does not implement a snapshot processor, cancel the snapshotdoSnapshotInterval=CollectionUtils.isEmpty(processor.loadSnapshotOperate())?0:doSnapshotInterval;// 设置快照copy.setSnapshotIntervalSecs(doSnapshotInterval);Loggers.RAFT.info("create raft group : {}",groupName);// 创建 raftGroupServiceRaftGroupServiceraftGroupService=newRaftGroupService(groupName,localPeerId,copy,rpcServer,true);// Because BaseRpcServer has been started before, it is not allowed to start again here// 启动 raftGroupServiceNodenode=raftGroupService.start(false);// 设置节点machine.setNode(node);// 更新配置RouteTable.getInstance().updateConfiguration(groupName,configuration);// 定时任务,注册自己到 raft 集群中RaftExecutor.executeByCommon(()->registerSelfToCluster(groupName,localPeerId,configuration));// Turn on the leader auto refresh for this groupRandomrandom=newRandom();longperiod=nodeOptions.getElectionTimeoutMs()+random.nextInt(5*1000);// 定时任务,刷新 raft 配置,可以获取集群成员列表RaftExecutor.scheduleRaftMemberRefreshJob(()->refreshRouteTable(groupName),nodeOptions.getElectionTimeoutMs(),period,TimeUnit.MILLISECONDS);// 添加 multiRaftGroupmultiRaftGroup.put(groupName,newRaftGroupTuple(node,processor,raftGroupService,machine));}}
// 增加节点 voidregisterSelfToCluster(StringgroupId,PeerIdselfIp,Configurationconf){while(!isShutdown){try{// 获取 groupId 对应的成员列表List<PeerId>peerIds=cliService.getPeers(groupId,conf);if(peerIds.contains(selfIp)){return;}// 添加自己的 ip 到集群中Statusstatus=cliService.addPeer(groupId,conf,selfIp);if(status.isOk()){return;}Loggers.RAFT.warn("Failed to join the cluster, retry...");}catch(Exceptione){Loggers.RAFT.error("Failed to join the cluster, retry...",e);}ThreadUtils.sleep(1_000L);}}
// 删除节点@OverridepublicvoidmemberChange(Set<String>addresses){// 这里会重试 5 次for(inti=0;i<5;i++){// 删除节点if(this.raftServer.peerChange(jRaftMaintainService,addresses)){return;}ThreadUtils.sleep(100L);}Loggers.RAFT.warn("peer removal failed");}booleanpeerChange(JRaftMaintainServicemaintainService,Set<String>newPeers){// This is only dealing with node deletion, the Raft protocol, where the node adds itself to the cluster when it starts upSet<String>oldPeers=newHashSet<>(this.raftConfig.getMembers());this.raftConfig.setMembers(localPeerId.toString(),newPeers);oldPeers.removeAll(newPeers);// 检查节点是否有删除,为空,表示节点不变或者有新的节点加入if(oldPeers.isEmpty()){returntrue;}Set<String>waitRemove=oldPeers;AtomicIntegersuccessCnt=newAtomicInteger(0);// 遍历 multiRaftGroup 来删除multiRaftGroup.forEach(newBiConsumer<String,RaftGroupTuple>(){@Overridepublicvoidaccept(Stringgroup,RaftGroupTupletuple){Map<String,String>params=newHashMap<>();params.put(JRaftConstants.GROUP_ID,group);params.put(JRaftConstants.COMMAND_NAME,JRaftConstants.REMOVE_PEERS);params.put(JRaftConstants.COMMAND_VALUE,StringUtils.join(waitRemove,StringUtils.COMMA));// 执行删除命令, REMOVE_PEERSRestResult<String>result=maintainService.execute(params);if(result.ok()){successCnt.incrementAndGet();}else{Loggers.RAFT.error("Node removal failed : {}",result);}}});returnsuccessCnt.get()==multiRaftGroup.size();}// 源码位置:com.alibaba.nacos.core.distributed.raft.utils.JRaftOps#REMOVE_PEERSREMOVE_PEERS(JRaftConstants.REMOVE_PEERS){@OverridepublicRestResult<String>execute(CliServicecliService,StringgroupId,Nodenode,Map<String,String>args){finalConfigurationconf=node.getOptions().getInitialConf();finalStringpeers=args.get(JRaftConstants.COMMAND_VALUE);// 遍历节点for(Strings:peers.split(",")){List<PeerId>peerIds=cliService.getPeers(groupId,conf);finalPeerIdwaitRemove=PeerId.parsePeer(s);// 不包含,则不需要删除if(!peerIds.contains(waitRemove)){continue;}// 删除节点Statusstatus=cliService.removePeer(groupId,conf,waitRemove);if(!status.isOk()){returnRestResultUtils.failed(status.getErrorMsg());}}returnRestResultUtils.success();}},
// raftProtocol 来写请求@OverridepublicResponsewrite(WriteRequestrequest)throwsException{// 异步请求CompletableFuture<Response>future=writeAsync(request);// Here you wait for 10 seconds, as long as possible, for the request to completereturnfuture.get(10_000L,TimeUnit.MILLISECONDS);}@OverridepublicCompletableFuture<Response>writeAsync(WriteRequestrequest){// raftServer 提交请求returnraftServer.commit(request.getGroup(),request,newCompletableFuture<>());}
// leader 节点 apply 请求publicvoidapplyOperation(Nodenode,Messagedata,FailoverClosureclosure){finalTasktask=newTask();// 设置回调task.setDone(newNacosClosure(data,status->{// 把响应设置给 closure, closure 就是 FailoverClosureImplNacosClosure.NacosStatusnacosStatus=(NacosClosure.NacosStatus)status;closure.setThrowable(nacosStatus.getThrowable());closure.setResponse(nacosStatus.getResponse());closure.run(nacosStatus);}));// add request type field at the head of task data.// 封装请求,WriteRequest 或者 ReadRequestbyte[]requestTypeFieldBytes=newbyte[2];requestTypeFieldBytes[0]=ProtoMessageUtil.REQUEST_TYPE_FIELD_TAG;if(datainstanceofReadRequest){requestTypeFieldBytes[1]=ProtoMessageUtil.REQUEST_TYPE_READ;}else{requestTypeFieldBytes[1]=ProtoMessageUtil.REQUEST_TYPE_WRITE;}byte[]dataBytes=data.toByteArray();// 设置数据task.setData((ByteBuffer)ByteBuffer.allocate(requestTypeFieldBytes.length+dataBytes.length).put(requestTypeFieldBytes).put(dataBytes).position(0));// apply 请求,写入主节点日志,复制日志到从节点,超过半数节点成功,然后执行状态机 NacosStateMachinenode.apply(task);}
// raft 处理 ReadRequest CompletableFuture<Response>get(finalReadRequestrequest){finalStringgroup=request.getGroup();CompletableFuture<Response>future=newCompletableFuture<>();// 检查 group 是否存在finalRaftGroupTupletuple=findTupleByGroup(group);if(Objects.isNull(tuple)){future.completeExceptionally(newNoSuchRaftGroupException(group));returnfuture;}finalNodenode=tuple.node;finalRequestProcessorprocessor=tuple.processor;try{// raft 协议的一致性读,如果返回成功,可以确保数据是一致的,直接本地处理就可以node.readIndex(BytesUtil.EMPTY_BYTES,newReadIndexClosure(){@Overridepublicvoidrun(Statusstatus,longindex,byte[]reqCtx){if(status.isOk()){try{Responseresponse=processor.onRequest(request);future.complete(response);}catch(Throwablet){MetricsMonitor.raftReadIndexFailed();future.completeExceptionally(newConsistencyException("The conformance protocol is temporarily unavailable for reading",t));}return;}// 返回错误,从 leader 中读取数据MetricsMonitor.raftReadIndexFailed();Loggers.RAFT.error("ReadIndex has error : {}, go to Leader read.",status.getErrorMsg());MetricsMonitor.raftReadFromLeader();readFromLeader(request,future);}});returnfuture;}catch(Throwablee){MetricsMonitor.raftReadFromLeader();Loggers.RAFT.warn("Raft linear read failed, go to Leader read logic : {}",e.toString());// run raft readreadFromLeader(request,future);returnfuture;}}// 从 leader 中读取数据publicvoidreadFromLeader(finalReadRequestrequest,finalCompletableFuture<Response>future){commit(request.getGroup(),request,future);}// 提交请求, 这个方法上面已经解析过了publicCompletableFuture<Response>commit(finalStringgroup,finalMessagedata,finalCompletableFuture<Response>future){LoggerUtils.printIfDebugEnabled(Loggers.RAFT,"data requested this time : {}",data);finalRaftGroupTupletuple=findTupleByGroup(group);if(tuple==null){future.completeExceptionally(newIllegalArgumentException("No corresponding Raft Group found : "+group));returnfuture;}FailoverClosureImplclosure=newFailoverClosureImpl(future);finalNodenode=tuple.node;if(node.isLeader()){// The leader node directly applies this requestapplyOperation(node,data,closure);}else{// Forward to Leader for request processinginvokeToLeader(group,data,rpcRequestTimeoutMs,closure);}returnfuture;}