欢迎光临散文网 会员登陆 & 注册

基于Flink+Hudi构建企业万亿级云上实时数据湖教程

2023-07-19 18:07 作者:学课拼课  | 我要投稿

private boolean flushBucket(DataBucket bucket) {

    // 获取最近一次未提交的instant time

    String instant = instantToWrite(true);


    // 如果获取不到instant,说明没有输入数据,方法返回

    if (instant == null) {

        // in case there are empty checkpoints that has no input data

        LOG.info("No inflight instant when flushing data, skip.");

        return false;

    }


    // 获取bucket缓存中的HoodieRecord

    List<HoodieRecord> records = bucket.writeBuffer();

    // 检查buffer中必须要有数据

    ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");

    // 根据write.insert.drop.duplicates配置项,决定insert是否去重

    if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {

        records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);

    }

    // 修改buffer中第一条数据的location信息(instant time和fileID)

    bucket.preWrite(records);

    // 执行writeFunction,写入数据

    // writeFunction后面分析

    final List<WriteStatus> writeStatus 


基于Flink+Hudi构建企业万亿级云上实时数据湖教程的评论 (共 条)

分享到微博请遵守国家法律