基于Flink+Hudi构建企业万亿级云上实时数据湖教程
private boolean flushBucket(DataBucket bucket) {
// 获取最近一次未提交的instant time
String instant = instantToWrite(true);
// 如果获取不到instant,说明没有输入数据,方法返回
if (instant == null) {
// in case there are empty checkpoints that has no input data
LOG.info("No inflight instant when flushing data, skip.");
return false;
}
// 获取bucket缓存中的HoodieRecord
List<HoodieRecord> records = bucket.writeBuffer();
// 检查buffer中必须要有数据
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
// 根据write.insert.drop.duplicates配置项,决定insert是否去重
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
}
// 修改buffer中第一条数据的location信息(instant time和fileID)
bucket.preWrite(records);
// 执行writeFunction,写入数据
// writeFunction后面分析
final List<WriteStatus> writeStatus