Flink Hive Source Parallelism Inference: Source Code Analysis

Batch Reading from Hive

There are two configuration options defined in HiveOptions:

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
        key("table.exec.hive.infer-source-parallelism")
                .defaultValue(true)
                .withDescription(
                        "If is false, parallelism of source are set by config.\n" +
                        "If is true, source parallelism is inferred according to splits number.\n");

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
        key("table.exec.hive.infer-source-parallelism.max")
                .defaultValue(1000)
                .withDescription("Sets max infer parallelism for source operator.");
  • table.exec.hive.infer-source-parallelism: defaults to true, which means the source parallelism is inferred from the number of files and splits under the table's partitions; if set to false, the explicitly configured parallelism is used instead.
  • table.exec.hive.infer-source-parallelism.max: defaults to 1000, the maximum parallelism the Hive source can be inferred to (a configuration sketch follows below).
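
The following is a minimal sketch (not from the article) of how these two options could be set on a TableEnvironment before querying Hive; the class name and the chosen values are only for illustration, and the builder API may differ slightly across Flink versions.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveSourceParallelismConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        // Infer the source parallelism from file/split counts (default: true).
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.exec.hive.infer-source-parallelism", true);
        // Cap the inferred source parallelism (default: 1000).
        tEnv.getConfig().getConfiguration()
                .setInteger("table.exec.hive.infer-source-parallelism.max", 500);
    }
}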

These two options are only used in the HiveParallelismInference class, a utility class dedicated to calculating the parallelism of the Hive source. Its code is as follows:

/**
 * A utility class to calculate parallelism for Hive connector considering various factors.
 */
class HiveParallelismInference {

    private static final Logger LOG = LoggerFactory.getLogger(HiveParallelismInference.class);

    private final ObjectPath tablePath;
    private final boolean infer;
    private final int inferMaxParallelism;

    private int parallelism;

    HiveParallelismInference(ObjectPath tablePath, ReadableConfig flinkConf) {
        this.tablePath = tablePath;
        // read table.exec.hive.infer-source-parallelism
        this.infer = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM);
        // read table.exec.hive.infer-source-parallelism.max
        this.inferMaxParallelism = flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
        Preconditions.checkArgument(
                inferMaxParallelism >= 1,
                HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
        // read table.exec.resource.default-parallelism as the initial parallelism
        this.parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
    }

    /**
     * Apply limit to calculate the parallelism.
     * Here limit is the limit in query <code>SELECT * FROM xxx LIMIT [limit]</code>.
     */
    int limit(Long limit) {
        if (limit != null) {
            parallelism = Math.min(parallelism, (int) (limit / 1000));
        }

        // make sure that parallelism is at least 1
        return Math.max(1, parallelism);
    }

    /**
     * Infer parallelism by number of files and number of splits.
     * If {@link HiveOptions#TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} is not set this method does nothing.
     */
    HiveParallelismInference infer(
            SupplierWithException<Integer, IOException> numFiles,
            SupplierWithException<Integer, IOException> numSplits) {
        // if table.exec.hive.infer-source-parallelism is false, skip the inference entirely
        if (!infer) {
            return this;
        }

        try {
            // `createInputSplits` is costly,
            // so we try to avoid calling it by first checking the number of files
            // which is the lower bound of the number of splits
            int lowerBound = logRunningTime("getNumFiles", numFiles);
            if (lowerBound >= inferMaxParallelism) {
                parallelism = inferMaxParallelism;
                return this;
            }

            int splitNum = logRunningTime("createInputSplits", numSplits);
            parallelism = Math.min(splitNum, inferMaxParallelism);
        } catch (IOException e) {
            throw new FlinkHiveException(e);
        }
        return this;
    }

    private int logRunningTime(
            String operationName, SupplierWithException<Integer, IOException> supplier) throws IOException {
        long startTimeMillis = System.currentTimeMillis();
        int result = supplier.get();
        LOG.info(
                "Hive source({}}) {} use time: {} ms, result: {}",
                tablePath,
                operationName,
                System.currentTimeMillis() - startTimeMillis,
                result);
        return result;
    }
}

As the comments suggest, the actual parallelism inference happens in the infer method, which takes two suppliers, numFiles and numSplits. This method is only called from the getDataStream method of HiveTableSource; the call site looks roughly like the sketch below.
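
A simplified sketch of that call site (based on Flink's HiveTableSource; variable names such as hivePartitionsToRead, flinkConf and limit, and the assumption that the static helpers live in HiveSourceFileEnumerator, are approximations rather than exact copies of the source):

// inside HiveTableSource#getDataStream (simplified sketch)
int parallelism = new HiveParallelismInference(tablePath, flinkConf)
        .infer(
                () -> HiveSourceFileEnumerator.getNumFiles(hivePartitionsToRead, jobConf),
                () -> HiveSourceFileEnumerator.createInputSplits(0, hivePartitionsToRead, jobConf).size())
        .limit(limit);

Both arguments are lazy suppliers, so createInputSplits is only invoked when the file count alone is not enough to decide the parallelism.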

Now let's look at how these two methods are implemented.

getNumFiles returns the number of files under the partitions of the Hive table:

public static int getNumFiles(List<HiveTablePartition> partitions, JobConf jobConf) throws IOException {
    int numFiles = 0;
    FileSystem fs = null;
    for (HiveTablePartition partition : partitions) {
        StorageDescriptor sd = partition.getStorageDescriptor();
        org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
        if (fs == null) {
            fs = inputPath.getFileSystem(jobConf);
        }
        // it's possible a partition exists in metastore but the data has been removed
        if (!fs.exists(inputPath)) {
            continue;
        }
        numFiles += fs.listStatus(inputPath).length;
    }
    return numFiles;
}

createInputSplits splits the files under the Hive table partitions into logical InputSplits. The Flink Hive connector defines a HiveSourceSplit class that wraps the Hadoop InputSplit together with the Hive partition information.

public static List<HiveSourceSplit> createInputSplits(
        int minNumSplits,
        List<HiveTablePartition> partitions,
        JobConf jobConf) throws IOException {
    List<HiveSourceSplit> hiveSplits = new ArrayList<>();
    FileSystem fs = null;
    for (HiveTablePartition partition : partitions) {
        StorageDescriptor sd = partition.getStorageDescriptor();
        org.apache.hadoop.fs.Path inputPath = new org.apache.hadoop.fs.Path(sd.getLocation());
        if (fs == null) {
            fs = inputPath.getFileSystem(jobConf);
        }
        // it's possible a partition exists in metastore but the data has been removed
        if (!fs.exists(inputPath)) {
            continue;
        }
        InputFormat format;
        try {
            format = (InputFormat)
                    Class.forName(sd.getInputFormat(), true, Thread.currentThread().getContextClassLoader()).newInstance();
        } catch (Exception e) {
            throw new FlinkHiveException("Unable to instantiate the hadoop input format", e);
        }
        ReflectionUtils.setConf(format, jobConf);
        jobConf.set(INPUT_DIR, sd.getLocation());
        //TODO: we should consider how to calculate the splits according to minNumSplits in the future.
        org.apache.hadoop.mapred.InputSplit[] splitArray = format.getSplits(jobConf, minNumSplits);
        for (org.apache.hadoop.mapred.InputSplit inputSplit : splitArray) {
            Preconditions.checkState(inputSplit instanceof FileSplit,
                    "Unsupported InputSplit type: " + inputSplit.getClass().getName());
            hiveSplits.add(new HiveSourceSplit((FileSplit) inputSplit, partition, null));
        }
    }

    return hiveSplits;
}

Because both of these calls can take some time, the dedicated logRunningTime helper logs how long each of them runs.

If the number of files is already greater than or equal to the configured maximum parallelism, the parallelism is simply set to that maximum and the costly split computation is skipped; otherwise the parallelism is the minimum of the number of InputSplits and the configured maximum.

int lowerBound = logRunningTime("getNumFiles", numFiles);
if (lowerBound >= inferMaxParallelism) {
parallelism = inferMaxParallelism;
return this;
}

int splitNum = logRunningTime("createInputSplits", numSplits);
parallelism = Math.min(splitNum, inferMaxParallelism);

Finally, the limit method applies the query's LIMIT to cap the parallelism:

/**
 * Apply limit to calculate the parallelism.
 * Here limit is the limit in query <code>SELECT * FROM xxx LIMIT [limit]</code>.
 */
int limit(Long limit) {
    if (limit != null) {
        parallelism = Math.min(parallelism, (int) (limit / 1000));
    }

    // make sure that parallelism is at least 1
    return Math.max(1, parallelism);
}

As the Javadoc says, this method adjusts the parallelism according to the LIMIT of the query: the previously inferred parallelism is compared with limit / 1000 and the smaller value wins. For example, suppose the Hive partitions contain a huge number of files, say 10001, which exceeds the default maximum of 1000, so the inferred parallelism is 1000. If the query only reads 100 rows (LIMIT 100), then limit / 1000 is 0, the minimum is 0, and Math.max(1, parallelism) finally returns a source parallelism of 1.
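
A small runnable sketch (illustrative numbers only, not from the Flink source) that replays this arithmetic:

public class LimitParallelismExample {
    public static void main(String[] args) {
        int inferMaxParallelism = 1000; // table.exec.hive.infer-source-parallelism.max
        int numFiles = 10001;           // files under the queried partitions
        // infer(): numFiles >= inferMaxParallelism, so the inferred parallelism is the max.
        int parallelism = numFiles >= inferMaxParallelism ? inferMaxParallelism : numFiles; // 1000
        // limit(): SELECT ... LIMIT 100 -> min(1000, 100 / 1000) = min(1000, 0) = 0.
        long limit = 100L;
        parallelism = Math.min(parallelism, (int) (limit / 1000));
        // The result is forced to be at least 1.
        int sourceParallelism = Math.max(1, parallelism);
        System.out.println(sourceParallelism); // prints 1
    }
}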

Note ⚠️: the parallelism inference described above only applies to batch reads of Hive data; it does not apply to streaming reads.

Streaming Reading from Hive

In the getDataStream method of HiveTableSource, no source parallelism is configured for the streaming read path.


