Flink Partitioning Internals


0. Key Points

Flink partition columns are not stored in the data files: if a table has two columns and one of them is a partition column, the files only contain the other column's data.
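
As a quick illustration (a minimal sketch; the table name, columns, and path are assumptions, not taken from the source), a filesystem table partitioned by dt writes the dt value into the directory name rather than into the data file:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionColumnNotStored {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Hypothetical table: 'dt' is the partition column.
        tEnv.executeSql(
                "CREATE TABLE fs_table ("
                        + " user_id BIGINT, product STRING, dt STRING"
                        + ") PARTITIONED BY (dt) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = '/tmp/fs_table',"
                        + " 'format' = 'csv')");

        tEnv.executeSql("INSERT INTO fs_table VALUES (1, 'apple', '2024-01-01')").await();
        // Expected layout: /tmp/fs_table/dt=2024-01-01/part-xxx.csv containing only "1,apple";
        // the dt value is encoded in the directory name, not written into the file.
    }
}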


1. CreateTable

Following the SQL execution flow, we reach TableEnvironmentImpl.executeInternal, the createTable branch.

} else if (operation instanceof CreateTableOperation) {
    CreateTableOperation createTableOperation = (CreateTableOperation) operation;
    if (createTableOperation.isTemporary()) {
        catalogManager.createTemporaryTable(
                createTableOperation.getCatalogTable(),
                createTableOperation.getTableIdentifier(),
                createTableOperation.isIgnoreIfExists());
    } else {
        catalogManager.createTable(
                createTableOperation.getCatalogTable(),
                createTableOperation.getTableIdentifier(),
                createTableOperation.isIgnoreIfExists());
    }
    return TableResultImpl.TABLE_RESULT_OK;

1.1 GenericInMemoryCatalog

catalog.createTable is called next. Taking GenericInMemoryCatalog as an example, it holds several partition-related Maps, but no partition information is actually stored at this point; as you can see, only empty maps are created.

} else {
    tables.put(tablePath, table.copy());
    if (isPartitionedTable(tablePath)) {
        partitions.put(tablePath, new LinkedHashMap<>());
        partitionStats.put(tablePath, new LinkedHashMap<>());
        partitionColumnStats.put(tablePath, new LinkedHashMap<>());
    }
}
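
A quick way to see that nothing is registered yet (a hedged sketch; the catalog and database names are Flink's defaults, the table is the assumed fs_table from above): listing partitions through the Catalog API right after CREATE TABLE returns an empty list.

import java.util.List;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;

// Assumes the partitioned table 'fs_table' created earlier.
static void showEmptyPartitions(TableEnvironment tEnv) throws Exception {
    Catalog catalog = tEnv.getCatalog("default_catalog").get();
    List<CatalogPartitionSpec> specs =
            catalog.listPartitions(new ObjectPath("default_database", "fs_table"));
    System.out.println(specs); // [] right after CREATE TABLE
}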

1.2 Partition Maps in the Catalog

partitionStats and partitionColumnStats hold statistics. partitions, as far as I can see, is only used by standalone partition operations such as createPartition (corresponding to the SQL statement ALTER TABLE ... ADD PARTITION), and what gets stored there is only the partition info touched by the ALTER statement, mostly descriptive properties; it is not the primary record of partitioning. The information originates in SqlToOperationConverter.convertAlterTable (a sketch of the equivalent Catalog call follows the snippet below).

for (int i = 0; i < addPartitions.getPartSpecs().size(); i++) {
    specs.add(new CatalogPartitionSpec(addPartitions.getPartitionKVs(i)));
    partitions.add(
            new CatalogPartitionImpl(
                    OperationConverterUtils.extractProperties(addPartitions.getPartProps().get(i)),
                    null));
}
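
At the Catalog level, the ADD PARTITION path ends up in Catalog.createPartition; a hedged sketch of calling it directly (the spec values and comment are made up, the table is the assumed fs_table):

import java.util.Collections;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;

// Registers one partition spec (dt=2024-01-01) for the assumed table 'fs_table'.
static void addPartition(Catalog catalog) throws Exception {
    catalog.createPartition(
            new ObjectPath("default_database", "fs_table"),
            new CatalogPartitionSpec(Collections.singletonMap("dt", "2024-01-01")),
            new CatalogPartitionImpl(Collections.emptyMap(), "added via API"),
            false); // ignoreIfExists
}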

1.3 AbstractCatalogTable

The information that really matters lives in the table entry itself; the key statement is tables.put(tablePath, table.copy());. table.copy() stores the table information and eventually reaches the implementation class CatalogTableImpl, whose parent class constructor carries the partition keys. The table therefore stores the partition information, and since SQL ultimately operates on tables, this is where partition information is always read from. Note that it is a List of Strings.

public AbstractCatalogTable(
        TableSchema tableSchema,
        List<String> partitionKeys,
        Map<String, String> options,
        String comment) {
    this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
    this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
    this.options = checkNotNull(options, "options cannot be null");
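
So the partition keys can always be read back from the catalog table entry itself; a minimal sketch (catalog, database, and table names assumed as before):

import java.util.List;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;

// Reads the partition keys stored on the table entry, e.g. ["dt"].
static List<String> partitionKeysOf(TableEnvironment tEnv) throws Exception {
    CatalogTable table =
            (CatalogTable)
                    tEnv.getCatalog("default_catalog")
                            .get()
                            .getTable(new ObjectPath("default_database", "fs_table"));
    return table.getPartitionKeys();
}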

2. DescribeTable

Likewise, in TableEnvironmentImpl.executeInternal, the describe branch.

} else if (operation instanceof DescribeTableOperation) {
    DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
    Optional<ContextResolvedTable> result =
            catalogManager.getTable(describeTableOperation.getSqlIdentifier());
    if (result.isPresent()) {
        return buildDescribeResult(result.get().getResolvedSchema());
    } else {
        throw new ValidationException(
                String.format(
                        "Tables or views with the identifier '%s' doesn't exist",
                        describeTableOperation.getSqlIdentifier().asSummaryString()));
    }
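
For reference, the statement that takes this branch is simply the following (reusing the tEnv and the assumed fs_table from the sketches above):

// Prints the schema table shown in section 2.2 below.
tEnv.executeSql("DESCRIBE fs_table").print();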

2.1 Fetching and Resolving the Table

First, the getTable method fetches the table, with CatalogManager as the entry point; regular (non-temporary) tables go through getPermanentTable.

public Optional<ContextResolvedTable> getTable(ObjectIdentifier objectIdentifier) {
    CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
    if (temporaryTable != null) {
        final ResolvedCatalogBaseTable<?> resolvedTable =
                resolveCatalogBaseTable(temporaryTable);
        return Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable));
    } else {
        return getPermanentTable(objectIdentifier);
    }
}

The table gets wrapped in several layers here, but at the bottom it still comes from GenericInMemoryCatalog: the tables Map seen during CreateTable is where the stored table instance is fetched from.

public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException {
    checkNotNull(tablePath);
    if (!tableExists(tablePath)) {
        throw new TableNotExistException(getName(), tablePath);
    } else {
        return tables.get(tablePath).copy();
    }
}

Before it is returned to the top, the table is resolved and wrapped in CatalogManager.resolveCatalogTable. One important point here is the validation of the partition keys: each must correspond to a physical column of the table (an example that trips this check follows the snippet below).

// Validate partition keys are included in physical columns
final List<String> physicalColumns =
        resolvedSchema.getColumns().stream()
                .filter(Column::isPhysical)
                .map(Column::getName)
                .collect(Collectors.toList());
table.getPartitionKeys()
        .forEach(
                partitionKey -> {
                    if (!physicalColumns.contains(partitionKey)) {
                        throw new ValidationException(
                                String.format(
                                        "Invalid partition key '%s'. A partition key must "
 + "reference a physical column in the schema. "
 + "Available columns are: %s",
                                        partitionKey, physicalColumns));
                    }
                });
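
One way to trigger this check (a hedged example reusing the tEnv from the earlier sketches): a computed column is not a physical column, so partitioning by it should fail with the ValidationException above. The table and column names here are made up.

import org.apache.flink.table.api.ValidationException;

// Partitioning by the computed column 'price_with_tax' fails the physical-column check.
try {
    tEnv.executeSql(
            "CREATE TABLE bad_table ("
                    + " price DOUBLE,"
                    + " price_with_tax AS price * 1.1,"
                    + " dt STRING"
                    + ") PARTITIONED BY (price_with_tax) WITH ("
                    + " 'connector' = 'filesystem', 'path' = '/tmp/bad_table', 'format' = 'csv')");
} catch (ValidationException e) {
    System.out.println(e.getMessage()); // "Invalid partition key 'price_with_tax'. ..."
}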

2.2 Building the Result

Finally the result is built. One thing to note: only the table's schema is passed in here, not the partition information.

return buildDescribeResult(result.get().getResolvedSchema());

DESCRIBE is rendered on the console as a table, so a table layout is built here.

private TableResultInternal buildDescribeResult(ResolvedSchema schema) {
    Object[][] rows = buildTableColumns(schema);
    return buildResult(generateTableColumnsNames(), generateTableColumnsDataTypes(), rows);
}

buildTableColumns turns the schema into row data; since the DESCRIBE output table has fixed columns, values are filled in against those fixed columns.

The column names are specified in generateTableColumnsNames, which also forms the header of the final output.

private String[] generateTableColumnsNames() {
    return new String[] {"name", "type", "null", "key", "extras", "watermark"};
}

generateTableColumnsDataTypes sets the data types of those columns.

private DataType[] generateTableColumnsDataTypes() {
    return new DataType[] {
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.BOOLEAN(),
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.STRING()
    };
}

Finally, the rows built from the schema are inserted into this table, forming the result, which is of type TableResultImpl.

private TableResultInternal buildResult(String[] headers, DataType[] types, Object[][] rows) {
    ResolvedSchema schema = ResolvedSchema.physical(headers, types);
    ResultProvider provider =
            new StaticResultProvider(
                    Arrays.stream(rows).map(Row::of).collect(Collectors.toList()));
    return TableResultImpl.builder()
            .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
            .schema(ResolvedSchema.physical(headers, types))
            .resultProvider(provider)
            .setPrintStyle(
                    PrintStyle.tableauWithDataInferredColumnWidths(
                            schema,
                            provider.getRowDataStringConverter(),
                            Integer.MAX_VALUE,
                            true,
                            false))
            .build();
}

The overall output looks like this:

+---------+--------+------+-----+--------+-----------+
|    name |   type | null | key | extras | watermark |
+---------+--------+------+-----+--------+-----------+
|    user | BIGINT | TRUE |     |        |           |
| product | STRING | TRUE |     |        |           |
+---------+--------+------+-----+--------+-----------+

3. Insert

3.1 Wrapping the SinkModifyOperation

First, when the SinkModifyOperation is assembled in SqlToOperationConverter.convertSqlInsert, it carries the table. getTableOrError ends up in the same call chain as the Describe path above: the table is fetched from the catalog and the partition keys are validated, none of those steps are skipped, so the contextResolvedTable wrapped into the SinkModifyOperation carries the partition information.

ContextResolvedTable contextResolvedTable = catalogManager.getTableOrError(identifier);
return new SinkModifyOperation(
        contextResolvedTable,
        query,
        insert.getStaticPartitionKVs(),
        insert.isOverwrite(),
        dynamicOptions);
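
insert.getStaticPartitionKVs() above corresponds to a static partition spec in the INSERT statement; a hedged example, reusing the tEnv and fs_table from earlier and assuming a source table src(user_id, product) exists:

// dt is supplied as a static partition, so the query only provides the remaining columns;
// getStaticPartitionKVs() would then contain {dt=2024-01-01}.
tEnv.executeSql(
        "INSERT INTO fs_table PARTITION (dt = '2024-01-01')"
                + " SELECT user_id, product FROM src")
        .await();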

3.2 Converting to a TableSink

In the SQL translation flow, PlannerBase.translateToRel takes the catalogSink branch; when getTableSink is called, it reaches TableFactoryUtil.findAndCreateTableSink.

public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context context) {
    try {
        return TableFactoryService.find(
                        TableSinkFactory.class, context.getTable().toProperties())
                .createTableSink(context);
    } catch (Throwable t) {
        throw new TableException("findAndCreateTableSink failed.", t);
    }
}

Here, inside the toProperties call, the partition keys are written into the properties as one of the entries.

public Map<String, String> toProperties() {
    DescriptorProperties descriptor = new DescriptorProperties(false);
    descriptor.putTableSchema(SCHEMA, getSchema());
    descriptor.putPartitionKeys(getPartitionKeys());
    Map<String, String> properties = new HashMap<>(getOptions());
    descriptor.putProperties(properties);
    return descriptor.asMap();
}

toProperties is also invoked in the subsequent createTableSink call, but that factory currently seems to have only two CSV implementations, so the details of that TableSink path are left for further study.

What getTableSink ultimately calls is createDynamicTableSink, which is given the table, the same table fetched from the catalog as before, so it carries the partition information.

val tableSink = FactoryUtil.createDynamicTableSink(
  factory,
  objectIdentifier,
  tableToFind,
  Collections.emptyMap(),
  getTableConfig,
  getFlinkContext.getClassLoader,
  isTemporary)

3.3 Partition Assignment

Tracing down from the file data source, there is a partition computer class, PartitionComputer, in the file connector; it has four implementations, for file and for Hive.

When the sink is translated, translateToPlanInternal builds the SinkRuntimeProvider, in CommonExecSink.createSinkTransformation.

final SinkRuntimeProvider runtimeProvider =
        tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));

In FileSystemTableSink, the partition computer is finally built.

private DataStreamSink<RowData> createBatchSink(
        DataStream<RowData> inputStream, Context sinkContext, final int parallelism) {
    FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
    builder.setPartitionComputer(partitionComputer());
    // ...
}

private RowDataPartitionComputer partitionComputer() {
    return new RowDataPartitionComputer(
            defaultPartName,
            DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
            DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
            partitionKeys.toArray(new String[0]));
}

Inside it, partition indexes are derived from the partition column names for later use, which means the partition keys must map onto actual columns; when partition values are computed, though, the list of partition names is still what drives the loop (a standalone sketch of this index bookkeeping follows the snippet below).

this.partitionIndexes =
        Arrays.stream(partitionColumns).mapToInt(columnList::indexOf).toArray();
this.partitionTypes =
        Arrays.stream(partitionIndexes)
                .mapToObj(columnTypeList::get)
                .toArray(LogicalType[]::new);
this.partitionFieldGetters =
        IntStream.range(0, partitionTypes.length)
                .mapToObj(
                        i ->
                                RowData.createFieldGetter(
                                        partitionTypes[i], partitionIndexes[i]))
                .toArray(RowData.FieldGetter[]::new);
List<Integer> partitionIndexList =
        Arrays.stream(partitionIndexes).boxed().collect(Collectors.toList());
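
A minimal standalone sketch of the same index bookkeeping (column and partition names are assumptions): with columns [user_id, product, dt] and partition key dt, partitionIndexes becomes [2] and nonPartitionIndexes (used later in 3.4) becomes [0, 1].

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class PartitionIndexSketch {
    public static void main(String[] args) {
        List<String> columns = Arrays.asList("user_id", "product", "dt");
        List<String> partitionColumns = Arrays.asList("dt");

        // Index of each partition column within the full column list.
        int[] partitionIndexes =
                partitionColumns.stream().mapToInt(columns::indexOf).toArray();
        // All remaining column indexes, i.e. the columns whose data is written to files.
        int[] nonPartitionIndexes =
                IntStream.range(0, columns.size())
                        .filter(i -> Arrays.stream(partitionIndexes).noneMatch(p -> p == i))
                        .toArray();

        System.out.println(Arrays.toString(partitionIndexes));    // [2]
        System.out.println(Arrays.toString(nonPartitionIndexes)); // [0, 1]
    }
}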

generatePartValues computes the partition of each row, using the partition-derived structures built when RowDataPartitionComputer was initialized.

public LinkedHashMap<String, String> generatePartValues(RowData in) {
    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
    for (int i = 0; i < partitionIndexes.length; i++) {
        Object field = partitionFieldGetters[i].getFieldOrNull(in);
        String partitionValue = field != null ? field.toString() : null;
        partSpec.put(
                partitionColumns[i],
                partitionValue == null || partitionValue.isEmpty() ? defaultPartName : partitionValue);
    }
    return partSpec;
}

The lowest-level writer implementations used by FileSystemTableSink are GroupedPartitionWriter and DynamicPartitionWriter; GroupedPartitionWriter.write looks like this:

public void write(T in) throws Exception {
    String partition = generatePartitionPath(computer.generatePartValues(in));
    if (!partition.equals(currentPartition)) {
        if (currentFormat != null) {
            currentFormat.close();
        }
        currentFormat = context.createNewOutputFormat(manager.createPartitionDir(partition));
        currentPartition = partition;
    }
    currentFormat.writeRecord(computer.projectColumnsToWrite(in));
}

3.4 Partition Columns Are Not Written

RowDataPartitionComputer.projectColumnsToWrite selects the columns whose data actually gets written, i.e. only these columns end up in the file; the core of it is removing the partition columns.

for (int i = 0; i < nonPartitionIndexes.length; i++) {
    reuseRow.setField(i, nonPartitionFieldGetters[i].getFieldOrNull(in));
}
return reuseRow;

nonPartitionIndexes is built in the RowDataPartitionComputer constructor; as you can see, it simply iterates over the column indexes and filters out the partition columns.

this.nonPartitionIndexes =
        IntStream.range(0, columnNames.length)
                .filter(c -> !partitionIndexList.contains(c))
                .toArray();

3.5 Partition Directories

PartitionPathUtils.generatePartitionPath defines the layout of partition directories, in the form {columnName=partitionValue}, since the key currently is just the column name.

for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
    if (i > 0) {
        suffixBuf.append(Path.SEPARATOR);
    }
    suffixBuf.append(escapePathName(e.getKey()));
    suffixBuf.append('=');
    suffixBuf.append(escapePathName(e.getValue()));
    i++;
}
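
A tiny standalone reproduction of that path-building logic (not the Flink utility itself; the spec values are assumptions and the escaping step is omitted):

import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionPathSketch {
    public static void main(String[] args) {
        LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
        partitionSpec.put("dt", "2024-01-01");
        partitionSpec.put("hour", "09");

        // Join each key=value pair with '/' to form the partition directory suffix.
        StringBuilder suffix = new StringBuilder();
        int i = 0;
        for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
            if (i > 0) {
                suffix.append('/');
            }
            suffix.append(e.getKey()).append('=').append(e.getValue());
            i++;
        }
        System.out.println(suffix); // dt=2024-01-01/hour=09
    }
}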

4. Select

Mirroring the sink flow, CommonExecTableSourceScan.translateToPlanInternal contains the flow that builds the source.

final ScanTableSource tableSource =
        tableSourceSpec.getScanTableSource(planner.getFlinkContext());
ScanTableSource.ScanRuntimeProvider provider =
        tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);

4.1 getScanTableSource

In getScanTableSource, the TableSource is created by createDynamicTableSource.

tableSource =
        FactoryUtil.createDynamicTableSource(
                factory,
                contextResolvedTable.getIdentifier(),
                contextResolvedTable.getResolvedTable(),
                loadOptionsFromCatalogTable(contextResolvedTable, flinkContext),
                flinkContext.getTableConfig(),
                flinkContext.getClassLoader(),
                contextResolvedTable.isTemporary());

Taking FileSystemTableFactory as an example, it finally creates a FileSystemTableSource, and the partition keys are among the constructor arguments; they likewise come from the catalog table.

public DynamicTableSource createDynamicTableSource(Context context) {
    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    validate(helper);
    return new FileSystemTableSource(
            context.getObjectIdentifier(),
            context.getPhysicalRowDataType(),
            context.getCatalogTable().getPartitionKeys(),
            helper.getOptions(),
            discoverDecodingFormat(context, BulkReaderFormatFactory.class),
            discoverDecodingFormat(context, DeserializationFormatFactory.class));
}

4.2 getScanRuntimeProvider

The provider is then created through the FileSystemTableSource above, ending up in its getScanRuntimeProvider method, which contains quite a few partition-related operations.

First, if the table defines partition keys but no partitions can actually be found, a trivial empty source is returned.

// When this table has no partition, just return an empty source.
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
    return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
}

Next, the partition keys are again filtered against the table's produced columns.

// Filter out partition columns not in producedDataType
final List<String> partitionKeysToExtract =
        DataType.getFieldNames(producedDataType).stream()
                .filter(this.partitionKeys::contains)
                .collect(Collectors.toList());

These filtered partition keys are then passed into the format, the class that ultimately performs the reads and writes (though some code paths pass them and some do not; the difference still needs looking into).

The formats differ, but they all end in return createSourceProvider(format);. Inside createSourceProvider there is a step that obtains the partitions (not the partition keys but the actual partition values).

private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit> bulkFormat) {
    final FileSource.FileSourceBuilder<RowData> fileSourceBuilder =
            FileSource.forBulkFileFormat(bulkFormat, paths());

The paths() call here derives the partition directories to read from remainingPartitions.

private Path[] paths() {
    if (partitionKeys.isEmpty()) {
        return new Path[] {path};
    } else {
        return getOrFetchPartitions().stream()
                .map(FileSystemTableSource.this::toFullLinkedPartSpec)
                .map(PartitionPathUtils::generatePartitionPath)
                .map(n -> new Path(path, n))
                .toArray(Path[]::new);
    }
}

4.3 remainingPartitions

This field is used by partition push-down: when push-down is supported, it is set to the selected partitions, in PartitionPushDownSpec.apply.

These are not the stored partition keys but the actual partition values.

public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
    if (tableSource instanceof SupportsPartitionPushDown) {
        ((SupportsPartitionPushDown) tableSource).applyPartitions(partitions);
    } else {
        throw new TableException(
                String.format(
                        "%s does not support SupportsPartitionPushDown.",
                        tableSource.getClass().getName()));
    }
}

The applyPartitions call here is what sets remainingPartitions.

public void applyPartitions(List<Map<String, String>> remainingPartitions) {
    this.remainingPartitions = remainingPartitions;
}

It is also set elsewhere, in getOrFetchPartitions.

private List<Map<String, String>> getOrFetchPartitions() {
    if (remainingPartitions == null) {
        remainingPartitions = listPartitions().get();
    }
    return remainingPartitions;
}

listPartitions here scans the data directory for partitions.

public Optional<List<Map<String, String>>> listPartitions() {
    try {
        return Optional.of(
                PartitionPathUtils.searchPartSpecAndPaths(
                                path.getFileSystem(), path, partitionKeys.size())
                        .stream()
                        .map(tuple2 -> tuple2.f0)
                        .map(
                                spec -> {
                                    LinkedHashMap<String, String> ret = new LinkedHashMap<>();
                                    spec.forEach(
                                            (k, v) ->
                                                    ret.put(
                                                            k,
                                                            defaultPartName.equals(v)
                                                                    ? null
                                                                    : v));
                                    return ret;
                                })
                        .collect(Collectors.toList()));
    } catch (Exception e) {
        throw new TableException("Fetch partitions fail.", e);
    }
}
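
A simplified illustration of what that directory scan amounts to (plain java.nio, not the Flink implementation; the base path is an assumption and only a single partition key level is handled for brevity): walk the directory and parse each key=value directory name back into a spec.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

public class ScanPartitionsSketch {
    public static void main(String[] args) throws IOException {
        List<LinkedHashMap<String, String>> specs = new ArrayList<>();
        try (DirectoryStream<Path> dirs =
                Files.newDirectoryStream(Paths.get("/tmp/fs_table"), Files::isDirectory)) {
            for (Path dir : dirs) {
                String name = dir.getFileName().toString(); // e.g. "dt=2024-01-01"
                int eq = name.indexOf('=');
                if (eq > 0) {
                    LinkedHashMap<String, String> spec = new LinkedHashMap<>();
                    spec.put(name.substring(0, eq), name.substring(eq + 1));
                    specs.add(spec);
                }
            }
        }
        System.out.println(specs); // e.g. [{dt=2024-01-01}]
    }
}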

5. Partition Push-down

The ability is carried by PartitionPushDownSpec, and the planner rule is PushPartitionIntoTableSourceScanRule.

The partition keys are again obtained from the catalog table.

List<String> partitionFieldNames =
        tableSourceTable
                .contextResolvedTable()
                .getResolvedTable()
                .getPartitionKeys();

Partition-related predicates are then extracted from the filter condition.

// extract partition predicates
RelBuilder relBuilder = call.builder();
RexBuilder rexBuilder = relBuilder.getRexBuilder();
Tuple2<Seq<RexNode>, Seq<RexNode>> allPredicates =
        RexNodeExtractor.extractPartitionPredicateList(
                filter.getCondition(),
                FlinkRelOptUtil.getMaxCnfNodeCount(scan),
                inputFieldNames.toArray(new String[0]),
                rexBuilder,
                partitionFieldNames.toArray(new String[0]));
RexNode partitionPredicate =
        RexUtil.composeConjunction(
                rexBuilder, JavaConversions.seqAsJavaList(allPredicates._1));
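
For concreteness, a query like the following (reusing the tEnv and fs_table from the earlier sketches) mixes a partition predicate and a non-partition predicate; the extraction above would pull out the dt condition as a partition predicate and leave the product condition in the remaining filter:

// dt = '2024-01-01' references the partition column and is a pruning candidate;
// product = 'apple' must still be evaluated on the rows that are actually read.
tEnv.executeSql(
        "SELECT user_id, product FROM fs_table"
                + " WHERE dt = '2024-01-01' AND product = 'apple'")
        .print();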

The partition column types are obtained next, again derived from the partition keys.

// build pruner
LogicalType[] partitionFieldTypes =
        partitionFieldNames.stream()
                .map(
                        name -> {
                            int index = inputFieldNames.indexOf(name);
                            if (index < 0) {
                                throw new TableException("Partition key not found: " + name);
                            }
                            return scan.getRowType().getFieldList().get(index).getType();
                        })
                .map(FlinkTypeFactory::toLogicalType)
                .toArray(LogicalType[]::new);

Partition pruning follows, which produces the remainingPartitions mentioned above. The rule first calls the TableSource's listPartitions; if that returns values they are used directly, otherwise readPartitionFromCatalogAndPrune takes over. Note that by this point the partition predicates have already been converted into RexNodes, and the final filtering still goes through the catalog; currently only HiveCatalog seems to implement this.

return catalog.listPartitionsByFilter(tablePath, partitionFilters).stream()
        .map(CatalogPartitionSpec::getPartitionSpec)
        .collect(Collectors.toList());

Afterwards, based on the partition pruning above, a new TableSourceTable is built with its tableStats replaced.

FlinkStatistic newStatistic =
        FlinkStatistic.builder()
                .statistic(tableSourceTable.getStatistic())
                .tableStats(newTableStat)
                .build();
TableSourceTable newTableSourceTable =
        tableSourceTable.copy(
                dynamicTableSource,
                newStatistic,
                new SourceAbilitySpec[] {partitionPushDownSpec});
LogicalTableScan newScan =
        LogicalTableScan.create(scan.getCluster(), newTableSourceTable, scan.getHints());

