/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution.bulkinsert;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.FlatLists;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestBulkInsertInternalPartitioner
extends HoodieClientTestBase
implements Serializable {
    private static final Comparator<HoodieRecord<? extends HoodieRecordPayload>> KEY_COMPARATOR = Comparator.comparing(o -> o.getPartitionPath() + "+" + o.getRecordKey());

    public static JavaRDD<HoodieRecord> generateTestRecordsForBulkInsert(JavaSparkContext jsc) {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        List records1 = dataGenerator.generateInserts("0", Integer.valueOf(100));
        List records2 = dataGenerator.generateInserts("0", Integer.valueOf(150));
        return jsc.parallelize(records1, 1).union(jsc.parallelize(records2, 1));
    }

    public static JavaRDD<HoodieRecord> generateTestRecordsForBulkInsert(JavaSparkContext jsc, int count) {
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        List records = dataGenerator.generateInserts("0", Integer.valueOf(count));
        return jsc.parallelize(records, 1);
    }

    public static Map<String, Long> generateExpectedPartitionNumRecords(JavaRDD<HoodieRecord> records) {
        return records.map((Function & Serializable)record -> record.getPartitionPath()).countByValue();
    }

    private static JavaRDD<HoodieRecord> generateTripleTestRecordsForBulkInsert(JavaSparkContext jsc) {
        return TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert(jsc).union(TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert(jsc)).union(TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert(jsc));
    }

    private static Stream<Arguments> configParams() {
        Object[][] data = new Object[][]{{BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, true}, {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, true}, {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, true}, {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false, true}, {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, true}, {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false, true}, {BulkInsertSortMode.NONE, true, true, false, false, true}, {BulkInsertSortMode.NONE, true, false, false, false, true}, {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, false}, {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, false}, {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, false}, {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, false}};
        return Stream.of(data).map(Arguments::of);
    }

    private void verifyRecordAscendingOrder(List<HoodieRecord<? extends HoodieRecordPayload>> records, Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) {
        ArrayList<HoodieRecord<? extends HoodieRecordPayload>> expectedRecords = new ArrayList<HoodieRecord<? extends HoodieRecordPayload>>(records);
        Collections.sort(expectedRecords, (Comparator)comparator.orElse(KEY_COMPARATOR));
        Assertions.assertEquals(expectedRecords, records);
    }

    private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, JavaRDD<HoodieRecord> records, boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted, Map<String, Long> expectedPartitionNumRecords, boolean populateMetaFields) {
        this.testBulkInsertInternalPartitioner(partitioner, records, enforceNumOutputPartitions, isGloballySorted, isLocallySorted, expectedPartitionNumRecords, (Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>>)Option.empty(), populateMetaFields);
    }

    private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, JavaRDD<HoodieRecord> records, boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted, Map<String, Long> expectedPartitionNumRecords, Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator, boolean populateMetaFields) {
        int numPartitions = 2;
        if (!populateMetaFields) {
            Assertions.assertThrows(HoodieException.class, () -> partitioner.repartitionRecords((Object)records, numPartitions));
            return;
        }
        JavaRDD actualRecords = (JavaRDD)partitioner.repartitionRecords(records, numPartitions);
        Assertions.assertEquals((int)(enforceNumOutputPartitions ? numPartitions : records.getNumPartitions()), (int)actualRecords.getNumPartitions());
        List collectedActualRecords = actualRecords.collect();
        if (isGloballySorted) {
            this.verifyRecordAscendingOrder(collectedActualRecords, comparator);
        } else if (isLocallySorted) {
            actualRecords.mapPartitions((FlatMapFunction & Serializable)partition -> {
                ArrayList<HoodieRecord<? extends HoodieRecordPayload>> partitionRecords = new ArrayList<HoodieRecord<? extends HoodieRecordPayload>>();
                partition.forEachRemaining(partitionRecords::add);
                this.verifyRecordAscendingOrder(partitionRecords, comparator);
                return Collections.emptyList().iterator();
            }).collect();
        }
        HashMap<String, Long> actualPartitionNumRecords = new HashMap<String, Long>();
        for (HoodieRecord record : collectedActualRecords) {
            String partitionPath = record.getPartitionPath();
            actualPartitionNumRecords.put(partitionPath, actualPartitionNumRecords.getOrDefault(partitionPath, 0L) + 1L);
        }
        Assertions.assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords);
    }

    @ParameterizedTest(name="[{index}] {0} isTablePartitioned={1} enforceNumOutputPartitions={2}")
    @MethodSource(value={"configParams"})
    public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted, boolean populateMetaFields) {
        JavaRDD<HoodieRecord> records1 = TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert(this.jsc);
        JavaRDD<HoodieRecord> records2 = TestBulkInsertInternalPartitioner.generateTripleTestRecordsForBulkInsert(this.jsc);
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath("/").withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withBulkInsertSortMode(sortMode.name()).withPopulateMetaFields(populateMetaFields).build();
        this.testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get((HoodieWriteConfig)config, (boolean)isTablePartitioned, (boolean)enforceNumOutputPartitions), records1, enforceNumOutputPartitions, isGloballySorted, isLocallySorted, TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords(records1), populateMetaFields);
        this.testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get((HoodieWriteConfig)config, (boolean)isTablePartitioned, (boolean)enforceNumOutputPartitions), records2, enforceNumOutputPartitions, isGloballySorted, isLocallySorted, TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords(records2), populateMetaFields);
    }

    @Test
    public void testCustomColumnSortPartitioner() {
        String sortColumnString = "begin_lat";
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath("/").withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName()).withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString).build();
        String[] sortColumns = sortColumnString.split(",");
        Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = this.getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, true, sortColumns);
        JavaRDD<HoodieRecord> records1 = TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert(this.jsc);
        JavaRDD<HoodieRecord> records2 = TestBulkInsertInternalPartitioner.generateTripleTestRecordsForBulkInsert(this.jsc);
        this.testBulkInsertInternalPartitioner((BulkInsertPartitioner)new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, config), records1, true, true, true, TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords(records1), (Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>>)Option.of(columnComparator), true);
        this.testBulkInsertInternalPartitioner((BulkInsertPartitioner)new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, config), records2, true, true, true, TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords(records2), (Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>>)Option.of(columnComparator), true);
        this.testBulkInsertInternalPartitioner((BulkInsertPartitioner)new RDDCustomColumnsSortPartitioner(config), records1, true, true, true, TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords(records1), (Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>>)Option.of(columnComparator), true);
        this.testBulkInsertInternalPartitioner((BulkInsertPartitioner)new RDDCustomColumnsSortPartitioner(config), records2, true, true, true, TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords(records2), (Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>>)Option.of(columnComparator), true);
    }

    private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, boolean prependPartitionPath, String[] sortColumns) {
        Comparator<HoodieRecord<? extends HoodieRecordPayload>> comparator = Comparator.comparing(record -> {
            try {
                GenericRecord genericRecord = (GenericRecord)((HoodieRecordPayload)record.getData()).getInsertValue(schema).get();
                ArrayList<Object> keys = new ArrayList<Object>();
                if (prependPartitionPath) {
                    keys.add(record.getPartitionPath());
                }
                for (String col : sortColumns) {
                    keys.add(genericRecord.get(col));
                }
                return keys;
            }
            catch (IOException e) {
                throw new HoodieIOException("unable to read value for " + sortColumns);
            }
        }, (o1, o2) -> {
            FlatLists.ComparableList values1 = FlatLists.ofComparableArray((Object[])o1.toArray());
            FlatLists.ComparableList values2 = FlatLists.ofComparableArray((Object[])o2.toArray());
            return values1.compareTo((Object)values2);
        });
        return comparator;
    }
}

