/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.bucket;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.hash.BucketIndexUtil;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.sink.StreamWriteFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StreamWriteFunction} that tags every incoming record with a bucket-derived
 * file id before handing it to the write buffer.
 *
 * <p>For each partition the function keeps a {@code bucket id -> file id} mapping. Unless the
 * job is an insert-overwrite, the mapping of a partition is loaded lazily from the latest file
 * slices the first time a record of that partition is seen; only the buckets that
 * {@link #isBucketToLoad(int, String)} attributes to this subtask are loaded.
 */
public class BucketStreamWriteFunction
extends StreamWriteFunction {
    private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);

    /** Number of parallel subtasks, read from the runtime context in {@link #open}. */
    private int parallelism;

    /** Total number of buckets, read from {@code FlinkOptions.BUCKET_INDEX_NUM_BUCKETS}. */
    private int bucketNum;

    /** Index key field(s) as resolved by {@code OptionsResolver.getIndexKeyField}. */
    private String indexKeyFields;

    /** Selects which {@link BucketIdentifier} factory is used when a new file id is created. */
    private boolean isNonBlockingConcurrencyControl;

    /** Partition path -> (bucket id -> file id). */
    private Map<String, Map<Integer, String>> bucketIndex;

    /**
     * {@code "<partition>/<bucket id>"} keys of the buckets whose file id was created by this
     * function since the last {@link #snapshotState()} call.
     */
    private Set<String> incBucketIndex;

    /** Maps (partition, bucket id) to a subtask index; compared against this subtask's id. */
    private Functions.Function2<String, Integer, Integer> partitionIndexFunc;

    /** When {@code true}, the existing file slices are never loaded into {@link #bucketIndex}. */
    private boolean isInsertOverwrite;

    /**
     * Creates the function.
     *
     * @param config  the Flink job configuration, forwarded to the parent
     * @param rowType the row type, forwarded to the parent
     */
    public BucketStreamWriteFunction(Configuration config, RowType rowType) {
        super(config, rowType);
    }

    /**
     * Opens the parent function, then reads the bucket-index settings from the configuration
     * and the runtime context and creates the (initially empty) index structures.
     */
    @Override
    public void open(Configuration parameters) throws IOException {
        super.open(parameters);
        this.bucketNum = this.config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
        this.indexKeyFields = OptionsResolver.getIndexKeyField(this.config);
        this.isNonBlockingConcurrencyControl = OptionsResolver.isNonBlockingConcurrencyControl(this.config);
        this.taskID = this.getRuntimeContext().getIndexOfThisSubtask();
        this.parallelism = this.getRuntimeContext().getNumberOfParallelSubtasks();
        this.bucketIndex = new HashMap<>();
        this.incBucketIndex = new HashSet<>();
        this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(this.bucketNum, this.parallelism);
        this.isInsertOverwrite = OptionsResolver.isInsertOverwrite(this.config);
    }

    /** Delegates to the parent; this class adds no state of its own here. */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        super.initializeState(context);
    }

    /**
     * Delegates to the parent and then forgets which buckets were newly created, so that later
     * records of those buckets take the "already indexed" path in
     * {@link #defineRecordLocation(HoodieFlinkInternalRow)}.
     */
    @Override
    public void snapshotState() {
        super.snapshotState();
        this.incBucketIndex.clear();
    }

    /** Assigns the record its file id and instant-time marker, then buffers it. */
    @Override
    public void processElement(HoodieFlinkInternalRow record, ProcessFunction.Context context, Collector<Object> collector) throws Exception {
        this.defineRecordLocation(record);
        this.bufferRecord(record);
    }

    /**
     * Sets the file id and the instant-time marker of {@code record} from its bucket.
     *
     * <ul>
     *   <li>bucket created since the last snapshot: marker {@code "I"}, the file id created then;</li>
     *   <li>bucket already present in the index: marker {@code "U"}, the indexed file id;</li>
     *   <li>unknown bucket: a new file id is generated and registered, marker {@code "I"}.</li>
     * </ul>
     *
     * <p>NOTE(review): {@code "I"}/{@code "U"} presumably stand for insert/update - confirm
     * against the consumer of {@code getInstantTime()}.
     */
    private void defineRecordLocation(HoodieFlinkInternalRow record) {
        String partition = record.getPartitionPath();
        if (!this.isInsertOverwrite) {
            this.bootstrapIndexIfNeed(partition);
        }
        Map<Integer, String> bucketToFileId = this.bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
        // Named bucketId (not bucketNum) so it cannot be confused with the bucket-count field.
        int bucketId = BucketIdentifier.getBucketId(record.getRecordKey(), this.indexKeyFields, this.bucketNum);
        String partitionBucketId = partition + "/" + bucketId;
        if (this.incBucketIndex.contains(partitionBucketId)) {
            record.setInstantTime("I");
            record.setFileId(bucketToFileId.get(bucketId));
        } else if (bucketToFileId.containsKey(bucketId)) {
            record.setInstantTime("U");
            record.setFileId(bucketToFileId.get(bucketId));
        } else {
            String newFileId = this.isNonBlockingConcurrencyControl
                    ? BucketIdentifier.newBucketFileIdForNBCC(bucketId)
                    : BucketIdentifier.newBucketFileIdPrefix(bucketId);
            record.setInstantTime("I");
            record.setFileId(newFileId);
            bucketToFileId.put(bucketId, newFileId);
            this.incBucketIndex.add(partitionBucketId);
        }
    }

    /**
     * Tells whether the given bucket of the given partition is attributed to this subtask.
     *
     * @param bucketNumber the bucket id
     * @param partition    the partition path
     * @return {@code true} if the partition index function maps the pair to this subtask's id
     */
    public boolean isBucketToLoad(int bucketNumber, String partition) {
        int owningTask = this.partitionIndexFunc.apply(partition, bucketNumber);
        return owningTask == this.taskID;
    }

    /**
     * Loads the {@code bucket id -> file id} mapping of {@code partition} from the latest file
     * slices, unless the partition is already present in {@link #bucketIndex}.
     *
     * @throws RuntimeException if two file slices of this subtask's buckets resolve to the same
     *                          bucket id
     */
    private void bootstrapIndexIfNeed(String partition) {
        if (this.bucketIndex.containsKey(partition)) {
            return;
        }
        LOG.info("Loading Hoodie Table {}, with path {}/{}",
                this.metaClient.getTableConfig().getTableName(), this.metaClient.getBasePath(), partition);
        Map<Integer, String> bucketToFileIDMap = new HashMap<>();
        this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
            String fileId = fileSlice.getFileId();
            int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
            if (this.isBucketToLoad(bucketNumber, partition)) {
                LOG.info(String.format("Should load this partition bucket %s with fileId %s", bucketNumber, fileId));
                // A bucket must map to exactly one file id.
                if (bucketToFileIDMap.containsKey(bucketNumber)) {
                    throw new RuntimeException(String.format("Duplicate fileId %s from bucket %s of partition %s found during the BucketStreamWriteFunction index bootstrap.", fileId, bucketNumber, partition));
                }
                LOG.info(String.format("Adding fileId %s to the bucket %s of partition %s.", fileId, bucketNumber, partition));
                bucketToFileIDMap.put(bucketNumber, fileId);
            }
        });
        this.bucketIndex.put(partition, bucketToFileIDMap);
    }
}

