// 长轮询publicStringdoPollingConfig(HttpServletRequestrequest,HttpServletResponseresponse,Map<String,String>clientMd5Map,intprobeRequestSize)throwsIOException{// Long polling.// 是否支持长轮询, 判断请求头 Long-Pulling-Timeout 是否存在if(LongPollingService.isSupportLongPolling(request)){// 添加长轮询longPollingService.addLongPollingClient(request,response,clientMd5Map,probeRequestSize);returnHttpServletResponse.SC_OK+"";}// Compatible with short polling logic.// 下面的逻辑都是兼容短轮询的,也就是立马对比 md5 值是否发生改变// 如果不一样,返回改变的 groupKey,下面的逻辑就不看了List<String>changedGroups=MD5Util.compareMd5(request,response,clientMd5Map);// Compatible with short polling result.StringoldResult=MD5Util.compareMd5OldResult(changedGroups);StringnewResult=MD5Util.compareMd5ResultString(changedGroups);Stringversion=request.getHeader(Constants.CLIENT_VERSION_HEADER);if(version==null){version="2.0.0";}intversionNum=Protocol.getVersionNumber(version);// Before 2.0.4 version, return value is put into header.if(versionNum<START_LONG_POLLING_VERSION_NUM){response.addHeader(Constants.PROBE_MODIFY_RESPONSE,oldResult);response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW,newResult);}else{request.setAttribute("content",newResult);}// Disable cache.response.setHeader("Pragma","no-cache");response.setDateHeader("Expires",0);response.setHeader("Cache-Control","no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);returnHttpServletResponse.SC_OK+"";}
// 添加长轮询publicvoidaddLongPollingClient(HttpServletRequestreq,HttpServletResponsersp,Map<String,String>clientMd5Map,intprobeRequestSize){// str 就是长轮询的超时时间,由客户端传入Stringstr=req.getHeader(LongPollingService.LONG_POLLING_HEADER);StringnoHangUpFlag=req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);intdelayTime=SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME,500);// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.longtimeout=-1L;// 固定长轮询的时间if(isFixedPolling()){timeout=Math.max(10000,getFixedPollingInterval());// Do nothing but set fix polling timeout.}else{// 计算长轮询时间timeout=Math.max(10000,Long.parseLong(str)-delayTime);longstart=System.currentTimeMillis();List<String>changedGroups=MD5Util.compareMd5(req,rsp,clientMd5Map);// 如果 md5 值有变化,立即返回结果if(changedGroups.size()>0){generateResponse(req,rsp,changedGroups);LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}",System.currentTimeMillis()-start,"instant",RequestUtil.getRemoteIp(req),"polling",clientMd5Map.size(),probeRequestSize,changedGroups.size());return;}elseif(noHangUpFlag!=null&&noHangUpFlag.equalsIgnoreCase(TRUE_STR)){// 不挂起请求LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}",System.currentTimeMillis()-start,"nohangup",RequestUtil.getRemoteIp(req),"polling",clientMd5Map.size(),probeRequestSize,changedGroups.size());return;}}Stringip=RequestUtil.getRemoteIp(req);ConnectionCheckResponseconnectionCheckResponse=checkLimit(req);// 检查限流if(!connectionCheckResponse.isSuccess()){generate503Response(req,rsp,connectionCheckResponse.getMessage());return;}// Must be called by http thread, or send response.// 开始异步finalAsyncContextasyncContext=req.startAsync();// AsyncContext.setTimeout() is incorrect, Control by oneselfasyncContext.setTimeout(0L);StringappName=req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);Stringtag=req.getHeader("Vipserver-Tag");// 提交长轮询任务ConfigExecutor.executeLongPolling(newClientLongPolling(asyncContext,clientMd5Map,ip,probeRequestSize,timeout,appName,tag));}
// 遍历订阅者// 接受 LocalDataChangeEvent 事件,会执行这个方法@Overridepublicvoidrun(){try{ConfigCacheService.getContentBetaMd5(groupKey);for(Iterator<ClientLongPolling>iter=allSubs.iterator();iter.hasNext();){ClientLongPollingclientSub=iter.next();if(clientSub.clientMd5Map.containsKey(groupKey)){// If published tag is not in the beta list, then it skipped.// beta 配置有变动if(isBeta&&!CollectionUtils.contains(betaIps,clientSub.ip)){continue;}// If published tag is not in the tag list, then it skipped.// tag 配置有变动if(StringUtils.isNotBlank(tag)&&!tag.equals(clientSub.tag)){continue;}getRetainIps().put(clientSub.ip,System.currentTimeMillis());// 删除iter.remove();// Delete subscribers' relationships.LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}",(System.currentTimeMillis()-changeTime),"in-advance",RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),"polling",clientSub.clientMd5Map.size(),clientSub.probeRequestSize,groupKey);// 返回响应clientSub.sendResponse(Arrays.asList(groupKey));}}}catch(Throwablet){LogUtil.DEFAULT_LOG.error("data change error: {}",ExceptionUtil.getStackTrace(t));}}