/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.source.stats;

import io.hops.hudi.org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Tuple3;
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.stats.ColumnStatsIndex;
import org.apache.hudi.source.stats.ColumnStatsSchemas;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.FlinkClientUtil;
import org.apache.hudi.util.RowDataProjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileStatsIndex
implements ColumnStatsIndex {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(FileStatsIndex.class);
    private final RowType rowType;
    private final String basePath;
    private final HoodieMetadataConfig metadataConfig;
    private HoodieTableMetadata metadataTable;

    public FileStatsIndex(String basePath, RowType rowType, HoodieMetadataConfig metadataConfig) {
        this.basePath = basePath;
        this.rowType = rowType;
        this.metadataConfig = metadataConfig;
    }

    @Override
    public String getIndexPartitionName() {
        return "column_stats";
    }

    public HoodieTableMetadata getMetadataTable() {
        if (this.metadataTable == null) {
            this.metadataTable = HoodieTableMetadata.create(HoodieFlinkEngineContext.DEFAULT, new HoodieHadoopStorage(this.basePath, FlinkClientUtil.getHadoopConf()), this.metadataConfig, this.basePath);
        }
        return this.metadataTable;
    }

    @Override
    public Set<String> computeCandidateFiles(ColumnStatsProbe probe, List<String> allFiles) {
        if (probe == null) {
            return null;
        }
        try {
            String[] targetColumns = probe.getReferencedCols();
            List<RowData> statsRows = this.readColumnStatsIndexByColumns(targetColumns);
            return this.candidatesInMetadataTable(probe, statsRows, allFiles);
        }
        catch (Throwable t) {
            LOG.warn("Read {} for data skipping error", (Object)this.getIndexPartitionName(), (Object)t);
            return null;
        }
    }

    @Override
    public Set<String> computeCandidatePartitions(ColumnStatsProbe probe, List<String> allPartitions) {
        throw new UnsupportedOperationException("This method is not supported by " + this.getClass().getSimpleName());
    }

    protected Set<String> candidatesInMetadataTable(@Nullable ColumnStatsProbe probe, List<RowData> indexRows, List<String> oriCandidates) {
        if (probe == null) {
            return null;
        }
        String[] referencedCols = probe.getReferencedCols();
        Pair<List<RowData>, String[]> colStatsTable = this.transposeColumnStatsIndex(indexRows, referencedCols);
        List<RowData> transposedColStats = colStatsTable.getLeft();
        String[] queryCols = colStatsTable.getRight();
        if (queryCols.length == 0) {
            return null;
        }
        RowType.RowField[] queryFields = DataTypeUtils.projectRowFields(this.rowType, queryCols);
        Set allIndexedFiles = ((Stream)transposedColStats.stream().parallel()).map(row -> row.getString(0).toString()).collect(Collectors.toSet());
        Set<String> candidateFiles = ((Stream)transposedColStats.stream().parallel()).filter(row -> probe.test((RowData)row, queryFields)).map(row -> row.getString(0).toString()).collect(Collectors.toSet());
        oriCandidates.removeAll(allIndexedFiles);
        candidateFiles.addAll(oriCandidates);
        return candidateFiles;
    }

    private static List<RowData> projectNestedColStatsColumns(List<RowData> rows) {
        int pos = HoodieMetadataRecord.SCHEMA$.getField("ColumnStatsMetadata").pos();
        RowDataProjection projection = RowDataProjection.instanceV2((RowType)ColumnStatsSchemas.COL_STATS_DATA_TYPE.getLogicalType(), ColumnStatsSchemas.COL_STATS_TARGET_POS);
        return ((Stream)rows.stream().parallel()).map(row -> {
            RowData columnStatsField = row.getRow(pos, 9);
            return projection.project(columnStatsField);
        }).collect(Collectors.toList());
    }

    @VisibleForTesting
    public Pair<List<RowData>, String[]> transposeColumnStatsIndex(List<RowData> colStats, String[] queryColumns) {
        Map<String, LogicalType> tableFieldTypeMap = this.rowType.getFields().stream().collect(Collectors.toMap(RowType.RowField::getName, RowType.RowField::getType));
        Set indexedColumns = colStats.stream().map(row -> row.getString(5).toString()).collect(Collectors.toSet());
        TreeSet sortedTargetColumns = Arrays.stream(queryColumns).sorted().filter(indexedColumns::contains).collect(Collectors.toCollection(TreeSet::new));
        ConcurrentHashMap converters2 = new ConcurrentHashMap();
        Map<StringData, List<RowData>> fileNameToRows = ((Stream)colStats.stream().parallel()).filter(row -> sortedTargetColumns.contains(row.getString(5).toString())).map(row -> {
            if (row.isNullAt(1) && row.isNullAt(2)) {
                return row;
            }
            String colName = row.getString(5).toString();
            LogicalType colType = (LogicalType)tableFieldTypeMap.get(colName);
            return FileStatsIndex.unpackMinMaxVal(row, colType, converters2);
        }).collect(Collectors.groupingBy(rowData -> rowData.getString(0)));
        return Pair.of(FileStatsIndex.foldRowsByFiles(sortedTargetColumns, fileNameToRows), sortedTargetColumns.toArray(new String[0]));
    }

    private static List<RowData> foldRowsByFiles(TreeSet<String> sortedTargetColumns, Map<StringData, List<RowData>> fileNameToRows) {
        return ((Stream)fileNameToRows.values().stream().parallel()).map(rows -> {
            StringData fileName = ((RowData)rows.get(0)).getString(0);
            long valueCount = ((RowData)rows.get(0)).getLong(4);
            Map<String, RowData> columnRowsMap = rows.stream().collect(Collectors.toMap(row -> row.getString(5).toString(), row -> row));
            TreeMap alignedColumnRowsMap = new TreeMap();
            sortedTargetColumns.forEach(col -> {
                RowData cfr_ignored_0 = (RowData)alignedColumnRowsMap.put(col, columnRowsMap.get(col));
            });
            List columnStats = alignedColumnRowsMap.values().stream().map(row -> {
                if (row == null) {
                    return Tuple3.of(null, null, valueCount);
                }
                GenericRowData gr = (GenericRowData)row;
                return Tuple3.of(gr.getField(1), gr.getField(2), gr.getField(3));
            }).collect(Collectors.toList());
            GenericRowData foldedRow = new GenericRowData(2 + 3 * columnStats.size());
            foldedRow.setField(0, (Object)fileName);
            foldedRow.setField(1, (Object)valueCount);
            for (int i = 0; i < columnStats.size(); ++i) {
                Tuple3 stats = (Tuple3)columnStats.get(i);
                int startPos = 2 + 3 * i;
                foldedRow.setField(startPos, stats.f0);
                foldedRow.setField(startPos + 1, stats.f1);
                foldedRow.setField(startPos + 2, stats.f2);
            }
            return foldedRow;
        }).collect(Collectors.toList());
    }

    private static RowData unpackMinMaxVal(RowData row, LogicalType colType, Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters2) {
        RowData minValueStruct = row.getRow(1, 1);
        RowData maxValueStruct = row.getRow(2, 1);
        ValidationUtils.checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null");
        Object minValue = FileStatsIndex.tryUnpackNonNullVal(minValueStruct, colType, converters2);
        Object maxValue = FileStatsIndex.tryUnpackNonNullVal(maxValueStruct, colType, converters2);
        GenericRowData unpackedRow = new GenericRowData(row.getArity());
        unpackedRow.setField(0, (Object)row.getString(0));
        unpackedRow.setField(1, minValue);
        unpackedRow.setField(2, maxValue);
        unpackedRow.setField(3, (Object)row.getLong(3));
        unpackedRow.setField(4, (Object)row.getLong(4));
        unpackedRow.setField(5, (Object)row.getString(5));
        return unpackedRow;
    }

    private static Object tryUnpackNonNullVal(RowData rowData, LogicalType colType, Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters2) {
        for (int i = 0; i < rowData.getArity(); ++i) {
            Object nested = ((GenericRowData)rowData).getField(i);
            if (nested == null) continue;
            return FileStatsIndex.doUnpack(nested, colType, converters2);
        }
        return null;
    }

    private static Object doUnpack(Object rawVal, LogicalType logicalType, Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters2) {
        AvroToRowDataConverters.AvroToRowDataConverter converter = converters2.computeIfAbsent(logicalType, k -> AvroToRowDataConverters.createConverter(logicalType, true));
        return converter.convert(rawVal);
    }

    @VisibleForTesting
    public List<RowData> readColumnStatsIndexByColumns(String[] targetColumns) {
        ValidationUtils.checkArgument(targetColumns.length > 0, "Column stats is only valid when push down filters have referenced columns");
        List<String> encodedTargetColumnNames = Arrays.stream(targetColumns).map(colName -> new ColumnIndexID((String)colName).asBase64EncodedString()).collect(Collectors.toList());
        HoodieData<HoodieRecord<HoodieMetadataPayload>> records = this.getMetadataTable().getRecordsByKeyPrefixes(encodedTargetColumnNames, this.getIndexPartitionName(), false);
        AvroToRowDataConverters.AvroToRowDataConverter converter = AvroToRowDataConverters.createRowConverter((RowType)ColumnStatsSchemas.METADATA_DATA_TYPE.getLogicalType());
        List<RowData> rows = ((Stream)records.collectAsList().stream().parallel()).map(record -> {
            GenericRecord genericRecord;
            try {
                genericRecord = ((HoodieMetadataPayload)record.getData()).getInsertValue(null, null).orElse(null);
            }
            catch (IOException e) {
                throw new HoodieException("Exception while getting insert value from metadata payload");
            }
            return (RowData)converter.convert(genericRecord);
        }).collect(Collectors.toList());
        return FileStatsIndex.projectNestedColStatsColumns(rows);
    }
}

