java 的线程池可以充当一个任务执行器的,但是有时候不符合我们的要求,所以需要自定义开发。
满足1:可以根据任务数量来动态调整核心线程数和最大线程数。
满足2:支持重复执行的任务。
RepeatTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| public class RepeatTask extends AbstractTask {
private final long maxDelay;
public RepeatTask(Runnable runnable, long maxDelay, TimeUnit timeUnit) {
super(runnable);
this.maxDelay = timeUnit.toMillis(maxDelay);
}
@Override
public void run() {
long prevTime = System.currentTimeMillis();
try {
super.run();
} finally {
long diff = System.currentTimeMillis() - prevTime;
diff = Long.max(maxDelay - diff, 0);
taskExecutor.schedule(this, diff, TimeUnit.MILLISECONDS);
}
}
}
|
通过计算执行的时间来判断下一次的延时时间,从而实现重复执行。
DefaultTaskExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| private final HashedWheelTimer timer;
private final ThreadPoolExecutor threadPoolExecutor;
private final TaskExecutorPoolSizeAdjuster poolSizeAdjuster;
public DefaultTaskExecutor(ThreadPoolExecutor threadPoolExecutor, TaskExecutorPoolSizeAdjuster poolSizeAdjuster) {
assert threadPoolExecutor != null;
this.timer = new HashedWheelTimer(new DefaultThreadFactory("TaskExecutor-Timer"), 10, TimeUnit.MILLISECONDS, 100, true, -1, threadPoolExecutor);
this.threadPoolExecutor = threadPoolExecutor;
this.poolSizeAdjuster = poolSizeAdjuster;
init();
}
private void init() {
if (poolSizeAdjuster != null) {
submit(new RepeatTask(() -> {
int maxPoolSize = poolSizeAdjuster.calctMaximumPoolSize();
threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
int corePoolSize = poolSizeAdjuster.calcCorePoolSize();
threadPoolExecutor.setCorePoolSize(corePoolSize);
}, 10, TimeUnit.SECONDS));
}
}
|
使用 HashedWheelTimer
来调度延时任务。
在构造方法中传入 TaskExecutorPoolSizeAdjuster
来动态调整线程数。
示例代码
demo-task-executor