// 每流转一个节点,就会执行下面的方法protectedvoidexecuteSynchronous(FlowNodeflowNode){commandContext.getHistoryManager().recordActivityStart(execution);....// Execute any boundary events, sub process boundary events will be executed from the activity behaviorif(!inCompensation&&flowNodeinstanceofActivity){// Only activities can have boundary eventsList<BoundaryEvent>boundaryEvents=((Activity)flowNode).getBoundaryEvents();if(CollectionUtil.isNotEmpty(boundaryEvents)){// 执行 BoundaryEventexecuteBoundaryEvents(boundaryEvents,execution);}}// Execute actual behaviorActivityBehavioractivityBehavior=(ActivityBehavior)flowNode.getBehavior();if(activityBehavior!=null){executeActivityBehavior(activityBehavior,flowNode);}else{logger.debug("No activityBehavior on activity '{}' with execution {}",flowNode.getId(),execution.getId());Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution,true);}}
// 执行 BoundaryEventprotectedvoidexecuteBoundaryEvents(Collection<BoundaryEvent>boundaryEvents,ExecutionEntityexecution){// The parent execution becomes a scope, and a child execution is created for each of the boundary events// 遍历 boundaryEventsfor(BoundaryEventboundaryEvent:boundaryEvents){if(CollectionUtil.isEmpty(boundaryEvent.getEventDefinitions())||(boundaryEvent.getEventDefinitions().get(0)instanceofCompensateEventDefinition)){continue;}// A Child execution of the current execution is created to represent the boundary event being active// 创建子节点, 会新增一条 ACT_RU_EXECUTION 表的数据ExecutionEntitychildExecutionEntity=commandContext.getExecutionEntityManager().createChildExecution((ExecutionEntity)execution);childExecutionEntity.setParentId(execution.getId());childExecutionEntity.setCurrentFlowElement(boundaryEvent);childExecutionEntity.setScope(false);// 获取 behaviorActivityBehaviorboundaryEventBehavior=((ActivityBehavior)boundaryEvent.getBehavior());logger.debug("Executing boundary event activityBehavior {} with execution {}",boundaryEventBehavior.getClass(),childExecutionEntity.getId());// 执行 behaviorboundaryEventBehavior.execute(childExecutionEntity);}}
// 执行 behaviorpublicvoidexecute(DelegateExecutionexecution){ExecutionEntityexecutionEntity=(ExecutionEntity)execution;if(!(execution.getCurrentFlowElement()instanceofBoundaryEvent)){thrownewActivitiException("Programmatic error: "+this.getClass()+" should not be used for anything else than a boundary event");}JobManagerjobManager=Context.getCommandContext().getJobManager();// 创建定时任务TimerJobEntitytimerJob=jobManager.createTimerJob(timerEventDefinition,interrupting,executionEntity,TriggerTimerEventJobHandler.TYPE,TimerEventHandler.createConfiguration(execution.getCurrentActivityId(),timerEventDefinition.getEndDate(),timerEventDefinition.getCalendarName()));if(timerJob!=null){// 调度定时任务, 会新增一条 ACT_RU_TIMER_JOB 表jobManager.scheduleTimerJob(timerJob);}}publicvoidscheduleTimerJob(TimerJobEntitytimerJob){...// 会新增一条 ACT_RU_TIMER_JOB 表processEngineConfiguration.getTimerJobEntityManager().insert(timerJob);..}
// 在工作流框架启动时,这个轮询任务就会执行publicsynchronizedvoidrun(){log.info("{} starting to acquire async jobs due");Thread.currentThread().setName("activiti-acquire-timer-jobs");finalCommandExecutorcommandExecutor=asyncExecutor.getProcessEngineConfiguration().getCommandExecutor();while(!isInterrupted){try{// 获取到期的 timerJobfinalAcquiredTimerJobEntitiesacquiredJobs=commandExecutor.execute(newAcquireTimerJobsCmd(asyncExecutor));commandExecutor.execute(newCommand<Void>(){@OverridepublicVoidexecute(CommandContextcommandContext){for(TimerJobEntityjob:acquiredJobs.getJobs()){// 移动 timerJob 到 job 中,也就是 ACT_RU_TIMER_JOB 表数据到 ACT_RU_JOB 表jobManager.moveTimerJobToExecutableJob(job);}returnnull;}});}catch(ActivitiOptimisticLockingExceptionoptimisticLockingException){// 发生这个异常,说明同时有另外一个服务获取相同的 timerJobif(log.isDebugEnabled()){log.debug("Optimistic locking exception during timer job acquisition. If you have multiple timer executors running against the same database, "+"this exception means that this thread tried to acquire a timer job, which already was acquired by another timer executor acquisition thread."+"This is expected behavior in a clustered environment. "+"You can ignore this message if you indeed have multiple timer executor acquisition threads running against the same database. "+"Exception message: {}",optimisticLockingException.getMessage());}}catch(Throwablee){log.error("exception during timer job acquisition: {}",e.getMessage(),e);millisToWait=asyncExecutor.getDefaultTimerJobAcquireWaitTimeInMillis();}...省略等待的代码}log.info("{} stopped async job due acquisition");}
// commandContext.getJobManager().execute(job)@Overridepublicvoidexecute(Jobjob){if(jobinstanceofJobEntity){if(Job.JOB_TYPE_MESSAGE.equals(job.getJobType())){executeMessageJob((JobEntity)job);}elseif(Job.JOB_TYPE_TIMER.equals(job.getJobType())){// 执行 timerJobexecuteTimerJob((JobEntity)job);}}else{thrownewActivitiException("Only jobs with type JobEntity are supported to be executed");}}