/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.bootstrap;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator that bootstraps the index from the records already stored in the Hudi table.
 *
 * <p>During {@link #initializeState}, each subtask does the following:
 * <ul>
 *   <li>It lists the partitions whose path matches {@link FlinkOptions#INDEX_PARTITION_REGEX}.</li>
 *   <li>For the latest merged file slices assigned to this subtask (see {@link #shouldLoadFile}),
 *       it reads the record keys from the base file and the log files.</li>
 *   <li>It emits one {@link IndexRecord} per key, carrying the key's current file location.</li>
 *   <li>It blocks until every parallel subtask has reported completion through a Flink global
 *       aggregate.</li>
 * </ul>
 * After bootstrapping, input elements are forwarded unchanged.
 *
 * <p>On every checkpoint the last pending instant is stored in operator state. When the operator
 * is restored with such an instant, only instants after it are considered. If no completed
 * instant follows it, nothing is loaded for a partition.
 */
public class BootstrapOperator<I, O extends HoodieRecord<?>>
extends AbstractStreamOperator<O>
implements OneInputStreamOperator<I, O> {
    private static final Logger LOG = LoggerFactory.getLogger(BootstrapOperator.class);

    /** Delay between two polls of the global bootstrap aggregate. */
    private static final long BOOTSTRAP_POLL_INTERVAL_SECONDS = 5L;

    // Runtime-only handles, created in initializeState(); transient so they are excluded from
    // Java serialization of the operator.
    protected transient HoodieTable<?, ?, ?, ?> hoodieTable;
    private transient CkpMetadata ckpMetadata;
    protected final Configuration conf;
    protected transient org.apache.hadoop.conf.Configuration hadoopConf;
    protected transient HoodieWriteConfig writeConfig;
    private transient GlobalAggregateManager aggregateManager;
    /** Operator list state holding the last pending instant recorded at checkpoint time. */
    private transient ListState<String> instantState;
    /** Compiled {@link FlinkOptions#INDEX_PARTITION_REGEX}; only matching partitions are loaded. */
    private final Pattern pattern;
    /** Instant restored from (or last written to) {@link #instantState}; null if none. */
    private String lastInstantTime;

    public BootstrapOperator(Configuration conf) {
        this.conf = conf;
        this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX));
    }

    /**
     * Records the latest pending instant into operator state.
     *
     * <p>The state is only updated when a pending instant exists. Operator list state rejects
     * null elements, and writing {@code singletonList(null)} would fail the checkpoint.
     */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        this.lastInstantTime = this.ckpMetadata.lastPendingInstant();
        if (this.lastInstantTime != null) {
            this.instantState.update(Collections.singletonList(this.lastInstantTime));
        }
    }

    /**
     * Restores the last recorded instant and sets up the Hudi table handles. It then loads the
     * existing records into the index, which blocks until all subtasks are done.
     */
    public void initializeState(StateInitializationContext context) throws Exception {
        ListStateDescriptor<String> instantStateDescriptor =
                new ListStateDescriptor<>("instantStateDescriptor", Types.STRING);
        this.instantState = context.getOperatorStateStore().getListState(instantStateDescriptor);
        if (context.isRestored()) {
            Iterator<String> instantIterator = this.instantState.get().iterator();
            if (instantIterator.hasNext()) {
                this.lastInstantTime = instantIterator.next();
            }
        }
        this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
        this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
        this.hoodieTable = FlinkTables.createTable(this.writeConfig, this.hadoopConf, this.getRuntimeContext());
        this.ckpMetadata = CkpMetadata.getInstance(this.hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
        this.aggregateManager = this.getRuntimeContext().getGlobalAggregateManager();
        this.preLoadIndexRecords();
    }

    /**
     * Loads the index records of every partition matching the configured regex. It then waits
     * until all parallel subtasks have finished their own loading.
     */
    protected void preLoadIndexRecords() throws Exception {
        String basePath = this.hoodieTable.getMetaClient().getBasePath();
        int taskID = this.getRuntimeContext().getIndexOfThisSubtask();
        LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
        for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, this.hadoopConf), basePath)) {
            if (this.pattern.matcher(partitionPath).matches()) {
                this.loadRecords(partitionPath);
            }
        }
        LOG.info("Finish sending index records, taskId = {}.", taskID);
        this.waitForBootstrapReady(taskID);
    }

    /**
     * Registers this subtask in the global bootstrap aggregate. It then polls until the number of
     * ready subtasks equals the parallelism.
     *
     * <p>A failed aggregate update is logged and retried after the poll interval, so retries do
     * not spin. Interruption (for example on task cancellation) is propagated rather than
     * swallowed, so the task can stop.
     *
     * @param taskID index of this subtask
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void waitForBootstrapReady(int taskID) throws InterruptedException {
        int taskNum = this.getRuntimeContext().getNumberOfParallelSubtasks();
        String aggregateName = BootstrapAggFunction.NAME + this.conf.getString(FlinkOptions.TABLE_NAME);
        int readyTaskNum = 1;
        while (readyTaskNum != taskNum) {
            try {
                readyTaskNum = this.aggregateManager.updateGlobalAggregate(aggregateName, taskID, new BootstrapAggFunction());
            } catch (Exception e) {
                // Best effort: keep the last known count and retry after the poll interval.
                LOG.warn("Update global task bootstrap summary error", e);
            }
            if (readyTaskNum != taskNum) {
                LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskID);
                TimeUnit.SECONDS.sleep(BOOTSTRAP_POLL_INTERVAL_SECONDS);
            }
        }
    }

    /**
     * Forwards the element unchanged. The cast is unchecked: the input record is emitted as the
     * output type.
     */
    @SuppressWarnings("unchecked")
    public void processElement(StreamRecord<I> element) throws Exception {
        this.output.collect((StreamRecord<O>) element);
    }

    /**
     * Emits an {@link IndexRecord} for every record key found in the file slices of
     * {@code partitionPath} that belong to this subtask.
     *
     * <p>The cut-off is the latest completed instant of the commits timeline. When an instant was
     * restored, the timeline is narrowed to instants after it first. Keys come from the valid base
     * file and from the valid log files, which are read in log-file order through a merged log
     * scanner.
     *
     * @param partitionPath relative partition path to load
     */
    protected void loadRecords(String partitionPath) throws Exception {
        long start = System.currentTimeMillis();
        int parallelism = this.getRuntimeContext().getNumberOfParallelSubtasks();
        int maxParallelism = this.getRuntimeContext().getMaxNumberOfParallelSubtasks();
        int taskID = this.getRuntimeContext().getIndexOfThisSubtask();

        HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline();
        if (!StringUtils.isNullOrEmpty(this.lastInstantTime)) {
            commitsTimeline = commitsTimeline.findInstantsAfter(this.lastInstantTime);
        }
        Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedInstants().lastInstant();

        if (latestCommitTime.isPresent()) {
            String latestCommitTimestamp = latestCommitTime.get().getTimestamp();
            BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
            Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
            List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
                    .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTimestamp)
                    .collect(Collectors.toList());

            for (FileSlice fileSlice : fileSlices) {
                if (!this.shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
                    continue;
                }
                LOG.info("Load records from {}.", fileSlice);

                // Keys stored in the base file.
                fileSlice.getBaseFile().ifPresent(baseFile -> {
                    if (!StreamerUtil.isValidFile(baseFile.getFileStatus())) {
                        return;
                    }
                    try (ClosableIterator<HoodieKey> keyIterator = fileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) {
                        keyIterator.forEachRemaining(hoodieKey -> this.emitIndexRecord(hoodieKey, fileSlice));
                    }
                });

                // Keys stored only in log files.
                List<String> logPaths = fileSlice.getLogFiles()
                        .sorted(HoodieLogFile.getLogFileComparator())
                        .filter(logFile -> StreamerUtil.isValidFile(logFile.getFileStatus()))
                        .map(logFile -> logFile.getPath().toString())
                        .collect(Collectors.toList());
                if (logPaths.isEmpty()) {
                    // Nothing to scan; avoid opening a scanner that would yield no records.
                    continue;
                }
                try (HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTimestamp, this.writeConfig, this.hadoopConf)) {
                    for (String recordKey : scanner.getRecords().keySet()) {
                        this.emitIndexRecord(new HoodieKey(recordKey, partitionPath), fileSlice);
                    }
                }
            }
        }

        long cost = System.currentTimeMillis() - start;
        LOG.info("Task [{}:{}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
                this.getClass().getSimpleName(), taskID, partitionPath, cost);
    }

    /** Wraps the key and its file-slice location in an {@link IndexRecord} and emits it downstream. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void emitIndexRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
        this.output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
    }

    /**
     * Builds a sealed, payload-less record for {@code hoodieKey} whose current location is the
     * given file slice (its partition, base instant time and file id).
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
        HoodieRecord hoodieRecord = new HoodieAvroRecord(hoodieKey, null);
        hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId()));
        hoodieRecord.seal();
        return hoodieRecord;
    }

    /**
     * Returns whether this subtask owns {@code fileId}. Ownership uses Flink's key-group
     * assignment, so each file id is loaded by exactly one subtask.
     */
    protected boolean shouldLoadFile(String fileId, int maxParallelism, int parallelism, int taskID) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, parallelism) == taskID;
    }

    /** Returns true if the instant state currently holds at least one value. */
    @VisibleForTesting
    public boolean isAlreadyBootstrap() throws Exception {
        return this.instantState.get().iterator().hasNext();
    }
}

