Kylin Job生成与调度流程分析

Job生成源码分析

用户在Kylin平台对Cube做Build/Merge/Refresh操作时,会请求同一个URL:http://:/kylin/api/cubes/{cubeName}/rebuild,入口如下:

CubeController.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT }, produces = { "application/json" })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
return buildInternal(cubeName, new TSRange(req.getStartTime(), req.getEndTime()), null, null, null,
req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment(), req.getPriorityOffset());
}

private JobInstance buildInternal(String cubeName, TSRange tsRange, SegmentRange segRange, //
Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd,
String buildType, boolean force, Integer priorityOffset) {
try {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
checkBuildingSegment(cube);
return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd,
CubeBuildTypeEnum.valueOf(buildType), force, submitter, priorityOffset);
} catch (Throwable e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e.getLocalizedMessage(), e);
}
}

在CubeController中调用了JobService类中的submitJob() -> submitJobInternal()方法,由于Merge/Refresh Job生成过程与Build极其相似,为了描述方便,只介绍Build过程:
JobService.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, SegmentRange segRange, //
Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, //
CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException {
...

DefaultChainedExecutable job;

CubeSegment newSeg = null;
if (buildType == CubeBuildTypeEnum.BUILD) {
ISource source = SourceManager.getSource(cube);
SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart,
sourcePartitionOffsetEnd);
src = source.enrichSourcePartitionBeforeBuild(cube, src);
newSeg = getCubeManager().appendSegment(cube, src);
job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);
} else if (...) {...}

getExecutableManager().addJob(job);

JobInstance jobInstance = getSingleJobInstance(job);
return jobInstance;
}

可以看出生成一个完整的Build Job就是生成DefaultChainedExecutable的过程,最后通过ExecutableManager提交Job,等待任务监听线程监听到,并调度顺序执行里面的子任务。下面是生成的过程:
EngineFactory.java:

1
2
3
public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter, priorityOffset);
}

这里调用了IBatchCubingEngine接口的createBatchCubingJob()方法,该接口的实现类有MRBatchCubingEngine2和SparkBatchCubingEngine2,根据配置来选择构建的计算引擎,下面着重分析一下MR的构建过程:
MRBatchCubingEngine2.java:

1
2
3
4
@Override
public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
return new BatchCubingJobBuilder2(newSegment, submitter, priorityOffset).build();
}

这是一个入口类,构建的主要逻辑都封装在BatchCubingJobBuilder2类中:
BatchCubingJobBuilder2.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private final IMRBatchCubingInputSide inputSide;
private final IMRBatchCubingOutputSide2 outputSide;

public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) {
super(newSegment, submitter, priorityOffset);
this.inputSide = MRUtil.getBatchCubingInputSide(seg);
this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
}

public CubingJob build() {
logger.info("MR_V2 new job to BUILD segment " + seg);

final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);

// Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
inputSide.addStepPhase1_CreateFlatTable(result);

// Phase 2: Build Dictionary
result.addTask(createFactDistinctColumnsStep(jobId));

if (isEnableUHCDictStep()) {
result.addTask(createBuildUHCDictStep(jobId));
}

result.addTask(createBuildDictionaryStep(jobId));
result.addTask(createSaveStatisticsStep(jobId));

// add materialize lookup tables if needed
LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);

outputSide.addStepPhase2_BuildDictionary(result);

if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
result.addTask(createExtractDictionaryFromGlobalJob(jobId));
}

// Phase 3: Build Cube
addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
outputSide.addStepPhase3_BuildCube(result);

// Phase 4: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
inputSide.addStepPhase4_Cleanup(result);
outputSide.addStepPhase4_Cleanup(result);

...
return result;
}

其中inputSide和outputSide变量分别代表着Kylin中的数据源输入(Hive/Kafka)和数据存储输出(HBase),因为在构建过程中,有很多过程可以直接调用现成的工具,例如,创建中间表可以使用hive -e加sql语句来实现,HFile导入HBase使用bulkload方法完成,而不用Kylin来实现,Kylin只负责调度这些job和构建算法,简化了Kylin的工作。

在Kylin中,每一个Job都实现了继承了AbstractExecutable,其中核心方法是execute(),并在execute()函数中依次执行了onExecuteStart()、doWork()、doExecuteFinished()方法。类关系图如下:
Executable

下面的流程图列出了所有可能的task:
all_task
以上就是一个Build Job生成过程。

Job调度流程

初始化线程池

Kylin使用了Spring框架提供RESTful接口,JobService实现了InitializingBean接口,这就意味着在初始化bean的时候Spring会调用这个类实现的afterPropertiesSet方法执行初始化操作。

JobService.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
@SuppressWarnings("unchecked")
@Override
public void afterPropertiesSet() throws Exception {
...

final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory
.scheduler(kylinConfig.getSchedulerType());

scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());

...
}

其中有DefaultScheduler和DistributedScheduler类实现了Scheduler接口。默认情况下,使用DefaultScheduler调度,即只有一个实例用于构建任务的调度(即 kylin.server.mode 设置为 all 或者 job 模式)。Kylin2.0开始支持DistributedScheduler,即支持多个任务引擎一起运行(多个 Kylin 节点上配置它的角色为 job 或 all),从而保证任务构建的高可用。

先看看DefaultScheduler类的init()方法:
DefaultScheduler.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Override
public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException {
// 从zookeeper中获取一个lock,这个锁是互斥的
jobLock = lock;

/**
* 在kylin server中分为三种运行模式(可以通过kylin.server.mode配置项配置,默认为all),分别为all、job和query,
* 前两种是可以执行任务的,而query模式下kylin server只提供元数据的操作以及SQL查询,不能执行构建cube、合并cube之类的任务。
* 因此可以看到只有在前两种模式下,该函数会启动一个线程创建一个DefaultScheduler对象
*/
String serverMode = jobEngineConfig.getConfig().getServerMode();
if (!("job".equals(serverMode.toLowerCase(Locale.ROOT)) || "all".equals(serverMode.toLowerCase(Locale.ROOT)))) {
logger.info("server mode: " + serverMode + ", no need to run job scheduler");
return;
}

...

// 创建一个大小为1的线程池,这个线程池中周期性的调度查看是否有可执行的任务。
fetcherPool = Executors.newScheduledThreadPool(1);

// 真正调度任务执行的线程池的大小,默认为10个,使用的队列是无最大上限的。
int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS,
new SynchronousQueue<Runnable>());

// 所有正在执行的任务都会保存在context中
context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());

// 根据配置对象获取manager对象
ExecutableManager executableManager = getExecutableManager();
executableManager.resumeAllRunningJobs();

int pollSecond = jobEngineConfig.getPollIntervalSecond();

JobExecutor jobExecutor = new JobExecutor() {
@Override
public void execute(AbstractExecutable executable) {
jobPool.execute(new JobRunner(executable));
}
};

// FetcherRunner线程是周期性的查看其它任务是否可执行的线程
fetcher = jobEngineConfig.getJobPriorityConsidered()
? new PriorityFetcherRunner(jobEngineConfig, context, jobExecutor)
: new DefaultFetcherRunner(jobEngineConfig, context, jobExecutor);

// scheduleAtFixedRate如果执行时间小于指定的间隔时间的情况下,callable或runnable每隔period执行一次,如果执行时间大于指定的间隔时间,每隔程序执行时间执行一次。
// 默认每1分钟抓取1次Job信息
fetcherPool.scheduleAtFixedRate(fetcher, pollSecond / 10, pollSecond, TimeUnit.SECONDS);
}

任务调度模型:
启动了两个线程池,fetcherPool只持有一个线程,这个线程就是周期性的检查任务队列中是否有可执行的任务,如果存在可执行的任务则将它作为参数创建一个新的线程对象,交给线程池jobPool调度执行,线程池jobPool调度到这个线程的时候,执行线程的run函数。

在DistributedScheduler类中的init()方法中,多了一个watchPool线程池,该线程池同样只持有一个线程,用于监听Zookeeper,以至于当有一个job server挂了,其它job servers可以接管它的工作。

1
2
3
watchPool = Executors.newFixedThreadPool(1);
WatcherProcessImpl watcherProcess = new WatcherProcessImpl(this.serverName);
lockWatch = this.jobLock.watchLocks(getWatchPath(), watchPool, watcherProcess);

监听、调度Ready状态的Job

newScheduledThreadPool.scheduleAtFixedRate()调度策略:如果执行时间小于指定的间隔时间的情况下,callable或runnable每隔period执行一次,如果执行时间大于指定的间隔时间,每隔程序执行时间执行一次。

下面看看默认一分钟执行一次的fetcher线程的run方法:
DefaultFetcherRunner.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
synchronized public void run() {
...

for (final String id : getExecutableManger().getAllJobIdsInCache()) {
// ... 统计和过滤了所有状态不为Ready的job

// 调度Ready 状态的 Job
addToJobPool(executable, executable.getDefaultPriority());
}
...
}

protected void addToJobPool(AbstractExecutable executable, int priority) {
...
context.addRunningJob(executable);
jobExecutor.execute(executable);
...
}

该方法中抓取所有Job,统计各种状态信息,最后将Ready状态的job添加到jobPool线程池中去执行。

Job调度流程

jobPool线程池执行的就是的DefaultChainedExecutable线程。DefaultChainedExecutable是一个比较特殊的job,因为它本身并不会执行任务的逻辑,而是相当于多个具体job的容器,顺序的执行这些job。它和MapReduceExecutable等类都一样继承于AbstractExecutable(有start、doWork、finish方法),不同的是它还实现了ChainedExecutable接口。它的实现类是CubingJob。
DefaultChainedExecutable是用于链式执行多个job的job,它的内部可以链接多个job,然后按照job的加入顺序依次执行。

其中onExecuteStart()和onExecuteFinished()方法用于执行前后更新job状态(RUNNING、DISCARDED、STOPPED、ERROR、READY)。在doWork()方法中顺序执行多个job。
DefaultChainedExecutable.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
List<? extends Executable> executables = getTasks();
final int size = executables.size();
for (int i = 0; i < size; ++i) {
Executable subTask = executables.get(i);
ExecutableState state = subTask.getStatus();
if (state == ExecutableState.RUNNING) {
// there is already running subtask, no need to start a new subtask
break;
} else if (state == ExecutableState.STOPPED) {
// the job is paused
break;
} else if (state == ExecutableState.ERROR) {
throw new IllegalStateException(
"invalid subtask state, subtask:" + subTask.getName() + ", state:" + subTask.getStatus());
}
if (subTask.isRunnable()) {
return subTask.execute(context);
}
}
return new ExecuteResult(ExecuteResult.State.SUCCEED);
}

可以看出它会顺序的从executables 数组获取job然后查看job是否可以执行(状态是否为READY),但是令人诧异的是每一个job执行完成之后它并不会调度执行下一个job,而是直接返回了,这也就意味着它只会执行executables链表中的第一个可执行的job,那接下来的怎么办呢? 这其中的奥秘在于每次成功执行一个job之后会调用这个job的onExecuteFinished函数,根据上面的逻辑,每执行完一个job都会跳出doWork函数执行onExecuteFinished函数,而在DefaultChainedExecutable的onExecuteFinished函数中,它会顺序的检查每一个任务的执行状况,如果最近一个任务执行失败,则标记整个job执行失败,如果成功则检查是否全部任务都执行成功,如果是则将整个任务标记为成功,然后检查是否有任何一个任务执行失败,如果是则将这个标记为失败(这一步理论上不需要再检查,因为每一个job完成之后都会检查),如果没有任何job失败并且也没有全部执行成功则再次将自身标记为READY就可以返回了。虽然每次只执行了executables链表中的第一个可执行的job,但是每次执行完成之后都会将自身标记为READY,回想起之前JobRunner线程在每次job执行execute函数之后立即调度下一次查看是否有READY的job,这样在DefaultChainedExecutable对象中的前一个job执行完成之后就会立即调度下一个job执行(因为前一个任务的状态不再是READY),并且提供了检查每次任务执行完成都检查完成状态的逻辑,这样的结构还是挺巧妙的。

以上就是一个Build Job从生成到调度执行的全部过程。