05 任务执行器的设计

nacos 基于 2.2.4 版本

任务执行器的设计

在 nacos 中,所有的任务都实现了 NacosTask 接口,所有的任务执行器都实现了 NacosTaskExecuteEngine 接口。

接下来看看这两个接口的设计

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// NacosTask
public interface NacosTask {

  /**
   * Judge Whether this nacos task should do.
   *
   * @return true means the nacos task should be done, otherwise false
   */
  // 对于立即执行的任务,默认 return true.
  // 对于延时执行的任务,判断 System.currentTimeMillis() - this.lastProcessTime >= this.taskInterval.
  boolean shouldProcess();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// NacosTaskExecuteEngine
// 从接口中可以看出
// 1. addTask, 一个 key 对应一个 task
// 2. addProcessor, 一个 key 对应一个 processor, 有默认的 processor
public interface NacosTaskExecuteEngine<T extends NacosTask> extends Closeable {
    
    /**
     * Get Task size in execute engine.
     *
     * @return size of task
     */
    int size();
    
    /**
     * Whether the execute engine is empty.
     *
     * @return true if the execute engine has no task to do, otherwise false
     */
    boolean isEmpty();
    
    /**
     * Add task processor {@link NacosTaskProcessor} for execute engine.
     *
     * @param key           key of task
     * @param taskProcessor task processor
     */
    void addProcessor(Object key, NacosTaskProcessor taskProcessor);
    
    /**
     * Remove task processor {@link NacosTaskProcessor} form execute engine for key.
     *
     * @param key key of task
     */
    void removeProcessor(Object key);
    
    /**
     * Try to get {@link NacosTaskProcessor} by key, if non-exist, will return default processor.
     *
     * @param key key of task
     * @return task processor for task key or default processor if task processor for task key non-exist
     */
    NacosTaskProcessor getProcessor(Object key);
    
    /**
     * Get all processor key.
     *
     * @return collection of processors
     */
    Collection<Object> getAllProcessorKey();
    
    /**
     * Set default task processor. If do not find task processor by task key, use this default processor to process
     * task.
     *
     * @param defaultTaskProcessor default task processor
     */
    void setDefaultTaskProcessor(NacosTaskProcessor defaultTaskProcessor);
    
    /**
     * Add task into execute pool.
     *
     * @param key  key of task
     * @param task task
     */
    void addTask(Object key, T task);
    
    /**
     * Remove task.
     *
     * @param key key of task
     * @return nacos task
     */
    T removeTask(Object key);
    
    /**
     * Get all task keys.
     *
     * @return collection of task keys.
     */
    Collection<Object> getAllTaskKeys();
}

具体实现类(举例)

PushDelayTaskPushDelayTaskExecuteEngine

源码位置: com.alibaba.nacos.naming.push.v2.task.PushDelayTask

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// PushDelayTask 的构造方法
// delay: 延时的事件
// targetClient: 推送的 clientId
public PushDelayTask(Service service, long delay, String targetClient) {
    this.service = service;
    this.pushToAll = false;
    this.targetClients = new HashSet<>(1);
    this.targetClients.add(targetClient);
    setTaskInterval(delay);
    // 设置上一次处理时间,用来判断是否过期
    setLastProcessTime(System.currentTimeMillis());
}

// 每一个延时任务都会有 merge 方法, 用来合并相同的 task, 这样可以更高效的处理任务
// 比如因为客户端重试,发起了两个一样的 task,经过 merge 之后,处理一个就行。 
@Override
public void merge(AbstractDelayTask task) {
    if (!(task instanceof PushDelayTask)) {
        return;
    }
    PushDelayTask oldTask = (PushDelayTask) task;
    if (isPushToAll() || oldTask.isPushToAll()) {
        pushToAll = true;
        targetClients = null;
    } else {
        targetClients.addAll(oldTask.getTargetClients());
    }
    setLastProcessTime(Math.min(getLastProcessTime(), task.getLastProcessTime()));
    Loggers.PUSH.info("[PUSH] Task merge for {}", service);
}
    
// shouldProcess 方法在父类上面, 判断当前任务都是过期
@Override
public boolean shouldProcess() {
    return (System.currentTimeMillis() - this.lastProcessTime >= this.taskInterval);
}

源码位置: com.alibaba.nacos.naming.push.v2.task.PushDelayTaskExecuteEngine#PushDelayTaskExecuteEngine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// PushDelayTaskExecuteEngine 的初始化
public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
                                  ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
                                  PushExecutor pushExecutor, SwitchDomain switchDomain) {
    ...
    // 设置默认的processor, 用来处理任务, 这里没有特殊的 processor
    setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
}

// 在父类 NacosDelayTaskExecuteEngine 中用单一的线程池来启动,然后处理任务
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    ...
    tasks = new ConcurrentHashMap<>(initCapacity);
    // 线程池
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    // 最后调用自己的方法来处理任务
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

// 处理任务
private class ProcessRunnable implements Runnable {
    
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}

// 在父类 NacosDelayTaskExecuteEngine 中
// com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#processTasks
protected void processTasks() {
    // 获取所有的 key,因为 addTask 方法,所以每一个 task 都会关联到一个 key
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        // 判断 task 是否到期,每个 task 创建时都是指定 taskInterval
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        // 获取相应的 processor 来处理,一般来说就是默认的 processor, 比如 PushDelayTaskProcessor, 下面会说这个类
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // 处理 task,如果处理失败,重新添加 task
            // ReAdd task if process failed
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error ", e);
            retryFailedTask(taskKey, task);
        }
    }
}

// 重新添加 task
private void retryFailedTask(Object key, AbstractDelayTask task) {
    task.setLastProcessTime(System.currentTimeMillis());
    addTask(key, task);
}

源码位置: com.alibaba.nacos.naming.push.v2.task.PushDelayTaskExecuteEngine.PushDelayTaskProcessor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// PushDelayTaskProcessor 处理 PushDelayTask, 重新包装为 PushExecuteTask, 然后放入线程池中运行
private static class PushDelayTaskProcessor implements NacosTaskProcessor {
    
    private final PushDelayTaskExecuteEngine executeEngine;
    
    public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
        this.executeEngine = executeEngine;
    }
    
    @Override
    public boolean process(NacosTask task) {
        PushDelayTask pushDelayTask = (PushDelayTask) task;
        Service service = pushDelayTask.getService();
        NamingExecuteTaskDispatcher.getInstance()
                .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
        return true;
    }
}

测试类

com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngineTest

0%