/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.testutils.sources;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.collection.RocksDBBasedMap;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public abstract class AbstractBaseTestSource
extends AvroSource {
    private static final Logger LOG = LogManager.getLogger(AbstractBaseTestSource.class);
    public static final int DEFAULT_PARTITION_NUM = 0;
    protected static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<Integer, HoodieTestDataGenerator>();

    public static void initDataGen() {
        dataGeneratorMap.putIfAbsent(0, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS));
    }

    public static void initDataGen(TypedProperties props, int partition) {
        try {
            boolean useRocksForTestDataGenKeys = props.getBoolean("hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys", SourceConfigs.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.booleanValue());
            String baseStoreDir = props.getString("hoodie.deltastreamer.source.test.datagen.rocksdb_base_dir", File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition;
            LOG.info((Object)("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir));
            dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, (Map)(useRocksForTestDataGenKeys ? new RocksDBBasedMap(baseStoreDir) : new HashMap())));
        }
        catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }

    public static void resetDataGen() {
        for (HoodieTestDataGenerator dataGenerator : dataGeneratorMap.values()) {
            dataGenerator.close();
        }
        dataGeneratorMap.clear();
    }

    protected AbstractBaseTestSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(props, sparkContext, sparkSession, schemaProvider);
    }

    protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String instantTime, int partition) {
        Stream<GenericRecord> updateStream;
        int maxUniqueKeys = props.getInteger("hoodie.deltastreamer.source.test.max_unique_records", SourceConfigs.DEFAULT_MAX_UNIQUE_RECORDS.intValue());
        HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);
        int numExistingKeys = dataGenerator.getNumExistingKeys("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": null, \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}");
        LOG.info((Object)("NumExistingKeys=" + numExistingKeys));
        int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
        int numInserts = sourceLimit - numUpdates;
        LOG.info((Object)("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates));
        boolean reachedMax = false;
        if (numInserts + numExistingKeys > maxUniqueKeys) {
            numInserts = Math.max(0, maxUniqueKeys - numExistingKeys);
            reachedMax = true;
        }
        if (numInserts + numUpdates < sourceLimit) {
            numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts);
        }
        Stream<Object> deleteStream = Stream.empty();
        long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        LOG.info((Object)("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory() + ", Free Memory=" + Runtime.getRuntime().freeMemory()));
        if (!reachedMax && numUpdates >= 50) {
            LOG.info((Object)("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords=" + maxUniqueKeys));
            deleteStream = dataGenerator.generateUniqueDeleteRecordStream(instantTime, Integer.valueOf(50)).map(AbstractBaseTestSource::toGenericRecord);
            updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, Integer.valueOf(numUpdates - 50), "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": null, \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").map(AbstractBaseTestSource::toGenericRecord);
        } else {
            LOG.info((Object)("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys));
            updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, Integer.valueOf(numUpdates), "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": null, \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").map(AbstractBaseTestSource::toGenericRecord);
        }
        Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(instantTime, Integer.valueOf(numInserts), false, "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": null, \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").map(AbstractBaseTestSource::toGenericRecord);
        return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
    }

    private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
        try {
            RawTripTestPayload payload = (RawTripTestPayload)hoodieRecord.getData();
            return (GenericRecord)payload.getRecordToInsert(HoodieTestDataGenerator.AVRO_SCHEMA);
        }
        catch (IOException e) {
            return null;
        }
    }
}

