Flink Partitioning Internals
0. Key Point
Flink does not store partition columns in the data files. In other words, if a table has two columns and one of them is a partition column, the files only contain the data of the other column.
1. CreateTable
Following the SQL execution flow, we arrive at TableEnvironmentImpl.executeInternal and take the createTable branch:
} else if (operation instanceof CreateTableOperation) {
    CreateTableOperation createTableOperation = (CreateTableOperation) operation;
    if (createTableOperation.isTemporary()) {
        catalogManager.createTemporaryTable(
                createTableOperation.getCatalogTable(),
                createTableOperation.getTableIdentifier(),
                createTableOperation.isIgnoreIfExists());
    } else {
        catalogManager.createTable(
                createTableOperation.getCatalogTable(),
                createTableOperation.getTableIdentifier(),
                createTableOperation.isIgnoreIfExists());
    }
    return TableResultImpl.TABLE_RESULT_OK;
1.1 GenericInMemoryCatalog
catalog.createTable is called next. Taking GenericInMemoryCatalog as an example, it holds several partition-related Maps, but no partition information is actually stored at this point; as the code shows, only empty maps are registered for the new table:
} else {
    tables.put(tablePath, table.copy());
    if (isPartitionedTable(tablePath)) {
        partitions.put(tablePath, new LinkedHashMap<>());
        partitionStats.put(tablePath, new LinkedHashMap<>());
        partitionColumnStats.put(tablePath, new LinkedHashMap<>());
    }
}
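To exercise this path end to end, here is a minimal sketch (table name, path and format are made up) that creates a partitioned table against the default GenericInMemoryCatalog; the PARTITIONED BY column becomes the partitionKeys list of the CatalogTable, while the catalog itself stores only the table entry plus the empty maps above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreatePartitionedTableExample {
    public static void main(String[] args) {
        // Default in-memory catalog; CREATE TABLE goes through the createTable branch above.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        // 'dt' is the partition column; its values will only appear in directory names,
        // not inside the written data files.
        tEnv.executeSql(
                "CREATE TABLE sales ("
                        + "  product STRING,"
                        + "  amount BIGINT,"
                        + "  dt STRING"
                        + ") PARTITIONED BY (dt) WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = '/tmp/sales',"
                        + "  'format' = 'csv'"
                        + ")");
    }
}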
1.2 Partition maps in the Catalog
partitionStats and partitionColumnStats hold statistics. partitions, as far as we can tell, is only used by standalone partition operations such as createPartition (the SQL statement ALTER TABLE ... ADD PARTITION), and what is stored there is only the partition information modified by the ALTER statement, mostly descriptive properties; it is not the primary record of the table's partitioning. The information originates in SqlToOperationConverter.convertAlterTable, where a loop collects it (the snippet is truncated in the source).
1.3 AbstractCatalogTable
The genuinely useful information is kept with the table entry itself; the core is the line tables.put(tablePath, table.copy()). table.copy() carries the table metadata and ultimately resolves to the implementation class CatalogTableImpl, whose parent constructor receives the partition keys. Since SQL statements ultimately operate on the table, this is where the partition information is always read from. Note that it is simply a List<String> of partition column names.
public AbstractCatalogTable(
        TableSchema tableSchema,
        List<String> partitionKeys,
        Map<String, String> options,
        String comment) {
    this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
    this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
    this.options = checkNotNull(options, "options cannot be null");
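For comparison, the newer CatalogTable.of factory makes the same point visible: the partition keys travel as a plain List<String> next to the schema. A minimal sketch with made-up column names:

import java.util.Collections;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;

public class PartitionKeysSketch {
    public static void main(String[] args) {
        CatalogTable table =
                CatalogTable.of(
                        Schema.newBuilder()
                                .column("product", DataTypes.STRING())
                                .column("amount", DataTypes.BIGINT())
                                .column("dt", DataTypes.STRING())
                                .build(),
                        "demo table",
                        Collections.singletonList("dt"), // partitionKeys: just column names
                        Collections.emptyMap());
        System.out.println(table.getPartitionKeys()); // [dt]
        System.out.println(table.isPartitioned());    // true
    }
}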
2. DescribeTable

Similarly, in TableEnvironmentImpl.executeInternal, the describe branch:
} else if (operation instanceof DescribeTableOperation) {
    DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
    Optional<ContextResolvedTable> result =
            catalogManager.getTable(describeTableOperation.getSqlIdentifier());
    if (result.isPresent()) {
        return buildDescribeResult(result.get().getResolvedSchema());
    } else {
        throw new ValidationException(
                String.format(
                        "Tables or views with the identifier '%s' doesn't exist",
                        describeTableOperation.getSqlIdentifier().asSummaryString()));
    }

2.1 Fetching and resolving the table
First, the getTable method fetches the table, with CatalogManager as the entry point; regular (non-temporary) tables go through getPermanentTable:
public Optional<ContextResolvedTable> getTable(ObjectIdentifier objectIdentifier) {
    CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
    if (temporaryTable != null) {
        final ResolvedCatalogBaseTable<?> resolvedTable =
                resolveCatalogBaseTable(temporaryTable);
        return Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable));
    } else {
        return getPermanentTable(objectIdentifier);
    }
}

The table gets wrapped in several layers here, but at the bottom it still comes from GenericInMemoryCatalog: the tables Map populated during CreateTable is where the stored table is read back:
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException {
    checkNotNull(tablePath);
    if (!tableExists(tablePath)) {
        throw new TableNotExistException(getName(), tablePath);
    } else {
        return tables.get(tablePath).copy();
    }
}

Before being returned to the top layer, the table is resolved and wrapped in CatalogManager.resolveCatalogTable. The important point here is the validation of partition keys: each one must correspond to a physical column of the table:
// Validate partition keys are included in physical columns
final List<String> physicalColumns =
        resolvedSchema.getColumns().stream()
                .filter(Column::isPhysical)
                .map(Column::getName)
                .collect(Collectors.toList());
table.getPartitionKeys()
        .forEach(
                partitionKey -> {
                    if (!physicalColumns.contains(partitionKey)) {
                        throw new ValidationException(
                                String.format(
                                        "Invalid partition key '%s'. A partition key must "
                                                + "reference a physical column in the schema. "
                                                + "Available columns are: %s",
                                        partitionKey, physicalColumns));
                    }
                });
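To see this check fire, a sketch (hypothetical table names, TableDescriptor-based registration) that declares a partition key which is not a column of the schema; resolving the table should fail with the ValidationException shown above:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class InvalidPartitionKeySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        // 'dt' is declared as a partition key but is not a physical column of the schema.
        tEnv.createTable(
                "bad_sales",
                TableDescriptor.forConnector("filesystem")
                        .schema(
                                Schema.newBuilder()
                                        .column("product", DataTypes.STRING())
                                        .column("amount", DataTypes.BIGINT())
                                        .build())
                        .option("path", "/tmp/bad_sales")
                        .format("csv")
                        .partitionedBy("dt")
                        .build());
        // Expected: ValidationException: Invalid partition key 'dt'. A partition key must
        // reference a physical column in the schema. Available columns are: [product, amount]
    }
}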
2.2 Building the result

Finally the result is constructed. One thing worth noticing is that only the table's resolved schema is passed in; the partition keys are not:
return buildDescribeResult(result.get().getResolvedSchema());

DESCRIBE prints a table to the console, so a tabular result is built here:
private TableResultInternal buildDescribeResult(ResolvedSchema schema) {
    Object[][] rows = buildTableColumns(schema);
    return buildResult(generateTableColumnsNames(), generateTableColumnsDataTypes(), rows);
}

buildTableColumns turns the schema into row data; because the DESCRIBE output table has a fixed set of columns, the values have to be filled in against those fixed columns.
The column names are specified in generateTableColumnsNames, which also becomes the header of the final output:
private String[] generateTableColumnsNames() {
    return new String[] {"name", "type", "null", "key", "extras", "watermark"};
}

generateTableColumnsDataTypes sets the data types of those columns:
private DataType[] generateTableColumnsDataTypes() {
    return new DataType[] {
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.BOOLEAN(),
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.STRING()
    };
}

Finally, the rows built from the schema are inserted into this table to form the result, which is a TableResultImpl:
private TableResultInternal buildResult(String[] headers, DataType[] types, Object[][] rows) {
    ResolvedSchema schema = ResolvedSchema.physical(headers, types);
    ResultProvider provider =
            new StaticResultProvider(
                    Arrays.stream(rows).map(Row::of).collect(Collectors.toList()));
    return TableResultImpl.builder()
            .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
            .schema(ResolvedSchema.physical(headers, types))
            .resultProvider(provider)
            .setPrintStyle(
                    PrintStyle.tableauWithDataInferredColumnWidths(
                            schema,
                            provider.getRowDataStringConverter(),
                            Integer.MAX_VALUE,
                            true,
                            false))
            .build();
}

The overall output looks like this:
+---------+--------+------+-----+--------+-----------+
|    name |   type | null | key | extras | watermark |
+---------+--------+------+-----+--------+-----------+
|    user | BIGINT | TRUE |     |        |           |
| product | STRING | TRUE |     |        |           |
+---------+--------+------+-----+--------+-----------+
3. Insert
3.1 Wrapping the SinkModifyOperation
First, when the SinkModifyOperation is assembled in SqlToOperationConverter.convertSqlInsert, it carries the table. getTableOrError ends up on the same path as the DESCRIBE case above: the table is fetched from the catalog and the partition keys are validated, so the contextResolvedTable wrapped into SinkModifyOperation contains the partition information:
ContextResolvedTable contextResolvedTable = catalogManager.getTableOrError(identifier);
return new SinkModifyOperation(
        contextResolvedTable,
        query,
        insert.getStaticPartitionKVs(),
        insert.isOverwrite(),
        dynamicOptions);
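As a usage note, the static partition key/values above come from the PARTITION clause of the INSERT statement, while a dynamic-partition insert leaves them empty. A sketch against the sales table from the earlier example (a source table src with matching columns is assumed):

// Static partition: dt is fixed in the statement and lands in insert.getStaticPartitionKVs();
// only product and amount are taken from the query.
tEnv.executeSql(
        "INSERT INTO sales PARTITION (dt = '2024-01-01') "
                + "SELECT product, amount FROM src");

// Dynamic partition: dt comes from the query result row by row,
// so getStaticPartitionKVs() is empty.
tEnv.executeSql("INSERT INTO sales SELECT product, amount, dt FROM src");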
3.2 Converting to a TableSink

In the SQL translation flow, PlannerBase.translateToRel takes the catalogSink branch; the getTableSink call leads to TableFactoryUtil.findAndCreateTableSink:
public static TableSink findAndCreateTableSink(TableSinkFactory.Context context) {
    try {
        return TableFactoryService.find(
                        TableSinkFactory.class, context.getTable().toProperties())
                .createTableSink(context);
    } catch (Throwable t) {
        throw new TableException("findAndCreateTableSink failed.", t);
    }
}

Inside the toProperties call, the partition keys are written into the properties:
public Map<String, String> toProperties() {
    DescriptorProperties descriptor = new DescriptorProperties(false);
    descriptor.putTableSchema(SCHEMA, getSchema());
    descriptor.putPartitionKeys(getPartitionKeys());
    Map<String, String> properties = new HashMap<>(getOptions());
    descriptor.putProperties(properties);
    return descriptor.asMap();
}

The subsequent createTableSink call also goes through toProperties, but this legacy path currently seems to have only the two CSV implementations, so the details of that TableSink flow still need further study.
What getTableSink ultimately calls is createDynamicTableSink, and the table wrapped in there is the same table fetched from the catalog as before, so it includes the partition information:
val tableSink = FactoryUtil.createDynamicTableSink(
  factory,
  objectIdentifier,
  tableToFind,
  Collections.emptyMap(),
  getTableConfig,
  getFlinkContext.getClassLoader,
  isTemporary)

3.3 Partition assignment
Tracing down from the file data source, there is a partition assigner class, PartitionComputer, in the file connector, with four implementations covering the filesystem and Hive connectors.
When the sink is translated via translateToPlanInternal, the SinkRuntimeProvider is built in CommonExecSink.createSinkTransformation:
final SinkRuntimeProvider runtimeProvider =
        tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));

In FileSystemTableSink, this eventually constructs the partition computer:
private DataStreamSink createBatchSink(
        DataStream inputStream, Context sinkContext, final int parallelism) {
    FileSystemOutputFormat.Builder builder = new FileSystemOutputFormat.Builder();
    builder.setPartitionComputer(partitionComputer());

private RowDataPartitionComputer partitionComputer() {
    return new RowDataPartitionComputer(
            defaultPartName,
            DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
            DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
            partitionKeys.toArray(new String[0]));
}

Inside it, partition indexes are derived from the partition column names for later use, which is why every partition key must correspond to a column; the partition value computation itself still works with the list of partition names:
this.partitionIndexes =
        Arrays.stream(partitionColumns).mapToInt(columnList::indexOf).toArray();
this.partitionTypes =
        Arrays.stream(partitionIndexes)
                .mapToObj(columnTypeList::get)
                .toArray(LogicalType[]::new);
this.partitionFieldGetters =
        IntStream.range(0, partitionTypes.length)
                .mapToObj(
                        i ->
                                RowData.createFieldGetter(
                                        partitionTypes[i], partitionIndexes[i]))
                .toArray(RowData.FieldGetter[]::new);
List<Integer> partitionIndexList =
        Arrays.stream(partitionIndexes).boxed().collect(Collectors.toList());

generatePartValues computes the partition of each record, using the objects built from the partition keys during the RowDataPartitionComputer initialization above (the snippet below is truncated in the source):
public LinkedHashMap<String, String> generatePartValues(RowData in) {
    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
    for (int i = 0; i ...

The lowest-level writer implementations used by FileSystemTableSink are GroupedPartitionWriter and DynamicPartitionWriter; GroupedPartitionWriter.write looks like this:
public void write(T in) throws Exception {
    String partition = generatePartitionPath(computer.generatePartValues(in));
    if (!partition.equals(currentPartition)) {
        if (currentFormat != null) {
            currentFormat.close();
        }
        currentFormat = context.createNewOutputFormat(manager.createPartitionDir(partition));
        currentPartition = partition;
    }
    currentFormat.writeRecord(computer.projectColumnsToWrite(in));
}

3.4 Partition columns are not written to the data files
RowDataPartitionComputer.projectColumnsToWrite computes the columns whose data actually gets written, i.e. only these columns end up in the data files; the core of it is simply dropping the partition columns (its loop is truncated in the source).
nonPartitionIndexes is built in the RowDataPartitionComputer constructor; as the code shows, it just iterates over the column indexes and filters out the partition ones:
this.nonPartitionIndexes =
        IntStream.range(0, columnNames.length)
                .filter(c -> !partitionIndexList.contains(c))
                .toArray();
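The index bookkeeping above can be replayed in isolation. A self-contained sketch with made-up column names, showing which fields feed generatePartValues and which fields are the only ones written to the files:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionIndexSketch {
    public static void main(String[] args) {
        // Table columns: product, amount, dt; dt is the partition key.
        List<String> columnList = Arrays.asList("product", "amount", "dt");
        String[] partitionColumns = {"dt"};

        int[] partitionIndexes =
                Arrays.stream(partitionColumns).mapToInt(columnList::indexOf).toArray();
        List<Integer> partitionIndexList =
                Arrays.stream(partitionIndexes).boxed().collect(Collectors.toList());
        int[] nonPartitionIndexes =
                IntStream.range(0, columnList.size())
                        .filter(c -> !partitionIndexList.contains(c))
                        .toArray();

        System.out.println(Arrays.toString(partitionIndexes));    // [2]    -> read for the partition value
        System.out.println(Arrays.toString(nonPartitionIndexes)); // [0, 1] -> written to the data files
    }
}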
3.5 Partition directories

PartitionPathUtils.generatePartitionPath defines the layout of a partition directory, in the form {columnName=partitionValue}, since the keys of the partition spec are the column names:
for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
    if (i > 0) {
        suffixBuf.append(Path.SEPARATOR);
    }
    suffixBuf.append(escapePathName(e.getKey()));
    suffixBuf.append('=');
    suffixBuf.append(escapePathName(e.getValue()));
    i++;
}
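A standalone sketch of the resulting directory suffix for a made-up two-level partition spec (path escaping omitted):

import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionPathSketch {
    public static void main(String[] args) {
        LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
        partitionSpec.put("dt", "2024-01-01");
        partitionSpec.put("region", "east");

        StringBuilder suffixBuf = new StringBuilder();
        int i = 0;
        for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
            if (i > 0) {
                suffixBuf.append('/');
            }
            suffixBuf.append(e.getKey()).append('=').append(e.getValue());
            i++;
        }
        // Prints dt=2024-01-01/region=east, i.e. the directory suffix under the table path.
        System.out.println(suffixBuf);
    }
}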
4. Select

Mirroring the sink flow, CommonExecTableSourceScan.translateToPlanInternal builds the source:
final ScanTableSource tableSource =
        tableSourceSpec.getScanTableSource(planner.getFlinkContext());
ScanTableSource.ScanRuntimeProvider provider =
        tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);

4.1 getScanTableSource
Inside getScanTableSource, the TableSource is created by createDynamicTableSource:
tableSource =
        FactoryUtil.createDynamicTableSource(
                factory,
                contextResolvedTable.getIdentifier(),
                contextResolvedTable.getResolvedTable(),
                loadOptionsFromCatalogTable(contextResolvedTable, flinkContext),
                flinkContext.getTableConfig(),
                flinkContext.getClassLoader(),
                contextResolvedTable.isTemporary());

Taking FileSystemTableFactory as an example, this eventually creates a FileSystemTableSource, and the partition keys are among its constructor arguments; once again they come from the catalog table:
public DynamicTableSource createDynamicTableSource(Context context) {
    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    validate(helper);
    return new FileSystemTableSource(
            context.getObjectIdentifier(),
            context.getPhysicalRowDataType(),
            context.getCatalogTable().getPartitionKeys(),
            helper.getOptions(),
            discoverDecodingFormat(context, BulkReaderFormatFactory.class),
            discoverDecodingFormat(context, DeserializationFormatFactory.class));
}

4.2 getScanRuntimeProvider
The provider is then created on top of the FileSystemTableSource built above, ending up in its getScanRuntimeProvider method, which contains quite a few partition-related steps.
First, if the table has partition keys but no partitions actually exist, a simple empty source is returned:
// When this table has no partition, just return an empty source.
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
    return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
}

Next, the partition keys are filtered down to the columns of the produced data type:
// Filter out partition columns not in producedDataType
final List<String> partitionKeysToExtract =
        DataType.getFieldNames(producedDataType).stream()
                .filter(this.partitionKeys::contains)
                .collect(Collectors.toList());

This filtered partition key list is then passed into the format, which is the class Flink ultimately uses to do the reading (some formats receive it and some do not; the difference still needs to be looked into).
Although the formats differ, they all end up in return createSourceProvider(format);, and createSourceProvider is where the partitions are fetched (not the partition keys, but the partition values):
private SourceProvider createSourceProvider(BulkFormat bulkFormat) {
    final FileSource.FileSourceBuilder fileSourceBuilder =
            FileSource.forBulkFileFormat(bulkFormat, paths());

paths() here resolves the partition directories to read, based on remainingPartitions:
private Path[] paths() {
    if (partitionKeys.isEmpty()) {
        return new Path[] {path};
    } else {
        return getOrFetchPartitions().stream()
                .map(FileSystemTableSource.this::toFullLinkedPartSpec)
                .map(PartitionPathUtils::generatePartitionPath)
                .map(n -> new Path(path, n))
                .toArray(Path[]::new);
    }
}

4.3 remainingPartitions
remainingPartitions is used for partition pushdown: when pushdown is supported, it is set to the selected partitions in PartitionPushDownSpec.apply.
Note that it does not hold the partition key columns but the actual partition values (the partition specs):
public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
    if (tableSource instanceof SupportsPartitionPushDown) {
        ((SupportsPartitionPushDown) tableSource).applyPartitions(partitions);
    } else {
        throw new TableException(
                String.format(
                        "%s does not support SupportsPartitionPushDown.",
                        tableSource.getClass().getName()));
    }
}

applyPartitions is what sets remainingPartitions:
public void applyPartitions(List<Map<String, String>> remainingPartitions) {
    this.remainingPartitions = remainingPartitions;
}

It is also set in another place, getOrFetchPartitions:
private List<Map<String, String>> getOrFetchPartitions() {
    if (remainingPartitions == null) {
        remainingPartitions = listPartitions().get();
    }
    return remainingPartitions;
}

listPartitions here scans the data directory for partitions:
public Optional<List<Map<String, String>>> listPartitions() {
    try {
        return Optional.of(
                PartitionPathUtils.searchPartSpecAndPaths(
                                path.getFileSystem(), path, partitionKeys.size())
                        .stream()
                        .map(tuple2 -> tuple2.f0)
                        .map(
                                spec -> {
                                    LinkedHashMap<String, String> ret = new LinkedHashMap<>();
                                    spec.forEach(
                                            (k, v) ->
                                                    ret.put(
                                                            k,
                                                            defaultPartName.equals(v)
                                                                    ? null
                                                                    : v));
                                    return ret;
                                })
                        .collect(Collectors.toList()));
    } catch (Exception e) {
        throw new TableException("Fetch partitions fail.", e);
    }
}
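To make the shape of remainingPartitions concrete, a sketch with a hypothetical directory layout: each partition directory becomes one LinkedHashMap of column name to partition value, so the list holds partition values, not partition key columns:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

public class RemainingPartitionsSketch {
    public static void main(String[] args) {
        // Suppose the table path contains the directories dt=2024-01-01 and dt=2024-01-02.
        List<LinkedHashMap<String, String>> remainingPartitions = new ArrayList<>();
        for (String dir : new String[] {"dt=2024-01-01", "dt=2024-01-02"}) {
            LinkedHashMap<String, String> spec = new LinkedHashMap<>();
            String[] kv = dir.split("=", 2);
            spec.put(kv[0], kv[1]);
            remainingPartitions.add(spec);
        }
        System.out.println(remainingPartitions); // [{dt=2024-01-01}, {dt=2024-01-02}]
    }
}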
5. Partition pushdown

The capability is expressed by PartitionPushDownSpec, and the optimizer rule is PushPartitionIntoTableSourceScanRule.
The partition keys are again obtained from the catalog table:
List<String> partitionFieldNames =
        tableSourceTable
                .contextResolvedTable()
                .getResolvedTable()
                .getPartitionKeys();

The partition-related predicates are then extracted from the filter condition:
// extract partition predicates
RelBuilder relBuilder = call.builder();
RexBuilder rexBuilder = relBuilder.getRexBuilder();
Tuple2 allPredicates =
        RexNodeExtractor.extractPartitionPredicateList(
                filter.getCondition(),
                FlinkRelOptUtil.getMaxCnfNodeCount(scan),
                inputFieldNames.toArray(new String[0]),
                rexBuilder,
                partitionFieldNames.toArray(new String[0]));
RexNode partitionPredicate =
        RexUtil.composeConjunction(
                rexBuilder, JavaConversions.seqAsJavaList(allPredicates._1));

Next the partition column types are obtained, again driven by the partition key names (the snippet below is truncated in the source):
// build pruner
LogicalType[] partitionFieldTypes =
        partitionFieldNames.stream()
                .map(
                        name -> {
                            int index = inputFieldNames.indexOf(name);
                            if (index ...

Partition pruning then follows, and this is where the remainingPartitions described above are produced. The rule first calls the TableSource's listPartitions; if that yields partitions they are used directly, otherwise readPartitionFromCatalogAndPrune takes over. Note that at this point the partition predicates have already been converted into RexNode form, and the final filtering relies on the catalog; currently only HiveCatalog appears to implement this:
return catalog.listPartitionsByFilter(tablePath, partitionFilters).stream()
        .map(CatalogPartitionSpec::getPartitionSpec)
        .collect(Collectors.toList());

Finally, based on this partition pruning, a new TableSourceTable is built, replacing the tableStats:
FlinkStatistic newStatistic =
        FlinkStatistic.builder()
                .statistic(tableSourceTable.getStatistic())
                .tableStats(newTableStat)
                .build();
TableSourceTable newTableSourceTable =
        tableSourceTable.copy(
                dynamicTableSource,
                newStatistic,
                new SourceAbilitySpec[] {partitionPushDownSpec});
LogicalTableScan newScan =
        LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints());
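Putting it together, a usage sketch against the sales table from the earlier examples (the tEnv setup from there is assumed): the filter on the partition column is extracted as a partition predicate, the rule prunes the partitions, and only the matching directory is read; the pruned partitions should also be visible on the source scan in the EXPLAIN output:

// Only /tmp/sales/dt=2024-01-01 should be read for this query.
tEnv.executeSql("SELECT product, amount FROM sales WHERE dt = '2024-01-01'").print();

// The optimized plan shows the pruned partitions on the scan, e.g. partitions=[{dt=2024-01-01}].
tEnv.executeSql("EXPLAIN SELECT product, amount FROM sales WHERE dt = '2024-01-01'").print();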