package org.apache.hudi.sink.partitioner;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/partitioner/BucketAssignFunction.class */
/**
 * Keyed process function that tags every incoming {@link HoodieRecord} with a
 * {@link HoodieRecordLocation} and forwards it downstream.
 *
 * <p>A record whose key is already present in the keyed index state (and whose write
 * operation changes records) is tagged as an update of the recorded file id. Any other
 * record is handed to the {@link BucketAssigner}, tagged according to the bucket it
 * returns, and remembered in the index state.
 *
 * <p>Partitions that already exist under the table path when the task opens are loaded
 * into the index state lazily, the first time a record for such a partition arrives.
 *
 * @param <K> type of the key this function is keyed by
 * @param <I> type of the input elements; each element is cast to {@link HoodieRecord}
 * @param <O> type of the emitted records
 */
public class BucketAssignFunction<K, I, O extends HoodieRecord<?>> extends KeyedProcessFunction<K, I, O> implements CheckpointedFunction, CheckpointListener {
    private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class);

    /** Engine context built in {@link #open(Configuration)}. */
    private HoodieFlinkEngineContext context;

    /** Keyed state mapping a record key to the location it was assigned. */
    private MapState<HoodieKey, HoodieRecordLocation> indexState;

    /** Decides which bucket an insert goes to; created in {@link #open(Configuration)}. */
    private BucketAssigner bucketAssigner;

    /** Job configuration passed to the constructor. */
    private final Configuration conf;

    /** Hadoop configuration used to read base files; created in {@link #open(Configuration)}. */
    private transient org.apache.hadoop.conf.Configuration hadoopConf;

    /** Whether the configured write operation changes existing records. */
    private final boolean isChangingRecords;

    /** Existing partition paths that map to this subtask and may still need loading. */
    private transient Set<String> initialPartitionsToLoad;

    /** Keyed state holding the partition paths whose record keys were already loaded. */
    private MapState<String, Integer> partitionLoadState;

    /** Set once every entry of {@link #initialPartitionsToLoad} is found in the load state. */
    private boolean allPartitionsLoaded = false;

    /**
     * Creates the function.
     *
     * @param configuration job configuration; {@link FlinkOptions#OPERATION} is read here,
     *                      the remaining options are read in {@link #open(Configuration)}
     */
    public BucketAssignFunction(Configuration configuration) {
        this.conf = configuration;
        this.isChangingRecords = WriteOperationType.isChangingRecords(WriteOperationType.fromValue(configuration.getString(FlinkOptions.OPERATION)));
    }

    /**
     * Builds the engine context and the bucket assigner, then computes the set of
     * pre-existing partitions this subtask is responsible for.
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
        this.hadoopConf = StreamerUtil.getHadoopConf();
        this.context = new HoodieFlinkEngineContext(new SerializableConfiguration(this.hadoopConf), new FlinkTaskContextSupplier(getRuntimeContext()));
        this.bucketAssigner = BucketAssigners.create(HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)), this.context, writeConfig);
        loadInitialPartitions();
    }

    /** Resets the bucket assigner whenever a state snapshot is taken. */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
        this.bucketAssigner.reset();
    }

    /** Registers the two keyed map states, {@code indexState} and {@code partitionLoadState}. */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) {
        MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexDescriptor =
                new MapStateDescriptor<>("indexState", TypeInformation.of(HoodieKey.class), TypeInformation.of(HoodieRecordLocation.class));
        this.indexState = functionInitializationContext.getKeyedStateStore().getMapState(indexDescriptor);

        MapStateDescriptor<String, Integer> partitionDescriptor =
                new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
        this.partitionLoadState = functionInitializationContext.getKeyedStateStore().getMapState(partitionDescriptor);
    }

    /**
     * Assigns a location to one record and emits it.
     *
     * @param i         the input element; must be a {@link HoodieRecord}
     * @param context   the keyed process context (unused)
     * @param collector receives the record after its current location has been set
     * @throws Exception if state access or loading of existing record keys fails
     */
    @Override
    public void processElement(I i, KeyedProcessFunction<K, I, O>.Context context, Collector<O> collector) throws Exception {
        HoodieRecord<?> record = (HoodieRecord<?>) i;
        final HoodieKey key = record.getKey();
        final String partitionPath = key.getPartitionPath();

        // Lazily load the keys of a pre-existing partition the first time it is seen.
        if (!this.allPartitionsLoaded
                && this.initialPartitionsToLoad.contains(partitionPath)
                && !this.partitionLoadState.contains(partitionPath)) {
            loadRecords(partitionPath);
        }

        final HoodieRecordLocation location;
        if (this.isChangingRecords && this.indexState.contains(key)) {
            // Known key: route the record to the file it was previously assigned to.
            location = new HoodieRecordLocation("U", this.indexState.get(key).getFileId());
            this.bucketAssigner.addUpdate(record.getPartitionPath(), location.getFileId());
        } else {
            BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
            switch (bucketInfo.getBucketType()) {
                case INSERT:
                    location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
                    break;
                case UPDATE:
                    location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
                    break;
                default:
                    throw new AssertionError();
            }
            this.indexState.put(key, location);
        }

        record.unseal();
        record.setCurrentLocation(location);
        record.seal();

        // The input element is emitted as-is after tagging, so the caller is expected to
        // instantiate this function with I and O describing the same record type.
        @SuppressWarnings("unchecked")
        O tagged = (O) record;
        collector.collect(tagged);
    }

    /** Refreshes the assigner's table view and re-evaluates the all-partitions-loaded flag. */
    @Override
    public void notifyCheckpointComplete(long j) {
        this.bucketAssigner.refreshTable();
        checkPartitionsLoaded();
    }

    /**
     * Reads the record keys of every latest base file of the given partition into the
     * index state and marks the partition as loaded.
     *
     * @param str the partition path to load
     * @throws Exception if listing the base files or updating the load state fails
     */
    private void loadRecords(String str) throws Exception {
        for (HoodieBaseFile hoodieBaseFile : HoodieIndexUtils.getLatestBaseFilesForPartition(str, this.bucketAssigner.getTable())) {
            ParquetUtils.fetchRecordKeyPartitionPathFromParquet(this.hadoopConf, new Path(hoodieBaseFile.getPath())).forEach(hoodieKey -> {
                try {
                    this.indexState.put(hoodieKey, new HoodieRecordLocation(hoodieBaseFile.getCommitTime(), hoodieBaseFile.getFileId()));
                } catch (Exception e) {
                    // Keep the thrown type and message, but attach the root cause so it is not lost.
                    HoodieIOException failure = new HoodieIOException("Error when load record keys from file: " + hoodieBaseFile);
                    failure.initCause(e);
                    throw failure;
                }
            });
        }
        this.partitionLoadState.put(str, 0);
    }

    /**
     * Lists all partition paths under the configured table path and keeps those that the
     * key-group assignment maps to this subtask.
     */
    private void loadInitialPartitions() {
        List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context, this.conf.getString(FlinkOptions.PATH), false, false, false);
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        this.initialPartitionsToLoad = allPartitionPaths.stream()
                .filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == subtaskIndex)
                .collect(Collectors.toSet());
    }

    /**
     * Sets {@link #allPartitionsLoaded} once every initial partition is present in the
     * partition load state; returns early on the first partition that is not.
     *
     * <p>NOTE(review): this reads keyed state from the checkpoint-complete callback;
     * confirm that a current key is available in that context.
     *
     * @throws HoodieException if the state lookup fails
     */
    private void checkPartitionsLoaded() {
        for (String partition : this.initialPartitionsToLoad) {
            try {
                if (!this.partitionLoadState.contains(partition)) {
                    return;
                }
            } catch (Exception e) {
                // The failure is rethrown, so the log message must not claim it is ignored.
                LOG.warn("Error when check whether all partitions are loaded", e);
                throw new HoodieException(e);
            }
        }
        this.allPartitionsLoaded = true;
    }

    /** Returns whether all initial partitions have been found in the load state. */
    @VisibleForTesting
    public boolean isAllPartitionsLoaded() {
        return this.allPartitionsLoaded;
    }

    /** Clears the index state, resets the loaded flag and recomputes the initial partitions. */
    @VisibleForTesting
    public void clearIndexState() {
        this.allPartitionsLoaded = false;
        this.indexState.clear();
        loadInitialPartitions();
    }

    /**
     * Tells whether the given key is present in the index state.
     *
     * @throws HoodieException if the state lookup fails
     */
    @VisibleForTesting
    public boolean isKeyInState(HoodieKey hoodieKey) {
        try {
            return this.indexState.contains(hoodieKey);
        } catch (Exception e) {
            throw new HoodieException(e);
        }
    }
}
