时间轮和线程池实现任务执行器

java 的线程池可以充当一个任务执行器的,但是有时候不符合我们的要求,所以需要自定义开发。
满足1:可以根据任务数量来动态调整核心线程数最大线程数
满足2:支持重复执行的任务。

RepeatTask

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
public class RepeatTask extends AbstractTask {

    private final long maxDelay;

    public RepeatTask(Runnable runnable, long maxDelay, TimeUnit timeUnit) {
        super(runnable);
        this.maxDelay = timeUnit.toMillis(maxDelay);
    }

    @Override
    public void run() {
        long prevTime = System.currentTimeMillis();
        try {
            super.run();
        } finally {
            long diff = System.currentTimeMillis() - prevTime;
            diff = Long.max(maxDelay - diff, 0);
            taskExecutor.schedule(this, diff, TimeUnit.MILLISECONDS);
        }
    }
}

通过计算执行的时间来判断下一次的延时时间,从而实现重复执行

DefaultTaskExecutor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final HashedWheelTimer timer;
private final ThreadPoolExecutor threadPoolExecutor;
private final TaskExecutorPoolSizeAdjuster poolSizeAdjuster;

public DefaultTaskExecutor(ThreadPoolExecutor threadPoolExecutor, TaskExecutorPoolSizeAdjuster poolSizeAdjuster) {
    assert threadPoolExecutor != null;
    this.timer = new HashedWheelTimer(new DefaultThreadFactory("TaskExecutor-Timer"), 10, TimeUnit.MILLISECONDS, 100, true, -1, threadPoolExecutor);
    this.threadPoolExecutor = threadPoolExecutor;
    this.poolSizeAdjuster = poolSizeAdjuster;
    init();
}

private void init() {
    if (poolSizeAdjuster != null) {
        submit(new RepeatTask(() -> {
            int maxPoolSize = poolSizeAdjuster.calctMaximumPoolSize();
            threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
            int corePoolSize = poolSizeAdjuster.calcCorePoolSize();
            threadPoolExecutor.setCorePoolSize(corePoolSize);
        }, 10, TimeUnit.SECONDS));
    }
}

使用 HashedWheelTimer调度延时任务。 在构造方法中传入 TaskExecutorPoolSizeAdjuster 来动态调整线程数。

示例代码

demo-task-executor

0%