/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.avro.model.HoodiePath;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.OrcReaderIterator;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.functional.SparkRDDWriteClientOverride;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.table.action.bootstrap.BootstrapUtils;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIf;
import org.junit.jupiter.api.io.TempDir;

@Tag(value="functional")
@DisabledIf(value="org.apache.hudi.HoodieSparkUtils#gteqSpark3_0", disabledReason="Orc for spark3 is fixed only in 13.0 release. 0.12.X does not have this support")
public class TestOrcBootstrap
extends HoodieClientTestBase {
    public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,double,double,double,double,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
    @TempDir
    public java.nio.file.Path tmpFolder;
    protected String bootstrapBasePath = null;
    private HoodieParquetInputFormat roInputFormat;
    private JobConf roJobConf;
    private HoodieParquetRealtimeInputFormat rtInputFormat;
    private JobConf rtJobConf;
    private SparkSession spark;

    @BeforeEach
    public void setUp() throws Exception {
        this.bootstrapBasePath = this.tmpFolder.toAbsolutePath().toString() + "/data";
        this.initPath();
        this.initSparkContexts();
        this.initTestDataGenerator();
        this.initMetaClient();
        this.reloadInputFormats();
    }

    @AfterEach
    public void tearDown() throws IOException {
        this.cleanupSparkContexts();
        this.cleanupClients();
        this.cleanupTestDataGenerator();
    }

    private void reloadInputFormats() {
        this.roInputFormat = new HoodieParquetInputFormat();
        this.roJobConf = new JobConf(this.jsc.hadoopConfiguration());
        this.roInputFormat.setConf((Configuration)this.roJobConf);
    }

    public Schema generateNewDataSetAndReturnSchema(long timestamp, int numRecords, List<String> partitionPaths, String srcPath) throws Exception {
        boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
        Dataset<Row> df = TestOrcBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths, this.jsc, this.sqlContext);
        df.printSchema();
        if (isPartitioned) {
            df.write().partitionBy(new String[]{"datestr"}).format("orc").mode(SaveMode.Overwrite).save(srcPath);
        } else {
            df.write().format("orc").mode(SaveMode.Overwrite).save(srcPath);
        }
        String filePath = FileStatusUtils.toPath((HoodiePath)((HoodieFileStatus)((Optional)BootstrapUtils.getAllLeafFoldersWithFiles((HoodieTableMetaClient)this.metaClient, (FileSystem)this.metaClient.getFs(), (String)srcPath, (HoodieEngineContext)this.context).stream().findAny().map(p -> ((List)p.getValue()).stream().findAny()).orElse(null)).get()).getPath()).toString();
        Reader orcReader = OrcFile.createReader((Path)new Path(filePath), (OrcFile.ReaderOptions)OrcFile.readerOptions((Configuration)this.metaClient.getHadoopConf()));
        TypeDescription orcSchema = orcReader.getSchema();
        return AvroOrcUtils.createAvroSchemaWithDefaultValue((TypeDescription)orcSchema, (String)"test_orc_record", null, (boolean)true);
    }

    @Test
    public void testMetadataBootstrapNonpartitionedCOW() throws Exception {
        this.testBootstrapCommon(false, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
    }

    @Test
    public void testMetadataBootstrapWithUpdatesCOW() throws Exception {
        this.testBootstrapCommon(true, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
    }

    private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, EffectiveMode mode) throws Exception {
        List<String> bootstrapInstants;
        int numInstantsAfterBootstrap;
        boolean isBootstrapIndexCreated;
        boolean checkNumRawFiles;
        String bootstrapCommitInstantTs;
        String bootstrapModeSelectorClass;
        this.metaClient = deltaCommit ? HoodieTestUtils.init((String)this.basePath, (HoodieTableType)HoodieTableType.MERGE_ON_READ, (String)this.bootstrapBasePath, (HoodieFileFormat)HoodieFileFormat.ORC) : HoodieTestUtils.init((String)this.basePath, (HoodieTableType)HoodieTableType.COPY_ON_WRITE, (String)this.bootstrapBasePath, (HoodieFileFormat)HoodieFileFormat.ORC);
        int totalRecords = 100;
        String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName() : NonpartitionedKeyGenerator.class.getCanonicalName();
        switch (mode) {
            case FULL_BOOTSTRAP_MODE: {
                bootstrapModeSelectorClass = FullRecordBootstrapModeSelector.class.getCanonicalName();
                bootstrapCommitInstantTs = "00000000000002";
                checkNumRawFiles = false;
                isBootstrapIndexCreated = false;
                numInstantsAfterBootstrap = 1;
                bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
                break;
            }
            case METADATA_BOOTSTRAP_MODE: {
                bootstrapModeSelectorClass = MetadataOnlyBootstrapModeSelector.class.getCanonicalName();
                bootstrapCommitInstantTs = "00000000000001";
                checkNumRawFiles = true;
                isBootstrapIndexCreated = true;
                numInstantsAfterBootstrap = 1;
                bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
                break;
            }
            default: {
                bootstrapModeSelectorClass = TestRandomBootstrapModeSelector.class.getName();
                bootstrapCommitInstantTs = "00000000000002";
                checkNumRawFiles = false;
                isBootstrapIndexCreated = true;
                numInstantsAfterBootstrap = 2;
                bootstrapInstants = Arrays.asList("00000000000001", "00000000000002");
            }
        }
        List<String> partitions = Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03");
        long timestamp = Instant.now().toEpochMilli();
        Schema schema = this.generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, this.bootstrapBasePath);
        HoodieWriteConfig config = this.getConfigBuilder(schema.toString()).withAutoCommit(true).withSchema(schema.toString()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()).withBootstrapConfig(HoodieBootstrapConfig.newBuilder().withBootstrapBasePath(this.bootstrapBasePath).withBootstrapKeyGenClass(keyGeneratorClass).withFullBootstrapInputProvider(TestFullBootstrapDataProvider.class.getName()).withBootstrapParallelism(3).withBootstrapModeSelector(bootstrapModeSelectorClass).build()).build();
        SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride((HoodieEngineContext)this.context, config);
        client.bootstrap(Option.empty());
        this.checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap, numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
        if (deltaCommit) {
            FileCreateUtils.deleteDeltaCommit((String)this.metaClient.getBasePath(), (String)bootstrapCommitInstantTs);
        } else {
            FileCreateUtils.deleteCommit((String)this.metaClient.getBasePath(), (String)bootstrapCommitInstantTs);
        }
        client.rollbackFailedBootstrap();
        this.metaClient.reloadActiveTimeline();
        Assertions.assertEquals((int)0, (int)this.metaClient.getCommitsTimeline().countInstants());
        Assertions.assertEquals((long)0L, (long)BootstrapUtils.getAllLeafFoldersWithFiles((HoodieTableMetaClient)this.metaClient, (FileSystem)this.metaClient.getFs(), (String)this.basePath, (HoodieEngineContext)this.context).stream().flatMap(f -> ((List)f.getValue()).stream()).count());
        BootstrapIndex index = BootstrapIndex.getBootstrapIndex((HoodieTableMetaClient)this.metaClient);
        Assertions.assertFalse((boolean)index.useIndex());
        client = new SparkRDDWriteClientOverride((HoodieEngineContext)this.context, config);
        client.bootstrap(Option.empty());
        this.metaClient.reloadActiveTimeline();
        index = BootstrapIndex.getBootstrapIndex((HoodieTableMetaClient)this.metaClient);
        if (isBootstrapIndexCreated) {
            Assertions.assertTrue((boolean)index.useIndex());
        } else {
            Assertions.assertFalse((boolean)index.useIndex());
        }
        this.checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap, numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
        long updateTimestamp = Instant.now().toEpochMilli();
        String updateSPath = this.tmpFolder.toAbsolutePath().toString() + "/data2";
        this.generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath);
        JavaRDD<HoodieRecord> updateBatch = TestOrcBootstrap.generateInputBatch(this.jsc, BootstrapUtils.getAllLeafFoldersWithFiles((HoodieTableMetaClient)this.metaClient, (FileSystem)this.metaClient.getFs(), (String)updateSPath, (HoodieEngineContext)this.context), schema);
        String newInstantTs = client.startCommit();
        client.upsert(updateBatch, newInstantTs);
        this.checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1, updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit, true);
        if (deltaCommit) {
            Option compactionInstant = client.scheduleCompaction(Option.empty());
            Assertions.assertTrue((boolean)compactionInstant.isPresent());
            client.compact((String)compactionInstant.get());
            this.checkBootstrapResults(totalRecords, schema, (String)compactionInstant.get(), checkNumRawFiles, numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit, Arrays.asList((String)compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
        }
    }

    @Test
    public void testMetadataBootstrapWithUpdatesMOR() throws Exception {
        this.testBootstrapCommon(true, true, EffectiveMode.METADATA_BOOTSTRAP_MODE);
    }

    @Test
    public void testFullBootstrapOnlyCOW() throws Exception {
        this.testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE);
    }

    @Test
    public void testFullBootstrapWithUpdatesMOR() throws Exception {
        this.testBootstrapCommon(true, true, EffectiveMode.FULL_BOOTSTRAP_MODE);
    }

    @Test
    public void testMetaAndFullBootstrapCOW() throws Exception {
        this.testBootstrapCommon(true, false, EffectiveMode.MIXED_BOOTSTRAP_MODE);
    }

    @Test
    public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception {
        this.testBootstrapCommon(true, true, EffectiveMode.MIXED_BOOTSTRAP_MODE);
    }

    private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles, int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, boolean validateRecordsForCommitTime) throws Exception {
        this.checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants, expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant), validateRecordsForCommitTime);
    }

    private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles, int expNumInstants, int numVersions, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, List<String> instantsWithValidRecords, boolean validateCommitRecords) throws Exception {
        this.metaClient.reloadActiveTimeline();
        Assertions.assertEquals((int)expNumInstants, (int)this.metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
        Assertions.assertEquals((Object)instant, (Object)((HoodieInstant)this.metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get()).getTimestamp());
        Dataset bootstrapped = this.sqlContext.read().format("orc").load(this.basePath);
        Dataset original = this.sqlContext.read().format("orc").load(this.bootstrapBasePath);
        bootstrapped.registerTempTable("bootstrapped");
        original.registerTempTable("original");
        if (checkNumRawFiles) {
            List files = BootstrapUtils.getAllLeafFoldersWithFiles((HoodieTableMetaClient)this.metaClient, (FileSystem)this.metaClient.getFs(), (String)this.bootstrapBasePath, (HoodieEngineContext)this.context).stream().flatMap(x -> ((List)x.getValue()).stream()).collect(Collectors.toList());
            Assertions.assertEquals((long)(files.size() * numVersions), (long)this.sqlContext.sql("select distinct _hoodie_file_name from bootstrapped").count());
        }
        if (!isDeltaCommit) {
            String predicate = String.join((CharSequence)", ", instantsWithValidRecords.stream().map(p -> "\"" + p + "\"").collect(Collectors.toList()));
            if (validateCommitRecords) {
                Assertions.assertEquals((long)totalRecords, (long)this.sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN (" + predicate + ")").count());
            }
            Dataset missingOriginal = this.sqlContext.sql("select a._row_key from original a where a._row_key not in (select _hoodie_record_key from bootstrapped)");
            Assertions.assertEquals((long)0L, (long)missingOriginal.count());
            Dataset missingBootstrapped = this.sqlContext.sql("select a._hoodie_record_key from bootstrapped a where a._hoodie_record_key not in (select _row_key from original)");
            Assertions.assertEquals((long)0L, (long)missingBootstrapped.count());
        }
    }

    private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema writerSchema) {
        List fullFilePathsWithPartition = partitionPaths.stream().flatMap(p -> ((List)p.getValue()).stream().map(x -> Pair.of((Object)p.getKey(), (Object)FileStatusUtils.toPath((HoodiePath)x.getPath())))).collect(Collectors.toList());
        return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> {
            try {
                Configuration conf = jsc.hadoopConfiguration();
                AvroReadSupport.setAvroReadSchema((Configuration)conf, (Schema)writerSchema);
                Reader orcReader = OrcFile.createReader((Path)((Path)p.getValue()), (OrcFile.ReaderOptions)new OrcFile.ReaderOptions(jsc.hadoopConfiguration()));
                RecordReader recordReader = orcReader.rows();
                TypeDescription orcSchema = orcReader.getSchema();
                Schema avroSchema = AvroOrcUtils.createAvroSchemaWithDefaultValue((TypeDescription)orcSchema, (String)"test_orc_record", null, (boolean)true);
                OrcReaderIterator recIterator = new OrcReaderIterator(recordReader, avroSchema, orcSchema);
                return StreamSupport.stream(Spliterators.spliteratorUnknownSize(recIterator, 0), false).map(gr -> {
                    try {
                        String key = gr.get("_row_key").toString();
                        String pPath = (String)p.getKey();
                        return new HoodieAvroRecord(new HoodieKey(key, pPath), (HoodieRecordPayload)new RawTripTestPayload(gr.toString(), key, pPath, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}"));
                    }
                    catch (IOException e) {
                        throw new HoodieIOException(e.getMessage(), e);
                    }
                });
            }
            catch (IOException ioe) {
                throw new HoodieIOException(ioe.getMessage(), ioe);
            }
        }).collect(Collectors.toList()));
    }

    public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
        HoodieWriteConfig.Builder builder = this.getConfigBuilder(schemaStr, HoodieIndex.IndexType.BLOOM).withExternalSchemaTrasformation(true);
        TypedProperties properties = new TypedProperties();
        properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
        properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "datestr");
        builder = builder.withProps((Map)properties);
        return builder;
    }

    public static Dataset<Row> generateTestRawTripDataset(long timestamp, int from, int to, List<String> partitionPaths, JavaSparkContext jsc, SQLContext sqlContext) {
        boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
        ArrayList records = new ArrayList();
        IntStream.range(from, to).forEach(i -> {
            String id = "" + i;
            records.add(new HoodieTestDataGenerator().generateGenericRecord("trip_" + id, Long.toString(timestamp), "rider_" + id, "driver_" + id, timestamp, false, false).toString());
        });
        if (isPartitioned) {
            sqlContext.udf().register("partgen", (UDF1 & Serializable)val -> PartitionPathEncodeUtils.escapePathName((String)((String)partitionPaths.get(Integer.parseInt(val.split("_")[1]) % partitionPaths.size()))), DataTypes.StringType);
        }
        JavaRDD rdd = jsc.parallelize(records);
        Dataset df = sqlContext.read().json(rdd);
        if (isPartitioned) {
            df = df.withColumn("datestr", functions.callUDF((String)"partgen", (Column[])new Column[]{new Column("_row_key")}));
            df = df.select("timestamp", new String[]{"_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare", "tip_history", "_hoodie_is_deleted", "datestr"});
        } else {
            df = df.select("timestamp", new String[]{"_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare", "tip_history", "_hoodie_is_deleted"});
        }
        return df;
    }

    public static class TestRandomBootstrapModeSelector
    extends BootstrapModeSelector {
        private int currIdx = new Random().nextInt(2);

        public TestRandomBootstrapModeSelector(HoodieWriteConfig writeConfig) {
            super(writeConfig);
        }

        public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
            ArrayList selections = new ArrayList();
            partitions.stream().forEach(p -> {
                BootstrapMode mode = this.currIdx == 0 ? BootstrapMode.METADATA_ONLY : BootstrapMode.FULL_RECORD;
                this.currIdx = (this.currIdx + 1) % 2;
                selections.add(Pair.of((Object)mode, (Object)p.getKey()));
            });
            return selections.stream().collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
        }
    }

    public static class TestFullBootstrapDataProvider
    extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {
        public TestFullBootstrapDataProvider(TypedProperties props, HoodieSparkEngineContext context) {
            super(props, (HoodieEngineContext)context);
        }

        public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
            String[] filePaths = (String[])partitionPaths.stream().map(Pair::getValue).flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath((HoodiePath)fs.getPath()).toString())).toArray(String[]::new);
            JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext((HoodieEngineContext)this.context);
            String filePath = FileStatusUtils.toPath((HoodiePath)((HoodieFileStatus)partitionPaths.stream().flatMap(p -> ((List)p.getValue()).stream()).findAny().get()).getPath()).toString();
            try {
                Reader orcReader = OrcFile.createReader((Path)new Path(filePath), (OrcFile.ReaderOptions)new OrcFile.ReaderOptions(jsc.hadoopConfiguration()));
                TypeDescription orcSchema = orcReader.getSchema();
                Schema avroSchema = AvroOrcUtils.createAvroSchemaWithDefaultValue((TypeDescription)orcSchema, (String)"test_orc_record", null, (boolean)true);
                return TestOrcBootstrap.generateInputBatch(jsc, partitionPaths, avroSchema);
            }
            catch (IOException ioe) {
                throw new HoodieIOException(ioe.getMessage(), ioe);
            }
        }
    }

    private static enum EffectiveMode {
        FULL_BOOTSTRAP_MODE,
        METADATA_BOOTSTRAP_MODE,
        MIXED_BOOTSTRAP_MODE;

    }
}

