// 同步数据// 数据会包装为 DistroDataRequest, 会被 DistroDataRequestHandler 处理@Overridepublicvoidrun(){for(DistroDataeach:verifyData){try{if(transportAgent.supportCallbackTransport()){doSyncVerifyDataWithCallback(each);}else{doSyncVerifyData(each);}}catch(Exceptione){Loggers.DISTRO.error("[DISTRO-FAILED] verify data for type {} to {} failed.",resourceType,targetServer,e);}}}
// 接收到 Verify 数据之后, 验证 client 的 revision, 并进行续约@OverridepublicbooleanverifyClient(DistroClientVerifyInfoverifyData){StringclientId=verifyData.getClientId();IpPortBasedClientclient=clients.get(clientId);if(null!=client){// remote node of old version will always verify with zero revisionif(0==verifyData.getRevision()||client.getRevision()==verifyData.getRevision()){// 更新心跳时间, 也就是续约NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(clientId,newClientBeatUpdateTask(client));returntrue;}else{Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] IpPortBasedClient[{}] revision local={}, remote={}",client.getClientId(),client.getRevision(),verifyData.getRevision());}}// 验证不通过,返回 false, 最终会返回错误码returnfalse;}
@Overridepublicvoidrun(){try{load();if(!checkCompleted()){GlobalExecutor.submitLoadDataTask(this,distroConfig.getLoadDataRetryDelayMillis());}else{loadCallback.onSuccess();Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");}}catch(Exceptione){loadCallback.onFailed(e);Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ",e);}}// 加载节点数据privatevoidload()throwsException{// 检查节点列表while(memberManager.allMembersWithoutSelf().isEmpty()){Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");TimeUnit.SECONDS.sleep(1);}while(distroComponentHolder.getDataStorageTypes().isEmpty()){Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");TimeUnit.SECONDS.sleep(1);}// 从远端加载快照数据, 用于服务快速启动for(Stringeach:distroComponentHolder.getDataStorageTypes()){if(!loadCompletedMap.containsKey(each)||!loadCompletedMap.get(each)){loadCompletedMap.put(each,loadAllDataSnapshotFromRemote(each));}}}
// DistroFilter 会拦截所有的请求 @OverridepublicvoiddoFilter(ServletRequestservletRequest,ServletResponseservletResponse,FilterChainfilterChain)throwsIOException,ServletException{ReuseHttpServletRequestreq=newReuseHttpServletRequest((HttpServletRequest)servletRequest);HttpServletResponseresp=(HttpServletResponse)servletResponse;StringurlString=req.getRequestURI();if(StringUtils.isNotBlank(req.getQueryString())){urlString+="?"+req.getQueryString();}try{// 获取请求对应的方法Methodmethod=controllerMethodsCache.getMethod(req);Stringpath=newURI(req.getRequestURI()).getPath();if(method==null){thrownewNoSuchMethodException(req.getMethod()+" "+path);}// 方法是否有 @CanDistro 注解,没有就直接放行,不处理if(!method.isAnnotationPresent(CanDistro.class)){filterChain.doFilter(req,resp);return;}// 获取请求参数中的 ip 和 portStringdistroTag=distroTagGenerator.getResponsibleTag(req);// 当前节点是否响应该请求,如果是,直接放行,这个很重要, 后面继续解析if(distroMapper.responsible(distroTag)){filterChain.doFilter(req,resp);return;}// proxy request to other server if necessary:StringuserAgent=req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);// 判断必须是 client 的请求,不能是 server 之间的请求if(StringUtils.isNotBlank(userAgent)&&userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)){// This request is sent from peer server, should not be redirected again:Loggers.SRV_LOG.error("receive invalid redirect request from peer {}",req.getRemoteAddr());resp.sendError(HttpServletResponse.SC_BAD_REQUEST,"receive invalid redirect request from peer "+req.getRemoteAddr());return;}// 获取转发节点, 根据 ip:port 的 hash 值对 serverList.size() 取余来计算finalStringtargetServer=distroMapper.mapSrv(distroTag);List<String>headerList=newArrayList<>(16);Enumeration<String>headers=req.getHeaderNames();while(headers.hasMoreElements()){StringheaderName=headers.nextElement();headerList.add(headerName);headerList.add(req.getHeader(headerName));}finalStringbody=IoUtils.toString(req.getInputStream(),StandardCharsets.UTF_8.name());finalMap<String,String>paramsValue=HttpClient.translateParameterMap(req.getParameterMap());// 用 HttpClient 来转发请求到对应的节点上RestResult<String>result=HttpClient.request(HTTP_PREFIX+targetServer+req.getRequestURI(),headerList,paramsValue,body,PROXY_CONNECT_TIMEOUT,PROXY_READ_TIMEOUT,StandardCharsets.UTF_8.name(),req.getMethod());Stringdata=result.ok()?result.getData():result.getMessage();try{// 响应客户端请求WebUtils.response(resp,data,result.getCode());}catch(Exceptionignore){Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: "+distroMapper.mapSrv(distroTag)+urlString);}}catch(AccessControlExceptione){resp.sendError(HttpServletResponse.SC_FORBIDDEN,"access denied: "+ExceptionUtil.getAllExceptionMsg(e));}catch(NoSuchMethodExceptione){resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,"no such api:"+req.getMethod()+":"+req.getRequestURI());}catch(Exceptione){Loggers.SRV_LOG.warn("[DISTRO-FILTER] Server failed: ",e);resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,"Server failed, "+ExceptionUtil.getAllExceptionMsg(e));}}
// 当前节点是否响应该请求publicbooleanresponsible(StringresponsibleTag){finalList<String>servers=healthyList;if(!switchDomain.isDistroEnabled()||EnvUtil.getStandaloneMode()){returntrue;}if(CollectionUtils.isEmpty(servers)){// means distro config is not ready yetreturnfalse;}// 当前节点地址找不到,不转发请求 StringlocalAddress=EnvUtil.getLocalAddress();intindex=servers.indexOf(localAddress);intlastIndex=servers.lastIndexOf(localAddress);if(lastIndex<0||index<0){returntrue;}// 获取 hash 值,然后取余inttarget=distroHash(responsibleTag)%servers.size();returntarget>=index&&target<=lastIndex;}