/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.partitioner;

import java.util.Objects;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.sink.partitioner.BucketAssigners;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.FlinkWriteClients;

public class BucketAssignFunction
extends KeyedProcessFunction<String, HoodieFlinkInternalRow, HoodieFlinkInternalRow>
implements CheckpointedFunction,
CheckpointListener {
    private ValueState<HoodieRecordGlobalLocation> indexState;
    private BucketAssigner bucketAssigner;
    private final Configuration conf;
    private final boolean isChangingRecords;
    private final boolean globalIndex;

    public BucketAssignFunction(Configuration conf) {
        this.conf = conf;
        this.isChangingRecords = WriteOperationType.isChangingRecords(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
        this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED) && !conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
    }

    public void open(Configuration parameters2) throws Exception {
        super.open(parameters2);
        HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
        HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(HadoopFSUtils.getStorageConfWithCopy(HadoopConfigurations.getHadoopConf(this.conf)), new FlinkTaskContextSupplier(this.getRuntimeContext()));
        this.bucketAssigner = BucketAssigners.create(this.getRuntimeContext().getIndexOfThisSubtask(), this.getRuntimeContext().getMaxNumberOfParallelSubtasks(), this.getRuntimeContext().getNumberOfParallelSubtasks(), OptionsResolver.isInsertOverwrite(this.conf), HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)), context, writeConfig);
    }

    public void snapshotState(FunctionSnapshotContext context) {
        this.bucketAssigner.reset();
    }

    public void initializeState(FunctionInitializationContext context) {
        ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc = new ValueStateDescriptor<HoodieRecordGlobalLocation>("indexState", TypeInformation.of(HoodieRecordGlobalLocation.class));
        double ttl = this.conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24.0 * 60.0 * 60.0 * 1000.0;
        if (ttl > 0.0) {
            indexStateDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.milliseconds((long)ttl)).build());
        }
        this.indexState = context.getKeyedStateStore().getState(indexStateDesc);
    }

    public void processElement(HoodieFlinkInternalRow value, KeyedProcessFunction.Context ctx, Collector<HoodieFlinkInternalRow> out) throws Exception {
        if (value.isIndexRecord()) {
            this.indexState.update((Object)new HoodieRecordGlobalLocation(value.getPartitionPath(), value.getInstantTime(), value.getFileId()));
        } else {
            this.processRecord(value, out);
        }
    }

    private void processRecord(HoodieFlinkInternalRow record, Collector<HoodieFlinkInternalRow> out) throws Exception {
        HoodieRecordLocation location;
        String partitionPath = record.getPartitionPath();
        if (this.isChangingRecords) {
            HoodieRecordGlobalLocation oldLoc = (HoodieRecordGlobalLocation)this.indexState.value();
            if (oldLoc != null) {
                String partitionFromState = oldLoc.getPartitionPath();
                String fileIdFromState = oldLoc.getFileId();
                if (!Objects.equals(partitionFromState, partitionPath)) {
                    if (this.globalIndex) {
                        RowData row = record.getRowData();
                        row.setRowKind(RowKind.DELETE);
                        HoodieFlinkInternalRow deleteRecord = new HoodieFlinkInternalRow(record.getRecordKey(), partitionFromState, fileIdFromState, "U", "D", false, row);
                        out.collect(deleteRecord);
                    }
                    location = this.getNewRecordLocation(partitionPath);
                } else {
                    location = oldLoc.toLocal("U");
                    this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
                }
            } else {
                location = this.getNewRecordLocation(partitionPath);
            }
            this.indexState.update((Object)HoodieRecordGlobalLocation.fromLocal(partitionPath, location));
        } else {
            location = this.getNewRecordLocation(partitionPath);
        }
        record.setFileId(location.getFileId());
        record.setInstantTime(location.getInstantTime());
        out.collect(record);
    }

    protected HoodieRecordLocation getNewRecordLocation(String partitionPath) {
        HoodieRecordLocation location;
        BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
        switch (bucketInfo.getBucketType()) {
            case INSERT: {
                location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
                break;
            }
            case UPDATE: {
                location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
                break;
            }
            default: {
                throw new AssertionError();
            }
        }
        return location;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.bucketAssigner.reload(checkpointId);
    }

    public void close() throws Exception {
        this.bucketAssigner.close();
    }
}

