/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark;

import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.hudi.optimize.HilbertCurveUtils;
import org.apache.hudi.optimize.ZOrderingUtil;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import org.davidmoten.hilbert.HilbertCurve;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;

public class OrderingIndexHelper {
    private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";

    /**
     * Re-orders {@code df} by a space-filling-curve value computed from the given sort columns.
     *
     * <p>The input is returned unchanged when there is nothing to sort by ({@code sortCols} is
     * {@code null}/empty or {@code fileNum} is not positive, mirroring the String overload) or when
     * any requested column is absent from the schema. A single (distinct) sort column is handled by
     * a plain range repartition; two or more columns are mapped to a z-order or hilbert value,
     * sorted by it, and the helper column is dropped again before returning.
     *
     * @param df       data to re-order
     * @param sortCols names of the columns to sort by; repeated names are considered once
     * @param fileNum  number of partitions of the sorted result
     * @param sortMode layout strategy name understood by
     *                 {@code HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue}
     * @return the re-ordered data frame with the same columns as {@code df}
     * @throws IllegalArgumentException if the resolved strategy is neither z-order nor hilbert
     */
    public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, List<String> sortCols, int fileNum, String sortMode) {
        // Same guard as the String overload: nothing sensible can be done with these inputs.
        if (sortCols == null || sortCols.isEmpty() || fileNum <= 0) {
            return df;
        }
        StructField[] schemaFields = df.schema().fields();
        int fieldNum = schemaFields.length;
        Map<String, StructField> columnsMap = Arrays.stream(schemaFields).collect(Collectors.toMap(e -> e.name(), e -> e));
        // A repeated column adds no ordering information and would otherwise make toMap below fail
        // with a duplicate-key error.
        List<String> distinctCols = sortCols.stream().distinct().collect(Collectors.toList());
        for (String sortCol : distinctCols) {
            if (!columnsMap.containsKey(sortCol)) {
                return df;
            }
        }
        // Only one column to sort by: a range repartition is enough, no curve needed.
        if (distinctCols.size() == 1) {
            return df.repartitionByRange(fileNum, new Column[]{functions.col(distinctCols.get(0))});
        }
        List<StructField> orderedFields = Arrays.asList(schemaFields);
        // Key: position of the column inside a row; value: its schema field.
        Map<Integer, StructField> fieldMap = distinctCols.stream()
                .collect(Collectors.toMap(e -> orderedFields.indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
        JavaRDD<Row> sortedRDD;
        switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) {
            case ZORDER: {
                sortedRDD = OrderingIndexHelper.createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
                break;
            }
            case HILBERT: {
                sortedRDD = OrderingIndexHelper.createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", sortMode));
            }
        }
        // The sorted rows carry the curve value as one extra trailing column. Give it a name that
        // cannot clash with a user column, because drop(name) removes every column matching the name.
        String indexColName = uniqueColumnName(schemaFields, "Index");
        List<StructField> newFields = new ArrayList<StructField>(Arrays.asList(schemaFields));
        newFields.add(new StructField(indexColName, (DataType)BinaryType$.MODULE$, true, Metadata.empty()));
        return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop(indexColName);
    }

    /** Returns {@code base}, extended with trailing underscores until no field has that name (ignoring case). */
    private static String uniqueColumnName(StructField[] fields, String base) {
        String candidate = base;
        boolean clash = true;
        while (clash) {
            clash = false;
            for (StructField field : fields) {
                if (field.name().equalsIgnoreCase(candidate)) {
                    candidate = candidate + "_";
                    clash = true;
                    break;
                }
            }
        }
        return candidate;
    }

    /**
     * Appends a z-order value to every row and sorts the rows by it.
     *
     * <p>Each column listed in {@code fieldMap} is encoded to bytes, the encodings are interleaved
     * via {@code ZOrderingUtil.interleaving}, and the result is added as one extra trailing value
     * of the row. Columns whose type has no encoding are left out of the interleaving.
     *
     * @param originRDD rows to sort
     * @param fieldMap  row position of each sort column mapped to its schema field
     * @param fieldNum  number of columns in the original rows, i.e. the position of the appended value
     * @param fileNum   number of partitions of the sorted result
     * @return rows extended by the z-order value, sorted ascending by that value
     */
    private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
        JavaRDD<Row> withZValue = originRDD.map(row -> {
            List<byte[]> encodedColumns = new ArrayList<>();
            for (Map.Entry<Integer, StructField> entry : fieldMap.entrySet()) {
                byte[] encoded = zOrderBytesOf(row, entry.getKey(), entry.getValue().dataType());
                if (encoded != null) {
                    encodedColumns.add(encoded);
                }
            }
            byte[][] zBytes = encodedColumns.toArray(new byte[0][]);
            // Copy the original values and put the interleaved z-value behind them.
            List<Object> rowValues = new ArrayList<>();
            rowValues.addAll(JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
            rowValues.add(ZOrderingUtil.interleaving(zBytes, 8));
            return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rowValues));
        });
        return withZValue.sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
    }

    /**
     * Encodes the value at {@code index} for z-ordering; a null cell is replaced by a per-type
     * substitute value. Returns {@code null} when {@code dataType} has no encoding.
     */
    private static byte[] zOrderBytesOf(Row row, int index, DataType dataType) {
        boolean isNull = row.isNullAt(index);
        if (dataType instanceof LongType) {
            return ZOrderingUtil.longTo8Byte(isNull ? Long.MAX_VALUE : row.getLong(index));
        }
        if (dataType instanceof DoubleType) {
            return ZOrderingUtil.doubleTo8Byte(isNull ? Double.MAX_VALUE : row.getDouble(index));
        }
        if (dataType instanceof IntegerType) {
            return ZOrderingUtil.intTo8Byte(isNull ? Integer.MAX_VALUE : row.getInt(index));
        }
        if (dataType instanceof FloatType) {
            // Floats are widened and encoded as doubles.
            return ZOrderingUtil.doubleTo8Byte(isNull ? (double) Float.MAX_VALUE : (double) row.getFloat(index));
        }
        if (dataType instanceof StringType) {
            return ZOrderingUtil.utf8To8Byte(isNull ? "" : row.getString(index));
        }
        if (dataType instanceof DateType) {
            return ZOrderingUtil.longTo8Byte(isNull ? Long.MAX_VALUE : row.getDate(index).getTime());
        }
        if (dataType instanceof TimestampType) {
            return ZOrderingUtil.longTo8Byte(isNull ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
        }
        if (dataType instanceof ByteType) {
            return ZOrderingUtil.byteTo8Byte(isNull ? Byte.MAX_VALUE : row.getByte(index));
        }
        if (dataType instanceof ShortType) {
            return ZOrderingUtil.intTo8Byte(isNull ? Short.MAX_VALUE : (int) row.getShort(index));
        }
        if (dataType instanceof DecimalType) {
            // Only the integral part of the decimal takes part in the ordering.
            return ZOrderingUtil.longTo8Byte(isNull ? Long.MAX_VALUE : row.getDecimal(index).longValue());
        }
        if (dataType instanceof BooleanType) {
            boolean flag = !isNull && row.getBoolean(index);
            return ZOrderingUtil.intTo8Byte(flag ? 1 : 0);
        }
        if (dataType instanceof BinaryType) {
            byte[] raw = isNull ? new byte[]{0} : (byte[]) row.get(index);
            return ZOrderingUtil.paddingTo8Byte(raw);
        }
        return null;
    }

    /**
     * Appends a hilbert-curve value to every row and sorts the rows by it.
     *
     * <p>One {@code HilbertCurve} (63 bits, one dimension per entry of {@code fieldMap}) is created
     * per partition. Rows are converted lazily: each sort column is mapped to a {@code long}, the
     * resulting point is turned into bytes by {@code HilbertCurveUtils.indexBytes}, and the bytes
     * are added as one extra trailing value of the row. Columns whose type has no mapping are
     * left out of the point.
     *
     * @param originRDD rows to sort
     * @param fieldMap  row position of each sort column mapped to its schema field
     * @param fieldNum  number of columns in the original rows, i.e. the position of the appended value
     * @param fileNum   number of partitions of the sorted result
     * @return rows extended by the hilbert value, sorted ascending by that value
     */
    private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, final Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
        JavaRDD<Row> withHilbertValue = originRDD.mapPartitions(rows -> {
            final HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
            return new Iterator<Row>() {

                @Override
                public boolean hasNext() {
                    return rows.hasNext();
                }

                @Override
                public Row next() {
                    final Row row = rows.next();
                    long[] point = fieldMap.entrySet().stream()
                            .map(entry -> hilbertLongOf(row, entry.getKey(), entry.getValue().dataType()))
                            .filter(coordinate -> coordinate != null)
                            .mapToLong(coordinate -> coordinate)
                            .toArray();
                    byte[] hilbertValue = HilbertCurveUtils.indexBytes(hilbertCurve, point, 63);
                    // Copy the original values and put the hilbert value behind them.
                    List<Object> rowValues = new ArrayList<>();
                    rowValues.addAll(JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
                    rowValues.add(hilbertValue);
                    return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rowValues));
                }
            };
        });
        return withHilbertValue.sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
    }

    /**
     * Maps the value at {@code index} to a {@code long} coordinate; a null cell becomes
     * {@code Long.MAX_VALUE} (or {@code 0} for booleans, which treat null as false).
     * Returns {@code null} when {@code dataType} has no mapping.
     */
    private static Long hilbertLongOf(Row row, int index, DataType dataType) {
        boolean isNull = row.isNullAt(index);
        if (dataType instanceof LongType) {
            return isNull ? Long.MAX_VALUE : row.getLong(index);
        }
        if (dataType instanceof DoubleType) {
            return isNull ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index));
        }
        if (dataType instanceof IntegerType) {
            return isNull ? Long.MAX_VALUE : (long) row.getInt(index);
        }
        if (dataType instanceof FloatType) {
            // The float is widened to double before its bit pattern is taken.
            return isNull ? Long.MAX_VALUE : Double.doubleToLongBits(row.getFloat(index));
        }
        if (dataType instanceof StringType) {
            return isNull ? Long.MAX_VALUE : ZOrderingUtil.convertStringToLong(row.getString(index));
        }
        if (dataType instanceof DateType) {
            return isNull ? Long.MAX_VALUE : row.getDate(index).getTime();
        }
        if (dataType instanceof TimestampType) {
            return isNull ? Long.MAX_VALUE : row.getTimestamp(index).getTime();
        }
        if (dataType instanceof ByteType) {
            return isNull ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong(new byte[]{row.getByte(index)});
        }
        if (dataType instanceof ShortType) {
            return isNull ? Long.MAX_VALUE : (long) row.getShort(index);
        }
        if (dataType instanceof DecimalType) {
            // Only the integral part of the decimal takes part in the ordering.
            return isNull ? Long.MAX_VALUE : row.getDecimal(index).longValue();
        }
        if (dataType instanceof BooleanType) {
            boolean flag = !isNull && row.getBoolean(index);
            return flag ? Long.MAX_VALUE : 0L;
        }
        if (dataType instanceof BinaryType) {
            return isNull ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong((byte[]) row.get(index));
        }
        return null;
    }

    /**
     * Convenience overload taking the sort columns as one comma-separated string.
     *
     * @param df       data to re-order
     * @param sortCols comma-separated column names; each name is trimmed
     * @param fileNum  number of partitions of the sorted result
     * @param sortMode layout strategy name, passed through to the list-based overload
     * @return {@code df} itself when {@code sortCols} is null/empty or {@code fileNum} is not
     *         positive, otherwise the result of the list-based overload
     */
    public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, String sortCols, int fileNum, String sortMode) {
        boolean nothingToSort = sortCols == null || sortCols.isEmpty() || fileNum <= 0;
        if (nothingToSort) {
            return df;
        }
        List<String> columnNames = new ArrayList<>();
        for (String rawName : sortCols.split(",")) {
            columnNames.add(rawName.trim());
        }
        return OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, columnNames, fileNum, sortMode);
    }

    /**
     * Delegates to {@code RangeSampleSort.sortDataFrameBySample}, handing over the column names
     * as a Scala sequence.
     *
     * @param df       data to re-order
     * @param zCols    names of the columns to sort by
     * @param fileNum  passed through unchanged
     * @param sortMode passed through unchanged
     * @return whatever {@code sortDataFrameBySample} returns
     */
    public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum, String sortMode) {
        Seq<String> columnSeq = JavaConversions.asScalaBuffer(zCols);
        return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, columnSeq, fileNum, sortMode);
    }

    /**
     * Convenience overload taking the sort columns as one comma-separated string.
     *
     * @param df       data to re-order
     * @param zCols    comma-separated column names; each name is trimmed
     * @param fileNum  number of partitions of the sorted result
     * @param sortMode layout strategy name, passed through to the list-based overload
     * @return {@code df} itself when {@code zCols} is null/empty or {@code fileNum} is not
     *         positive, otherwise the result of the list-based overload
     */
    public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, String zCols, int fileNum, String sortMode) {
        boolean nothingToSort = zCols == null || zCols.isEmpty() || fileNum <= 0;
        if (nothingToSort) {
            return df;
        }
        List<String> columnNames = new ArrayList<>();
        for (String rawName : zCols.split(",")) {
            columnNames.add(rawName.trim());
        }
        return OrderingIndexHelper.createOptimizeDataFrameBySample(df, columnNames, fileNum, sortMode);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Collects per-file min/max/null-count statistics of the given columns from the parquet
     * metadata of the files backing {@code df}.
     *
     * <p>The result has one row per file: a {@code file} column followed, for every requested
     * column {@code c}, by {@code c_minValue}, {@code c_maxValue} (typed like {@code c}) and
     * {@code c_num_nulls} (long). The Spark job description is changed while the metadata is
     * read and restored afterwards.
     *
     * @param df   data frame whose input files are inspected
     * @param cols names of the columns to collect statistics for
     * @return data frame holding the statistics
     * @throws HoodieException if a file has no statistics for a requested column, the column is
     *                         not part of the schema, or its type is not supported
     */
    public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
        Map<String, DataType> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType()));
        List<String> scanFiles = Arrays.asList(df.inputFiles());
        SparkContext sc = df.sparkSession().sparkContext();
        JavaSparkContext jsc = new JavaSparkContext(sc);
        SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
        int numParallelism = scanFiles.size() / 3 + 1;
        String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
        List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
        try {
            jsc.setJobDescription("Listing parquet column statistics");
            colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
                Configuration conf = serializableConfiguration.value();
                ParquetUtils parquetUtils = (ParquetUtils)BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
                List<HoodieColumnRangeMetadata<Comparable>> results = new ArrayList<>();
                while (paths.hasNext()) {
                    results.addAll(parquetUtils.readRangeFromParquetMetadata(conf, new Path(paths.next()), cols));
                }
                return results.iterator();
            }).collect();
        }
        finally {
            jsc.setJobDescription(previousJobDescription);
        }
        Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
        List<List<HoodieColumnRangeMetadata<Comparable>>> perFileStats = new ArrayList<>(fileToStatsListMap.values());
        JavaRDD<Row> allMetaDataRDD = jsc.parallelize(perFileStats, 1)
                .map(fileStats -> buildStatsRow(fileStats, cols, columnsMap))
                .filter(row -> row != null);
        List<StructField> allMetaDataSchema = new ArrayList<StructField>();
        allMetaDataSchema.add(new StructField("file", (DataType)StringType$.MODULE$, true, Metadata.empty()));
        cols.forEach(col -> {
            allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty()));
            allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty()));
            allMetaDataSchema.add(new StructField(col + "_num_nulls", (DataType)LongType$.MODULE$, true, Metadata.empty()));
        });
        return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema));
    }

    /**
     * Builds the statistics row of one file: file path, then min, max and null count of every
     * requested column. Returns {@code null} when {@code fileStats} is empty.
     */
    private static Row buildStatsRow(List<HoodieColumnRangeMetadata<Comparable>> fileStats, List<String> cols, Map<String, DataType> columnsMap) {
        if (fileStats.isEmpty()) {
            return null;
        }
        // The row mixes strings, numbers, dates and byte arrays, so it must be a list of Object.
        List<Object> values = new ArrayList<>();
        values.add(fileStats.get(0).getFilePath());
        for (String col : cols) {
            HoodieColumnRangeMetadata<Comparable> colStats = fileStats.stream()
                    .filter(s -> s.getColumnName().trim().equalsIgnoreCase(col))
                    .findFirst()
                    .orElse(null);
            DataType colType = columnsMap.get(col);
            if (colStats == null || colType == null) {
                throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col));
            }
            values.add(toStatValue(colType, colStats.getMinValue()));
            values.add(toStatValue(colType, colStats.getMaxValue()));
            values.add(colStats.getNumNulls());
        }
        return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
    }

    /**
     * Converts a raw min/max statistic into the Java value matching the column's Spark type.
     * A missing ({@code null}) statistic stays {@code null}; the result columns are nullable.
     *
     * @throws HoodieException if {@code colType} is not one of the supported types
     */
    private static Object toStatValue(DataType colType, Object rawValue) {
        if (colType instanceof IntegerType || colType instanceof DoubleType || colType instanceof LongType
                || colType instanceof FloatType || colType instanceof BooleanType) {
            // These statistics are stored as-is.
            return rawValue;
        }
        boolean convertible = colType instanceof StringType || colType instanceof DecimalType
                || colType instanceof DateType || colType instanceof ShortType
                || colType instanceof BinaryType || colType instanceof ByteType;
        if (!convertible) {
            throw new HoodieException(String.format("Not support type:  %s", colType));
        }
        if (rawValue == null) {
            return null;
        }
        if (colType instanceof BinaryType) {
            return ((Binary)rawValue).getBytes();
        }
        // The remaining types are rebuilt from the statistic's string form.
        String text = rawValue.toString();
        if (colType instanceof StringType) {
            return text;
        }
        if (colType instanceof DecimalType) {
            return new BigDecimal(text);
        }
        if (colType instanceof DateType) {
            return Date.valueOf(text);
        }
        if (colType instanceof ShortType) {
            return Short.parseShort(text);
        }
        return Byte.valueOf(text);
    }

    /**
     * Convenience overload taking the column names as one comma-separated string.
     *
     * @param df   data frame whose input files are inspected
     * @param cols comma-separated column names; each name is trimmed
     * @return the result of the list-based overload
     */
    public static Dataset<Row> getMinMaxValue(Dataset<Row> df, String cols) {
        List<String> columnNames = new ArrayList<>();
        for (String rawName : cols.split(",")) {
            columnNames.add(rawName.trim());
        }
        return OrderingIndexHelper.getMinMaxValue(df, columnNames);
    }

    /**
     * Computes the column statistics of {@code df} and stores them under
     * {@code indexPath/commitTime}.
     *
     * <p>If {@code indexPath} does not exist yet, the statistics are simply written. Otherwise
     * the sub-directories of {@code indexPath} are inspected: the greatest one (by name) that is
     * listed in {@code validateCommits} is kept as the previous index, every other sub-directory
     * is deleted. When the previous index has the same schema as the new statistics, both are
     * merged with the SQL produced by {@code ZOrderingIndexHelper.createIndexMergeSql} and the
     * merge result is written; otherwise only the new statistics are written.
     *
     * @param df              data frame whose input files are inspected
     * @param cols            comma-separated names of the columns to collect statistics for
     * @param indexPath       directory holding one sub-directory of statistics per commit
     * @param commitTime      name of the sub-directory to write
     * @param validateCommits names of the sub-directories that count as valid previous indexes
     * @throws HoodieException if a file system operation fails
     */
    public static void saveStatisticsInfo(Dataset<Row> df, String cols, String indexPath, String commitTime, List<String> validateCommits) {
        Path savePath = new Path(indexPath, commitTime);
        SparkSession spark = df.sparkSession();
        FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration());
        Dataset<Row> statisticsDF = OrderingIndexHelper.getMinMaxValue(df, cols);
        try {
            if (!fs.exists(new Path(indexPath))) {
                statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
                return;
            }
            List<String> allIndexTables = Arrays.stream(fs.listStatus(new Path(indexPath)))
                    .filter(f -> f.isDirectory())
                    .map(f -> f.getPath().getName())
                    .collect(Collectors.toList());
            List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
            List<String> residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
            Option<Dataset<Row>> latestIndexData = Option.empty();
            if (!candidateIndexTables.isEmpty()) {
                // Keep the newest valid index for merging; every older one is obsolete.
                String latestTable = candidateIndexTables.remove(candidateIndexTables.size() - 1);
                latestIndexData = Option.of(spark.read().load(new Path(indexPath, latestTable).toString()));
                candidateIndexTables.forEach(f -> deleteIndexTable(fs, indexPath, f));
            }
            residualTables.forEach(f -> deleteIndexTable(fs, indexPath, f));
            if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) {
                String originalTable = "indexTable_" + UUID.randomUUID().toString().replace("-", "");
                String updateTable = "updateTable_" + UUID.randomUUID().toString().replace("-", "");
                try {
                    latestIndexData.get().createOrReplaceTempView(originalTable);
                    statisticsDF.createOrReplaceTempView(updateTable);
                    List<String> columns = Arrays.asList(statisticsDF.schema().fieldNames());
                    spark.sql(ZOrderingIndexHelper.createIndexMergeSql(originalTable, updateTable, columns)).repartition(1).write().save(savePath.toString());
                } finally {
                    // The views are only needed for the merge statement; without this they would
                    // pile up in the session catalog, one pair per call.
                    spark.catalog().dropTempView(originalTable);
                    spark.catalog().dropTempView(updateTable);
                }
            } else {
                statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
            }
        }
        catch (IOException e) {
            throw new HoodieException(e);
        }
    }

    /** Recursively deletes {@code indexPath/tableName}, rethrowing an I/O failure as {@code HoodieException}. */
    private static void deleteIndexTable(FileSystem fs, String indexPath, String tableName) {
        try {
            fs.delete(new Path(indexPath, tableName), true);
        }
        catch (IOException ie) {
            throw new HoodieException(ie);
        }
    }
}

