定制Flink输出的parquet文件名

问题描述

使用Flink将kafka等数据源的数据流,经过处理后输出到文件,我们一般是这样写代码的:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), ParquetAvroWriters.forGenericRecord(avroSchema))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

实际使用时发现,生成的parquet文件名称是"part-1-2"这种格式的,且没有扩展名。由于我们的应用对parquet文件名有一定的规范且文件名里包含一些实用信息(例如文件里的记录条数),所以这样是不能满足我们要求的。

然而flink里这个文件名的规则是写死在Bucket.java里的无法修改,只能寻找变通的方法来解决。

解决方法

StreamingFileSink.forBulkFormat()的第二个参数是一个Factory,用于创建BulkWriter,我们可以从这里入手,注入自定义的BulkWriter,在写入文件的时候修改parquet文件名。

以下是若干个相关类,经实测通过。这个方案最大的问题是需要通过反射获取targetFile文件名,所以有可能在未来的flink版本里失效。

StreamingFileSink:

Schema avroSchema = ...;
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("my/base/path"), new MyParquetWriterFactory(new MyParquetBuilder()))
        .withBucketAssigner(new MyBucketAssigner())
        .build();

MyParquetWriterFactory:

static class MyParquetWriterFactory extends ParquetWriterFactory {
    public MyParquetWriterFactory(ParquetBuilder writerBuilder) {
        super(writerBuilder);
    }

    @Override
    public BulkWriter create(FSDataOutputStream stream) throws IOException {
        BulkWriter writer = super.create(stream);
        return new MyParquetBulkWriter(writer, stream);
    }
}

MyParquetBulkWriter:

/**
 * 包装flink的ParquetBulkWriter,修改part文件名格式
 */
static class MyParquetBulkWriter implements BulkWriter<GenericRecord> {
    private FSDataOutputStream stream;
    private BulkWriter writer;
    private int rowCount;

    public MyParquetBulkWriter(BulkWriter writer, FSDataOutputStream stream) {
        this.writer = writer;
        this.stream = stream;
    }

    @Override
    public void addElement(GenericRecord element) throws IOException {
        writer.addElement(element);
        rowCount++; //记录计数,结果将作为文件名的一部分
    }

    @Override
    public void flush() throws IOException {
        writer.flush();
    }

    @Override
    public void finish() throws IOException {
        // 试图在finish()后改名失败,因为finish()后正式文件并没有生成
        // 通过反射直接修改stream里的targetFile名称可行
        // 这里是修改part文件名的关键部分
        try {
            Field field = stream.getClass().getDeclaredField("targetFile");
            field.setAccessible(true);
            File targetFile = (File) field.get(stream);
            File renamedTargetFile = new File(targetFile.getParent(), rowCount + "_" + System.currentTimeMillis() + ".parquet");
            field.set(stream, renamedTargetFile);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            writer.finish();
        }
    }
}

MyParquetBuilder(其中avroSchema是在外部赋值的):

static class MyParquetBuilder implements ParquetBuilder {
    @Override
    public ParquetWriter createWriter(OutputFile outputFile) throws IOException {
        return 
        ParquetWriter.builder(outputFile).withSchema(avroSchema).withDataModel(GenericData.get()).build();
    }
}

参考资料

Flink streaming - Change part file names when using StreamingFileSink?

大数据系统的若干瓶颈

一、Zookeeper里watch的上限

虽然zk本身没有对watch数量设置上限,但在实际场景里,由于watch数量过多导致系统资源被耗尽的情况偶有发生。

以一个实际场景为例,这个场景里有30000个设备,在zk里每个设备对应一个znode,然后有storm的topology对每个znode加上3个watch(通过curator,一个spout两个bolt),这个topology的并发是200。

计算一下总的watch数量就是30000x200x3=1800万个,按照ZOOKEEPER-1177的描述,平均每个watch占用100字节左右内存,1800万个watch大约占用1.8GB内存。这时必须相应调高zk能够使用的内存数量。

在命令行使用 “echo wchs | nc zkaddress port” 命令可以查看当前zk里watch数量,相当于先telnet到zk再发送zk提供的四字命令(完整命令列表):

zookeeper_watches

二、Impala分区数量的上限

根据Cloudera的建议(见:Impala maximum number of partitions),一个impala表最多使用10万个分区(partition),最好不超过1万个分区。

在实际场景里,假设我们想按“设备ID”和“天”对设备数据进行分类,那么当有20000台设备时,每年所需要的分区数量是20000x365=7300000个,已经大大超出了impala的限制,这时就要考虑调整分区粒度,比如从时间维度调整为每个月分一个区,从设备维度调整为将若干个设备分为一组再以组为单位分区。但无论如何,这些调整通常对业务应用是有代价,需要衡量是否能够接受。

要统计一个表有多少个分区,可以使用explain语句:

impala_partitions

要查看详细分区信息,使用show partitions mytable语句:

impala_show_partitions

三、Impala文件数量限制

(注:经过核实,此问题在通过JDBC且SYNC_DDL时出现,impala-shell里REFRESH通常不会超时)

直接修改HDFS上的文件后,需要使用Impala的REFRESH命令更新Impala元数据。当文件数量过多(例如200万个),REFRESH命令会超时。

解决方法:按Partition依次执行REFRESH命令,只要每个Partition的文件数量不多,就可以实现更新整个表的元数据。

REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])]

四、HDFS文件数量限制

HDFS最著名的限制是namenode单点失效问题。

根据cloudera博客文章The Small Files Problem的解释,每个文件、每个目录以及每个块(block)会在namenode节点占用150字节内存,假设有一千万个文件,每个文件一个块,则总共占用20000000*150=3GB物理内存。

仍然以设备数据为例,假设我们想把每个设备的数据按天保存到hdfs文件,那么20000个设备每年产生730万个文件,三年2200万个文件,是硬件资源可以承受的数量级。但如果业务要求对设备的不同类别数据分文件存放,例如设备的高频数据与低频数据,则文件总数量还要乘以数据类型的个数,这时就必须考虑namenode物理内存是否够用。

Hortonworks根据文件数量推荐的namenode内存配置表如下(来源),可以看出1000万文件建议配置5.4GB,比前面估计的值(3GB)稍高,这应该是考虑到namenode服务器额外开销和一定冗余度的数值:

hortonworks_namenode_heapsize

使用“hadoop fs -count /”命令可以统计当前hdfs上已有文件数量,不过这个命令无法看到块的数量。

hadoop_file_count2

五、Parquet文件列数限制

Parquet是基于列的存储格式,最大优势是从文件中抽取小部分列时效率很高,同时Impala、Hive和Spark等大数据查询/分析引擎都支持它,所以不少大数据系统底层都是用parquet做数据存储。

由于parquet不支持对已有文件的修改,因此在设计系统时就要考虑文件里包含哪些列,就像为数据库设计表结构一样。值得关注的问题是,一个parquet文件里能包含的列数目是有限的,至于上限值是多少与多个因素有关,很难给出一个确切的数字。我查到的一些建议是尽量不要超过1000个列。

例如在PARQUET-222的讨论里,一个例子是假设一个parquet文件有2.6万个整数类型的列,因为每个writer至少需要64KB x 4的内存,写入这样一个parquet文件至少需要6.34GB内存,很容易超出jvm限制。

(三个月前遇到这个问题,当时没有及时记录,现在找资料又花了好几个小时,这次赶紧记下)

参考 PARQUET-222 和 PARQUET-394