/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.functional;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestHoodieDeltaStreamer
extends UtilitiesTestBase {
    // Shared random source; not referenced in the portion of this class visible here.
    private static final Random RANDOM = new Random();
    // Names of property files resolved relative to dfsBasePath. Several of them are written by
    // initClass(); the public ones are presumably shared with other test classes — TODO confirm.
    private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
    public static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
    public static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = "test-invalid-hive-sync-source1.properties";
    public static final String PROPS_INVALID_FILE = "test-invalid-props.properties";
    public static final String PROPS_INVALID_TABLE_CONFIG_FILE = "test-invalid-table-config.properties";
    private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
    private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
    private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
    // DFS directory for parquet input; assigned in initClass() as dfsBasePath + "/parquetFiles".
    private static String PARQUET_SOURCE_ROOT;
    // Record counts presumably used for the parquet and CSV source fixtures (the parquet value
    // matches the count passed to prepareParquetDFSFiles in initClass()).
    private static final int PARQUET_NUM_RECORDS = 5;
    private static final int CSV_NUM_RECORDS = 3;
    // Command-line flags and values fed to HoodieDeltaStreamer.getConfig(String[]) by
    // provideValidCliArgs(); the *_VALUE constants also populate getBaseConfig().
    private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
    private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
    private static final String TABLE_TYPE_PARAM = "--table-type";
    private static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE";
    private static final String TARGET_TABLE_PARAM = "--target-table";
    private static final String TARGET_TABLE_VALUE = "test";
    private static final String BASE_FILE_FORMAT_PARAM = "--base-file-format";
    private static final String BASE_FILE_FORMAT_VALUE = "PARQUET";
    private static final String SOURCE_LIMIT_PARAM = "--source-limit";
    private static final String SOURCE_LIMIT_VALUE = "500";
    private static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync";
    private static final String HOODIE_CONF_PARAM = "--hoodie-conf";
    private static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table";
    private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
    // Assigned outside the visible chunk (presumably in a static initializer emitted by the
    // decompiler) — TODO confirm.
    private static final Logger LOG;
    // Kafka test helper created and set up in initClass(); its broker address is used by
    // populateCommonProps().
    public static KafkaTestUtils testUtils;
    // Not referenced in the portion of this class visible here.
    private static int testNum;

    /**
     * One-time setup shared by every test in this class.
     *
     * <p>Initializes the base harness, creates and sets up {@link #testUtils}, copies the
     * {@code delta-streamer-config/} resources into DFS, writes the generated property files that
     * the tests load by name, and prepares the parquet source files.
     *
     * @throws Exception if any fixture step fails
     */
    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initClass(true);
        PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
        testUtils = new KafkaTestUtils();
        testUtils.setup();
        FileSystem fs = (FileSystem)dfs;
        // Resources copied to the DFS base directory under their own names.
        String[] baseDirResources = new String[]{"base.properties", "sql-transformer.properties", "source.avsc", "source-flattened.avsc", "target.avsc", "target-flattened.avsc", "source_short_trip_uber.avsc", "source_uber.avsc", "target_short_trip_uber.avsc", "target_uber.avsc", "clusteringjob.properties"};
        for (String resource : baseDirResources) {
            UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/" + resource, fs, dfsBasePath + "/" + resource);
        }
        // Resources copied to <base>/config; the per-table files are referenced by the
        // multi-table ingestion properties written below.
        String[] configDirResources = new String[]{"base.properties", "invalid_hive_sync_uber_config.properties", "uber_config.properties", "short_trip_uber_config.properties"};
        for (String resource : configDirResources) {
            UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/" + resource, fs, dfsBasePath + "/config/" + resource);
        }
        // Single-table source properties: test key generator, source/target schemas, Hive sync to testdb1.
        TypedProperties props = new TypedProperties();
        props.setProperty("include", "sql-transformer.properties");
        props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
        props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
        props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
        props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
        props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
        props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb1");
        props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), "hive_trips");
        props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
        props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), MultiPartKeysValueExtractor.class.getName());
        UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
        // Downstream pipeline properties: target.avsc is used as both source and target schema.
        TypedProperties downstreamProps = new TypedProperties();
        downstreamProps.setProperty("include", "base.properties");
        downstreamProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
        downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");
        downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
        UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, fs, dfsBasePath + "/test-downstream-source.properties");
        // Like the source properties, but with a key generator class name that cannot be loaded.
        TypedProperties invalidProps = new TypedProperties();
        invalidProps.setProperty("include", "sql-transformer.properties");
        invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid");
        invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
        invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
        invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
        UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, fs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
        // Multi-table ingestion properties: valid, invalid table-config path, invalid hive sync.
        TypedProperties props1 = new TypedProperties();
        TestHoodieDeltaStreamer.populateCommonProps(props1);
        UtilitiesTestBase.Helpers.savePropsToDFS(props1, fs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1);
        TypedProperties properties = new TypedProperties();
        TestHoodieDeltaStreamer.populateInvalidTableConfigFilePathProps(properties);
        UtilitiesTestBase.Helpers.savePropsToDFS(properties, fs, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE);
        TypedProperties invalidHiveSyncProps = new TypedProperties();
        invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
        invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
        UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, fs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
        // Use the declared constant rather than repeating its value as a magic number.
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
    }

    /**
     * Fills {@code props} with multi-table ingestion settings whose single table points at
     * {@code config/invalid_uber_config.properties}. That file is not among the resources copied
     * into DFS by {@code initClass()}, so the table-config path is unusable.
     *
     * @param props properties object to populate in place
     */
    private static void populateInvalidTableConfigFilePathProps(TypedProperties props) {
        String missingConfigFile = dfsBasePath + "/config/invalid_uber_config.properties";
        String keyGeneratorClass = TestGenerator.class.getName();
        props.setProperty("hoodie.datasource.write.keygenerator.class", keyGeneratorClass);
        props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
        props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
        props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", missingConfigFile);
    }

    /**
     * Fills {@code props} with the settings shared by the multi-table ingestion tests:
     * <ul>
     *   <li>the test key generator;</li>
     *   <li>two tables (short trip and uber), each with a config file under {@code dfsBasePath/config};</li>
     *   <li>Kafka settings pointing at {@link #testUtils}'s broker;</li>
     *   <li>Hive sync settings for database {@code testdb2}.</li>
     * </ul>
     *
     * @param props properties object to populate in place
     */
    private static void populateCommonProps(TypedProperties props) {
        String configDir = dfsBasePath + "/config";
        String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

        // Key generation and the tables to ingest.
        props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
        props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
        props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
        props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", configDir + "/uber_config.properties");
        props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", configDir + "/short_trip_uber_config.properties");

        // Kafka connectivity.
        props.setProperty("bootstrap.servers", testUtils.brokerAddress());
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.serializer", stringSerializer);
        props.setProperty("value.serializer", stringSerializer);
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", Integer.toString(5000));

        // Hive sync.
        props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
        props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb2");
        props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), "false");
        props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
        props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), MultiPartKeysValueExtractor.class.getName());
    }

    /**
     * Releases the class-level fixtures by delegating to {@link UtilitiesTestBase#cleanupClass()}.
     * The call stays class-qualified: an unqualified call would resolve to this method and recurse.
     */
    @AfterAll
    public static void cleanupClass() {
        UtilitiesTestBase.cleanupClass();
    }

    /** Per-test setup; defers entirely to the base class. */
    @BeforeEach
    @Override
    public void setup() throws Exception {
        super.setup();
    }

    /** Per-test teardown; defers entirely to the base class. */
    @AfterEach
    @Override
    public void teardown() throws Exception {
        super.teardown();
    }

    /**
     * Loads the generated source properties file back from DFS and checks three resolved values.
     * The shuffle parallelism is not set directly in that file, so it presumably comes from its
     * {@code include} chain.
     */
    @Test
    public void testProps() {
        Path sourcePropsPath = new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
        TypedProperties loaded = new DFSPropertiesConfiguration((FileSystem)dfs, sourcePropsPath).getConfig();
        int shuffleParallelism = loaded.getInteger("hoodie.upsert.shuffle.parallelism");
        String recordKeyField = loaded.getString("hoodie.datasource.write.recordkey.field");
        String keyGeneratorClass = loaded.getString("hoodie.datasource.write.keygenerator.class");
        Assertions.assertEquals(2, shuffleParallelism);
        Assertions.assertEquals((Object)"_row_key", (Object)recordKeyField);
        Assertions.assertEquals((Object)"org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator", (Object)keyGeneratorClass);
    }

    /**
     * Builds a fresh config carrying only the three values every CLI case supplies: target table
     * name, table type and target base path.
     *
     * @return a new, independently mutable config
     */
    private static HoodieDeltaStreamer.Config getBaseConfig() {
        HoodieDeltaStreamer.Config config = new HoodieDeltaStreamer.Config();
        config.targetTableName = TARGET_TABLE_VALUE;
        config.tableType = TABLE_TYPE_VALUE;
        config.targetBasePath = TGT_BASE_PATH_VALUE;
        return config;
    }

    /**
     * Supplies (command-line arguments, expected parsed config) pairs for
     * {@code testValidCommandLineArgs}.
     *
     * <p>Every case carries the three required flags (target base path, table type, target table).
     * Each intermediate case adds one optional flag or {@code --hoodie-conf} value. The last case
     * sets all of them, listing the flags in a different order.
     */
    private static Stream<Arguments> provideValidCliArgs() {
        String[] requiredArgs = new String[]{TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE};
        // Returns a new array: the required flags followed by the given extra flags.
        Function<String[], String[]> withRequired = extra -> Stream.concat(Arrays.stream(requiredArgs), Arrays.stream(extra)).toArray(String[]::new);

        HoodieDeltaStreamer.Config onlyRequired = getBaseConfig();

        HoodieDeltaStreamer.Config withFileFormat = getBaseConfig();
        withFileFormat.baseFileFormat = BASE_FILE_FORMAT_VALUE;

        HoodieDeltaStreamer.Config withSourceLimit = getBaseConfig();
        withSourceLimit.sourceLimit = Long.parseLong(SOURCE_LIMIT_VALUE);

        HoodieDeltaStreamer.Config withHiveSync = getBaseConfig();
        withHiveSync.enableHiveSync = true;

        HoodieDeltaStreamer.Config withFirstHoodieConf = getBaseConfig();
        withFirstHoodieConf.configs = Arrays.asList(HOODIE_CONF_VALUE1);

        HoodieDeltaStreamer.Config withSecondHoodieConf = getBaseConfig();
        withSecondHoodieConf.configs = Arrays.asList(HOODIE_CONF_VALUE2);

        HoodieDeltaStreamer.Config withBothHoodieConfs = getBaseConfig();
        withBothHoodieConfs.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2);

        HoodieDeltaStreamer.Config withEverything = getBaseConfig();
        withEverything.baseFileFormat = BASE_FILE_FORMAT_VALUE;
        withEverything.sourceLimit = Long.parseLong(SOURCE_LIMIT_VALUE);
        withEverything.enableHiveSync = true;
        withEverything.configs = Arrays.asList(HOODIE_CONF_VALUE1, HOODIE_CONF_VALUE2);

        // Deliberately interleaves the optional --source-limit between the required flags.
        String[] everyFlag = new String[]{TGT_BASE_PATH_PARAM, TGT_BASE_PATH_VALUE, SOURCE_LIMIT_PARAM, SOURCE_LIMIT_VALUE, TABLE_TYPE_PARAM, TABLE_TYPE_VALUE, TARGET_TABLE_PARAM, TARGET_TABLE_VALUE, BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE, ENABLE_HIVE_SYNC_PARAM, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2};

        return Stream.of(
                Arguments.of(withRequired.apply(new String[0]), onlyRequired),
                Arguments.of(withRequired.apply(new String[]{BASE_FILE_FORMAT_PARAM, BASE_FILE_FORMAT_VALUE}), withFileFormat),
                Arguments.of(withRequired.apply(new String[]{SOURCE_LIMIT_PARAM, SOURCE_LIMIT_VALUE}), withSourceLimit),
                Arguments.of(withRequired.apply(new String[]{ENABLE_HIVE_SYNC_PARAM}), withHiveSync),
                Arguments.of(withRequired.apply(new String[]{HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1}), withFirstHoodieConf),
                Arguments.of(withRequired.apply(new String[]{HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}), withSecondHoodieConf),
                Arguments.of(withRequired.apply(new String[]{HOODIE_CONF_PARAM, HOODIE_CONF_VALUE1, HOODIE_CONF_PARAM, HOODIE_CONF_VALUE2}), withBothHoodieConfs),
                Arguments.of(everyFlag, withEverything));
    }

    /**
     * Parses each argument vector and checks it yields the expected config.
     *
     * @param args     command-line arguments to parse
     * @param expected config the parser must produce (compared with {@code equals})
     */
    @ParameterizedTest
    @MethodSource("provideValidCliArgs")
    public void testValidCommandLineArgs(String[] args, HoodieDeltaStreamer.Config expected) {
        HoodieDeltaStreamer.Config parsed = HoodieDeltaStreamer.getConfig(args);
        Assertions.assertEquals((Object)expected, (Object)parsed);
    }

    /**
     * Configures {@code KafkaConnectHdfsProvider} as the initial checkpoint provider. Under the
     * provider path it places a single Kafka-Connect-named parquet file
     * ({@code kafka_topic1+0+100+200.parquet}), then asserts the streamer's resulting checkpoint is
     * {@code kafka_topic1,0:200}.
     */
    @Test
    public void testKafkaConnectCheckpointProvider() throws IOException {
        final String tableBasePath = dfsBasePath + "/test_table";
        final String topicDir = dfsBasePath + "/kafka_topic1";
        final String partitionDir = topicDir + "/year=2016/month=05/day=01";
        final String connectFile = partitionDir + "/kafka_topic1+0+100+200.parquet";

        HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
        TypedProperties props = new DFSPropertiesConfiguration((FileSystem)dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
        props.put("hoodie.deltastreamer.checkpoint.provider.path", topicDir);
        cfg.initialCheckpointProvider = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";

        // Lay out one partition directory holding a single Connect-named parquet file.
        dfs.mkdirs(new Path(topicDir));
        dfs.mkdirs(new Path(partitionDir));
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        UtilitiesTestBase.Helpers.saveParquetToDFS(UtilitiesTestBase.Helpers.toGenericRecords(generator.generateInserts("000", 100)), new Path(connectFile));

        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, this.jsc, (FileSystem)dfs, hdfsTestService.getHadoopConf(), Option.ofNullable(props));
        String checkpoint = deltaStreamer.getConfig().checkpoint;
        Assertions.assertEquals((Object)"kafka_topic1,0:200", (Object)checkpoint);
    }

    /**
     * Runs a bulk insert with properties whose key generator class name cannot be loaded. Asserts
     * that an {@link IOException} is raised and that its message mentions the key generator.
     */
    @Test
    public void testPropsWithInvalidKeyGenerator() throws Exception {
        Exception e = Assertions.assertThrows(IOException.class, () -> {
            String tableBasePath = dfsBasePath + "/test_table";
            HoodieDeltaStreamer.Config invalidCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false);
            new HoodieDeltaStreamer(invalidCfg, this.jsc).sync();
        }, "Should error out when setting the key generator class property to an invalid value");
        LOG.debug("Expected error during getting the key generator", e);
        boolean mentionsKeyGenerator = e.getMessage().contains("Could not load key generator class");
        Assertions.assertTrue(mentionsKeyGenerator);
    }

    /**
     * Points a bulk insert at an existing directory that is not a Hudi table and asserts
     * {@link TableNotFoundException} is thrown.
     */
    @Test
    public void testTableCreation() throws Exception {
        String notATablePath = dfsBasePath + "/not_a_table";
        Exception e = Assertions.assertThrows(TableNotFoundException.class, () -> {
            dfs.mkdirs(new Path(notATablePath));
            HoodieDeltaStreamer.Config bulkInsertCfg = TestHelpers.makeConfig(notATablePath, WriteOperationType.BULK_INSERT);
            new HoodieDeltaStreamer(bulkInsertCfg, this.jsc).sync();
        }, "Should error out when pointed out at a dir thats not a table");
        LOG.debug("Expected error during table creation", e);
    }

    /**
     * End-to-end bulk insert, upsert and bootstrap flow:
     * <ol>
     *   <li>bulk-inserts 1000 records;</li>
     *   <li>re-syncs with a zero source limit and re-checks the counts and commit metadata;</li>
     *   <li>upserts to a total of 1950 records;</li>
     *   <li>exports the table as plain parquet and bootstraps a new Hudi table from that export;</li>
     *   <li>checks the bootstrapped table's record count, distinct record keys and schema,
     *       including the Hudi meta columns.</li>
     * </ol>
     */
    @Test
    public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
        String tableBasePath = dfsBasePath + "/test_table";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        // Initial bulk insert.
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath + "/*/*.parquet", this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, tableBasePath + "/*/*.parquet", this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, (FileSystem)dfs, 1);
        // Sync again with a zero source limit; the counts and commit metadata are re-checked unchanged.
        cfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath + "/*/*.parquet", this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, tableBasePath + "/*/*.parquet", this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, (FileSystem)dfs, 1);
        // Upsert a larger batch.
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1950L, tableBasePath + "/*/*.parquet", this.sqlContext);
        TestHelpers.assertDistanceCount(1950L, tableBasePath + "/*/*.parquet", this.sqlContext);
        TestHelpers.assertCommitMetadata("00001", tableBasePath, (FileSystem)dfs, 2);
        List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", this.sqlContext);
        Assertions.assertEquals(1950L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
        // Export the Hudi table as plain parquet to serve as the bootstrap source.
        String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped";
        Dataset<Row> sourceDf = this.sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet");
        sourceDf.write().format("parquet").save(bootstrapSourcePath);
        // Bootstrap a new table from that export.
        String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
        cfg.runBootstrap = true;
        cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
        cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName()));
        cfg.configs.add("hoodie.bootstrap.parallelism=5");
        cfg.targetBasePath = newDatasetBasePath;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        Dataset<Row> res = this.sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath + "/*.parquet");
        LOG.info("Schema :");
        res.printSchema();
        TestHelpers.assertRecordCount(1950L, newDatasetBasePath + "/*.parquet", this.sqlContext);
        // createOrReplaceTempView replaces the deprecated registerTempTable.
        res.createOrReplaceTempView("bootstrapped");
        Assertions.assertEquals(1950L, this.sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());
        // The bootstrapped schema must match the source schema and include the Hudi meta columns.
        StructField[] fields = res.schema().fields();
        List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
        List<String> expectedFieldNames = Arrays.asList(sourceDf.schema().fieldNames());
        Assertions.assertEquals(expectedFieldNames.size(), fields.length);
        Assertions.assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
        Assertions.assertTrue(fieldNames.containsAll(expectedFieldNames));
    }

    /** Continuous-mode upserts against a COPY_ON_WRITE table in {@code continuous_cow}. */
    @Test
    public void testUpsertsCOWContinuousMode() throws Exception {
        HoodieTableType copyOnWrite = HoodieTableType.COPY_ON_WRITE;
        testUpsertsContinuousMode(copyOnWrite, "continuous_cow");
    }

    /** Continuous-mode upserts against a MERGE_ON_READ table in {@code continuous_mor}. */
    @Test
    public void testUpsertsMORContinuousMode() throws Exception {
        HoodieTableType mergeOnRead = HoodieTableType.MERGE_ON_READ;
        testUpsertsContinuousMode(mergeOnRead, "continuous_mor");
    }

    /**
     * Runs an UPSERT delta streamer in continuous mode, with unique source records capped at 3000
     * and automatic cleaning disabled. It waits until the commit-count assertions for
     * {@code tableType} pass and the record/distance counts equal 3000.
     *
     * @param tableType table type under test
     * @param tempDir   directory name, under {@code dfsBasePath}, for the table
     */
    private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
        final String tableBasePath = dfsBasePath + "/" + tempDir;
        final int totalRecords = 3000;
        final boolean isMergeOnRead = tableType == HoodieTableType.MERGE_ON_READ;
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.continuousMode = true;
        cfg.tableType = tableType.name();
        cfg.configs.add(String.format("%s=%d", "hoodie.deltastreamer.source.test.max_unique_records", totalRecords));
        cfg.configs.add(String.format("%s=false", "hoodie.clean.automatic"));
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, this.jsc);
        deltaStreamerTestRunner(streamer, cfg, ignored -> {
            if (!isMergeOnRead) {
                TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, (FileSystem)dfs);
            } else {
                TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, (FileSystem)dfs);
                TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, (FileSystem)dfs);
            }
            String parquetGlob = tableBasePath + "/*/*.parquet";
            TestHelpers.assertRecordCount(totalRecords, parquetGlob, this.sqlContext);
            TestHelpers.assertDistanceCount(totalRecords, parquetGlob, this.sqlContext);
            return true;
        });
    }

    /**
     * Runs {@code ds.sync()} on a background thread and waits for {@code condition}. It then asks
     * the streamer to shut down and waits for the background sync to finish.
     *
     * <p>The streamer is always asked to shut down and the worker thread is always released, even
     * when waiting for the condition fails (for example on a timeout). A failing test therefore does
     * not leave a sync, or its thread, running into later tests.
     *
     * @param ds        the delta streamer to run
     * @param cfg       the streamer config; not read by this helper
     * @param condition condition passed to {@code TestHelpers.waitTillCondition} with a limit of
     *                  240 (presumably seconds — TODO confirm against TestHelpers)
     * @throws Exception if waiting fails, or if the background sync failed; the latter arrives as the
     *                   {@link java.util.concurrent.ExecutionException} from {@link Future#get()}
     */
    private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> dsFuture = executor.submit(() -> {
                try {
                    ds.sync();
                }
                catch (Exception ex) {
                    throw new RuntimeException(ex.getMessage(), ex);
                }
            });
            try {
                TestHelpers.waitTillCondition(condition, 240L);
            }
            finally {
                // Stop the streamer even if the condition was never met; otherwise the sync keeps running.
                ds.shutdownGracefully();
            }
            dsFuture.get();
        }
        finally {
            // Release the worker thread so it does not outlive the test.
            executor.shutdownNow();
        }
    }

    /**
     * Runs continuous-mode upserts on a MERGE_ON_READ table with inline clustering enabled
     * ({@code hoodie.clustering.inline.max.commits=2}) until a completed replace commit appears.
     * Then asserts exactly one completed replace commit exists.
     */
    @Test
    public void testInlineClustering() throws Exception {
        final String tableBasePath = dfsBasePath + "/inlineClustering";
        final int totalRecords = 3000;
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        cfg.configs.add(String.format("%s=%d", "hoodie.deltastreamer.source.test.max_unique_records", totalRecords));
        cfg.configs.add("hoodie.clean.automatic=false");
        cfg.configs.add("hoodie.clustering.inline=true");
        cfg.configs.add("hoodie.clustering.inline.max.commits=2");
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, this.jsc);
        deltaStreamerTestRunner(streamer, cfg, ignored -> {
            HoodieTableMetaClient client = new HoodieTableMetaClient(dfs.getConf(), tableBasePath, true);
            int pending = client.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
            int completed = client.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
            LOG.info("PendingReplaceSize=" + pending + ",completeReplaceSize = " + completed);
            return completed > 0;
        });
        HoodieTableMetaClient finalClient = new HoodieTableMetaClient(dfs.getConf(), tableBasePath, true);
        int completedReplaces = finalClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
        Assertions.assertEquals(1, completedReplaces);
    }

    /**
     * Builds a {@link HoodieClusteringJob.Config} that reads {@code clusteringjob.properties} from the
     * DFS base directory.
     *
     * @param basePath              table base path
     * @param clusteringInstantTime instant to execute, or {@code null} when only scheduling
     * @param runSchedule           whether the job should schedule rather than execute clustering
     * @return the populated config
     */
    private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, String clusteringInstantTime, boolean runSchedule) {
        HoodieClusteringJob.Config jobConfig = new HoodieClusteringJob.Config();
        jobConfig.propsFilePath = dfsBasePath + "/clusteringjob.properties";
        jobConfig.basePath = basePath;
        jobConfig.runSchedule = runSchedule;
        jobConfig.clusteringInstantTime = clusteringInstantTime;
        return jobConfig;
    }

    /**
     * Runs a continuous-mode INSERT delta streamer on a COPY_ON_WRITE table with
     * {@code hoodie.clustering.async.enabled=true}.
     *
     * <p>Once at least two commits exist, the polling condition schedules clustering with a
     * standalone {@link HoodieClusteringJob}. If an instant is scheduled, it then executes it with a
     * second job, repeating until a completed replace commit appears. Finally it asserts exactly one
     * completed replace commit.
     */
    @Test
    public void testHoodieAsyncClusteringJob() throws Exception {
        String tableBasePath = dfsBasePath + "/asyncClustering";
        int totalRecords = 3000;
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.add(String.format("%s=%d", "hoodie.deltastreamer.source.test.max_unique_records", totalRecords));
        cfg.configs.add(String.format("%s=false", "hoodie.clean.automatic"));
        cfg.configs.add(String.format("%s=true", "hoodie.clustering.async.enabled"));
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, this.jsc);
        this.deltaStreamerTestRunner(ds, cfg, r -> {
            TestHelpers.assertAtLeastNCommits(2, tableBasePath, (FileSystem)dfs);
            HoodieClusteringJob.Config scheduleClusteringConfig = this.buildHoodieClusteringUtilConfig(tableBasePath, null, true);
            HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(this.jsc, scheduleClusteringConfig);
            Option<String> scheduleClusteringInstantTime = Option.empty();
            try {
                scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
            }
            catch (Exception e) {
                LOG.warn("Schedule clustering failed", e);
                return false;
            }
            if (scheduleClusteringInstantTime.isPresent()) {
                String instantTime = scheduleClusteringInstantTime.get();
                LOG.info("Schedule clustering success, now cluster with instant time " + instantTime);
                HoodieClusteringJob.Config clusterClusteringConfig = this.buildHoodieClusteringUtilConfig(tableBasePath, instantTime, false);
                HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(this.jsc, clusterClusteringConfig);
                clusterClusteringJob.cluster(clusterClusteringConfig.retry);
                LOG.info("Cluster success");
            } else {
                LOG.warn("Schedule clustering failed");
            }
            HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), tableBasePath, true);
            int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
            int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
            // Log through LOG (as testInlineClustering does) rather than System.out.
            LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
            return completeReplaceSize > 0;
        });
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), tableBasePath, true);
        Assertions.assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
    }

    /**
     * Runs a two-stage pipeline. The upstream table is fed from the test source through
     * {@link SqlQueryBasedTransformer} with hive sync. The downstream table is fed from the upstream one
     * through {@code HoodieIncrSource}. Record counts, distance columns and checkpoint metadata are
     * checked on both tables across an initial bulk insert, a run that ingests nothing, and an upsert.
     */
    @Test
    public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception {
        String tableBasePath = dfsBasePath + "/test_table2";
        String downstreamTableBasePath = dfsBasePath + "/test_downstream_table2";
        String upstreamFiles = tableBasePath + "/*/*.parquet";
        String downstreamFiles = downstreamTableBasePath + "/*/*.parquet";
        HiveSyncConfig hiveSyncConfig = TestHoodieDeltaStreamer.getHiveSyncConfig(tableBasePath, "hive_trips");

        // Upstream: initial bulk insert with hive sync enabled.
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
            Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true);
        new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, upstreamFiles, sqlContext);
        TestHelpers.assertDistanceCount(1000L, upstreamFiles, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, upstreamFiles, sqlContext);
        String lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);

        // Downstream: bulk insert what the upstream table has committed so far.
        HoodieDeltaStreamer.Config downstreamCfg = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath,
            downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, null);
        new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, downstreamFiles, sqlContext);
        TestHelpers.assertDistanceCount(1000L, downstreamFiles, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, downstreamFiles, sqlContext);
        TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, dfs, 1);

        // Upstream with sourceLimit 0: the counts and the "00000" checkpoint must not change.
        cfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, upstreamFiles, sqlContext);
        TestHelpers.assertDistanceCount(1000L, upstreamFiles, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, upstreamFiles, sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);

        // Downstream again, this time with DummySchemaProvider: still nothing new to pick up.
        HoodieDeltaStreamer.Config downstreamCfg1 = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath,
            downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, DummySchemaProvider.class.getName());
        new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
        TestHelpers.assertRecordCount(1000L, downstreamFiles, sqlContext);
        TestHelpers.assertDistanceCount(1000L, downstreamFiles, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, downstreamFiles, sqlContext);
        TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, dfs, 1);

        // Upstream upsert with sourceLimit 2000.
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1950L, upstreamFiles, sqlContext);
        TestHelpers.assertDistanceCount(1950L, upstreamFiles, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1950L, upstreamFiles, sqlContext);
        lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
        List<Row> counts = TestHelpers.countsPerCommit(upstreamFiles, sqlContext);
        Assertions.assertEquals(1950L, counts.stream().mapToLong(row -> row.getLong(1)).sum());

        // Downstream upsert picks up the new upstream commit.
        downstreamCfg = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
            WriteOperationType.UPSERT, false, null);
        downstreamCfg.sourceLimit = 2000L;
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
        TestHelpers.assertRecordCount(2000L, downstreamFiles, sqlContext);
        TestHelpers.assertDistanceCount(2000L, downstreamFiles, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(2000L, downstreamFiles, sqlContext);
        TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, dfs, 2);
        counts = TestHelpers.countsPerCommit(downstreamFiles, sqlContext);
        Assertions.assertEquals(2000L, counts.stream().mapToLong(row -> row.getLong(1)).sum());

        // Hive sync of the upstream table must reflect the latest upstream commit.
        HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
        Assertions.assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
            "Table " + hiveSyncConfig.tableName + " should exist");
        Assertions.assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
            "Table partitions should match the number of partitions we wrote");
        Assertions.assertEquals(lastInstantForUpstreamTable, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
            "The last commit that was sycned should be updated in the TBLPROPERTIES");
    }

    /**
     * A config built without a schema provider class must make {@code sync()} fail with a
     * {@link HoodieException} that asks for a valid schema provider.
     */
    @Test
    public void testNullSchemaProvider() throws Exception {
        String tableBasePath = dfsBasePath + "/test_table";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
            Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE,
            true, false, false, null, null);
        HoodieException thrown = Assertions.assertThrows(HoodieException.class,
            () -> new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(),
            "Should error out when schema provider is not provided");
        LOG.debug("Expected error during reading data from source ", thrown);
        Assertions.assertTrue(thrown.getMessage().contains("Please provide a valid schema provider class!"));
    }

    /**
     * On a MERGE_ON_READ table, re-creating the delta streamer with a different payload class must
     * record that class under {@code hoodie.compaction.payload.class} in {@code .hoodie/hoodie.properties}.
     */
    @Test
    public void testPayloadClassUpdate() throws Exception {
        String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
            Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE,
            true, true, false, null, "MERGE_ON_READ");
        new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, dataSetBasePath + "/*/*.parquet", sqlContext);

        cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
            Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE,
            true, true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
        // Deliberately no sync(): the assertion below expects construction alone to persist the new
        // payload class into hoodie.properties.
        new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());

        Properties props = new Properties();
        String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
        FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration());
        try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
            props.load(inputStream);
        }
        // Expected value first, so a failure message reports the values the right way round.
        Assertions.assertEquals(DummyAvroPayload.class.getName(), props.getProperty("hoodie.compaction.payload.class"));
    }

    /**
     * On a COPY_ON_WRITE table (default table type), re-creating the delta streamer with a payload
     * class must NOT add {@code hoodie.compaction.payload.class} to {@code .hoodie/hoodie.properties}.
     */
    @Test
    public void testPayloadClassUpdateWithCOWTable() throws Exception {
        String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
        List<String> transformers = Collections.singletonList(SqlQueryBasedTransformer.class.getName());

        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
            transformers, PROPS_FILENAME_TEST_SOURCE, true, true, false, null, null);
        new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, dataSetBasePath + "/*/*.parquet", sqlContext);

        cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
            transformers, PROPS_FILENAME_TEST_SOURCE, true, true, true, DummyAvroPayload.class.getName(), null);
        new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());

        Path hoodiePropsPath = new Path(dataSetBasePath + "/.hoodie/hoodie.properties");
        FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration());
        Properties loaded = new Properties();
        try (FSDataInputStream in = fs.open(hoodiePropsPath)) {
            loaded.load(in);
        }
        Assertions.assertFalse(loaded.containsKey("hoodie.compaction.payload.class"));
    }

    /**
     * Exercises {@code filterDupes}:
     * <ul>
     *   <li>an INSERT with dedup enabled leaves 1000 rows in each of the two commits;</li>
     *   <li>an UPSERT through {@link DropAllTransformer} produces a newer commit with no write stats;</li>
     *   <li>enabling dedup together with UPSERT is rejected with an {@link IllegalArgumentException}.</li>
     * </ul>
     */
    @Test
    public void testFilterDupes() throws Exception {
        String tableBasePath = dfsBasePath + "/test_dupes_table";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath + "/*/*.parquet", sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);

        // INSERT with dedup: each commit ends up holding 1000 rows.
        cfg.filterDupes = true;
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.INSERT;
        new HoodieDeltaStreamer(cfg, jsc).sync();
        TestHelpers.assertRecordCount(2000L, tableBasePath + "/*/*.parquet", sqlContext);
        TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
        List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
        Assertions.assertEquals(1000L, counts.get(0).getLong(1));
        Assertions.assertEquals(1000L, counts.get(1).getLong(1));

        // UPSERT through a transformer that drops everything: a new commit with no write stats.
        HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
        HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg2.filterDupes = false;
        cfg2.sourceLimit = 2000L;
        cfg2.operation = WriteOperationType.UPSERT;
        cfg2.configs.add(String.format("%s=false", "hoodie.clean.automatic"));
        HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
        ds2.sync();
        mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
        HoodieInstant newLastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        Assertions.assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(),
            HoodieTimeline.GREATER_THAN, lastFinished.getTimestamp()));
        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
            mClient.getActiveTimeline().getInstantDetails(newLastFinished).get(), HoodieCommitMetadata.class);
        System.out.println("New Commit Metadata=" + commitMetadata);
        Assertions.assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());

        // filterDupes + UPSERT must be rejected. The previous try/catch passed silently when nothing
        // was thrown; assertThrows makes the missing exception a test failure.
        cfg2.filterDupes = true;
        cfg2.operation = WriteOperationType.UPSERT;
        IllegalArgumentException e = Assertions.assertThrows(IllegalArgumentException.class,
            () -> new HoodieDeltaStreamer(cfg2, jsc).sync(),
            "Should error out when '--filter-dupes' is combined with UPSERT");
        Assertions.assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
    }

    /**
     * {@link DistributedTestDataSource} with 1000 unique records in one partition (RocksDB-backed key
     * store) must yield exactly 1000 records in a single fetch.
     */
    @Test
    public void testDistributedTestDataSource() {
        TypedProperties sourceProps = new TypedProperties();
        sourceProps.setProperty("hoodie.deltastreamer.source.test.max_unique_records", "1000");
        sourceProps.setProperty("hoodie.deltastreamer.source.test.num_partitions", "1");
        sourceProps.setProperty("hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys", "true");
        DistributedTestDataSource source = new DistributedTestDataSource(sourceProps, jsc, sparkSession, null);

        InputBatch fetched = source.fetchNext(Option.empty(), 10000000L);
        JavaRDD records = (JavaRDD) fetched.getBatch().get();
        records.cache();
        long recordCount = records.count();
        Assertions.assertEquals(1000L, recordCount);
    }

    /**
     * Writes {@code numRecords} freshly generated insert records as {@code 1.parquet} under
     * {@code PARQUET_SOURCE_ROOT}.
     */
    private static void prepareParquetDFSFiles(int numRecords) throws IOException {
        Path target = new Path(PARQUET_SOURCE_ROOT + "/1.parquet");
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(generator.generateInserts("000", Integer.valueOf(numRecords))),
            target);
    }

    /**
     * Writes the parquet source properties file ({@code PROPS_FILENAME_TEST_PARQUET}) under
     * {@code dfsBasePath}.
     *
     * @param useSchemaProvider whether to configure the source schema file
     * @param hasTransformer    whether to also configure the target schema file (only with a schema provider)
     */
    private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
        TypedProperties props = new TypedProperties();
        props.setProperty("include", "base.properties");
        props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
        if (useSchemaProvider) {
            props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
        }
        if (useSchemaProvider && hasTransformer) {
            props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
        }
        props.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
        String propsPath = dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET;
        UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, propsPath);
    }

    /**
     * Ingests the prepared parquet source with INSERT into a fresh table and expects 5 records.
     * {@code testNum} gives every invocation its own table directory.
     *
     * @param useSchemaProvider     whether to use the file-based schema provider
     * @param transformerClassNames transformers to apply, or {@code null} for none
     */
    private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
        boolean hasTransformer = transformerClassNames != null;
        prepareParquetDFSSource(useSchemaProvider, hasTransformer);
        String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
            ParquetDFSSource.class.getName(), transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
            useSchemaProvider, 100000, false, null, null, "timestamp");
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(config, jsc);
        streamer.sync();
        TestHelpers.assertRecordCount(5L, tableBasePath + "/*/*.parquet", sqlContext);
        testNum++;
    }

    /** Parquet source: no schema provider, no transformer. */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
        testParquetDFSSource(false, null);
    }

    /** Parquet source: no schema provider, with {@link TripsWithDistanceTransformer}. */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testParquetDFSSource(false, transformers);
    }

    /** Parquet source: source schema file only, no transformer. */
    @Test
    public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
        testParquetDFSSource(true, null);
    }

    /** Parquet source: source and target schema files, with {@link TripsWithDistanceTransformer}. */
    @Test
    public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testParquetDFSSource(true, transformers);
    }

    /**
     * Writes the CSV source properties file ({@code PROPS_FILENAME_TEST_CSV}) and a 3-record
     * {@code 1.csv} input file under {@code dfsBasePath + "/csvFiles"}.
     *
     * @param hasHeader         whether the CSV file has a header row (also written to the props)
     * @param sep               field separator; nothing is written to the props for ','
     * @param useSchemaProvider whether to configure the flattened source schema file
     * @param hasTransformer    whether to also configure the flattened target schema (only with a schema provider)
     */
    private void prepareCsvDFSSource(boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
        String sourceRoot = dfsBasePath + "/csvFiles";
        // Without a header or a schema the columns are positional, so key on the first one (_c0).
        String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";

        TypedProperties csvProps = new TypedProperties();
        csvProps.setProperty("include", "base.properties");
        csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField);
        csvProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
        if (useSchemaProvider) {
            csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source-flattened.avsc");
        }
        if (useSchemaProvider && hasTransformer) {
            csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target-flattened.avsc");
        }
        csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
        if (sep == '\t') {
            // A tab is written as the two-character escape "\t", not as a literal tab.
            csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
        } else if (sep != ',') {
            csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep));
        }
        if (hasHeader) {
            csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader));
        }
        UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV);

        String csvPath = sourceRoot + "/1.csv";
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        UtilitiesTestBase.Helpers.saveCsvToDFS(hasHeader, sep,
            UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(3), true)),
            dfs, csvPath);
    }

    /**
     * Ingests the prepared CSV source with INSERT into a fresh table and expects 3 records.
     * {@code testNum} gives every invocation its own table directory.
     *
     * @param hasHeader             whether the CSV file has a header row
     * @param sep                   field separator
     * @param useSchemaProvider     whether to use the file-based schema provider
     * @param transformerClassNames transformers to apply, or {@code null} for none
     */
    private void testCsvDFSSource(boolean hasHeader, char sep, boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
        boolean hasTransformer = transformerClassNames != null;
        prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, hasTransformer);
        String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
        // Named columns are only available with a header or a schema; otherwise order by _c0.
        String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
            CsvDFSSource.class.getName(), transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
            useSchemaProvider, 1000, false, null, null, sourceOrderingField);
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(config, jsc);
        streamer.sync();
        TestHelpers.assertRecordCount(3L, tableBasePath + "/*/*.parquet", sqlContext);
        testNum++;
    }

    /** CSV with header, ',' separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        testCsvDFSSource(true, ',', false, null);
    }

    /** CSV with header, tab separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception {
        testCsvDFSSource(true, '\t', false, null);
    }

    /** CSV with header, tab separator, schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception {
        testCsvDFSSource(true, '\t', true, null);
    }

    /** CSV with header, tab separator, no schema provider, with {@link TripsWithDistanceTransformer}. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(true, '\t', false, transformers);
    }

    /** CSV with header, tab separator, schema provider, with {@link TripsWithDistanceTransformer}. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(true, '\t', true, transformers);
    }

    /** CSV without header, tab separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        testCsvDFSSource(false, '\t', false, null);
    }

    /** CSV without header, tab separator, schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer() throws Exception {
        testCsvDFSSource(false, '\t', true, null);
    }

    /**
     * CSV without header or schema provider has only positional columns, so
     * {@link TripsWithDistanceTransformer} must fail to resolve {@code begin_lat}.
     */
    @Test
    public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        AnalysisException thrown = Assertions.assertThrows(AnalysisException.class,
            () -> testCsvDFSSource(false, '\t', false, transformers),
            "Should error out when doing the transformation.");
        LOG.debug("Expected error during transformation", thrown);
        Assertions.assertTrue(thrown.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
    }

    /** CSV without header, tab separator, schema provider, with {@link TripsWithDistanceTransformer}. */
    @Test
    public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(false, '\t', true, transformers);
    }

    static {
        // Suffix for per-invocation table directories in the parquet/CSV source tests; bumped after each run.
        testNum = 1;
        // Class logger shared by the tests and the nested TestHelpers.
        LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
    }

    public static class DropAllTransformer
    implements Transformer {
        public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
            System.out.println("DropAllTransformer called !!");
            return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
        }
    }

    /**
     * Payload class with no behavior of its own beyond {@link OverwriteWithLatestAvroPayload}. The
     * payload-class-update tests use it as a distinct class name to switch the table to.
     */
    public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {

        public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) {
            super(gr, orderingVal);
        }
    }

    /**
     * {@link SimpleKeyGenerator} subclass with a {@code (TypedProperties)} constructor. It adds no
     * behavior of its own.
     */
    public static class TestGenerator extends SimpleKeyGenerator {

        public TestGenerator(TypedProperties props) {
            super(props);
        }
    }

    public static class TripsWithDistanceTransformer
    implements Transformer {
        public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
            rowDataset.sqlContext().udf().register("distance_udf", (UDF4)new DistanceUDF(), DataTypes.DoubleType);
            return rowDataset.withColumn("haversine_distance", functions.callUDF((String)"distance_udf", (Column[])new Column[]{functions.col((String)"begin_lat"), functions.col((String)"end_lat"), functions.col((String)"begin_lon"), functions.col((String)"end_lat")}));
        }
    }

    public static class DistanceUDF
    implements UDF4<Double, Double, Double, Double, Double> {
        public Double call(Double lat1, Double lat2, Double lon1, Double lon2) {
            return RANDOM.nextDouble();
        }
    }

    /**
     * Static helpers shared by the delta streamer tests: builders for {@link HoodieDeltaStreamer.Config}
     * and assertions over the tables the tests write under {@code dfsBasePath}.
     */
    static class TestHelpers {
        TestHelpers() {
        }

        /** Config whose only transformer is {@link DropAllTransformer}, which empties every batch. */
        static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) {
            return makeConfig(basePath, op, Collections.singletonList(DropAllTransformer.class.getName()));
        }

        /** Config using {@link TripsWithDistanceTransformer} and the default test source properties. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op) {
            return makeConfig(basePath, op, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
        }

        /** Config with the given transformers, default test source properties and hive sync disabled. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames) {
            return makeConfig(basePath, op, transformerClassNames, TestHoodieDeltaStreamer.PROPS_FILENAME_TEST_SOURCE, false);
        }

        /** Config with the file-based schema provider, no payload override and the default table type. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync) {
            return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true, false, null, null);
        }

        /** Config reading from {@code TestDataSource} with source limit 1000, ordered by "timestamp". */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, String payloadClassName, String tableType) {
            return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
        }

        /**
         * Fully parameterized config builder.
         *
         * @param tableType        table type name, or {@code null} for {@code TABLE_TYPE_VALUE}
         * @param payloadClassName applied only when {@code updatePayloadClass} is true
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
            HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
            cfg.targetBasePath = basePath;
            cfg.targetTableName = "hoodie_trips";
            cfg.tableType = tableType == null ? TestHoodieDeltaStreamer.TABLE_TYPE_VALUE : tableType;
            cfg.sourceClassName = sourceClassName;
            cfg.transformerClassNames = transformerClassNames;
            cfg.operation = op;
            cfg.enableHiveSync = enableHiveSync;
            cfg.sourceOrderingField = sourceOrderingField;
            cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
            cfg.sourceLimit = sourceLimit;
            if (updatePayloadClass) {
                cfg.payloadClassName = payloadClassName;
            }
            if (useSchemaProviderClass) {
                cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
            }
            return cfg;
        }

        /**
         * Config for a downstream table fed incrementally from the Hudi table at {@code srcBasePath}.
         *
         * @param schemaProviderClassName schema provider to set, or {@code null} to leave it unset
         */
        static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, WriteOperationType op, boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
            HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
            cfg.targetBasePath = basePath;
            cfg.targetTableName = "hoodie_trips_copy";
            cfg.tableType = TestHoodieDeltaStreamer.TABLE_TYPE_VALUE;
            cfg.sourceClassName = HoodieIncrSource.class.getName();
            cfg.operation = op;
            cfg.sourceOrderingField = "timestamp";
            cfg.propsFilePath = dfsBasePath + "/test-downstream-source.properties";
            cfg.sourceLimit = 1000L;
            if (schemaProviderClassName != null) {
                cfg.schemaProviderClassName = schemaProviderClassName;
            }
            List<String> cfgs = new ArrayList<>();
            cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
            cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
            cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
            cfg.configs = cfgs;
            return cfg;
        }

        /** Asserts the Hudi table matched by {@code tablePath} contains exactly {@code expected} rows. */
        static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
            long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count();
            Assertions.assertEquals(expected, recordCount);
        }

        /** Returns (_hoodie_commit_time, count) rows, sorted by commit time. */
        static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
            return sqlContext.read().format("org.apache.hudi").load(tablePath)
                .groupBy("_hoodie_commit_time").count()
                .sort("_hoodie_commit_time").collectAsList();
        }

        /** Asserts that {@code expected} rows have a non-null {@code haversine_distance}. */
        static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) {
            // createOrReplaceTempView replaces the deprecated registerTempTable.
            sqlContext.read().format("org.apache.hudi").load(tablePath).createOrReplaceTempView("tmp_trips");
            long recordCount = sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance is not NULL").count();
            Assertions.assertEquals(expected, recordCount);
        }

        /** Asserts that {@code expected} rows have {@code haversine_distance = 1.0}. */
        static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) {
            sqlContext.read().format("org.apache.hudi").load(tablePath).createOrReplaceTempView("tmp_trips");
            long recordCount = sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance = 1.0").count();
            Assertions.assertEquals(expected, recordCount);
        }

        /** Asserts there are at least {@code minExpected} completed instants on the commit timeline. */
        static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
            HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
            LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
            int numCompactionCommits = (int) timeline.getInstants().count();
            Assertions.assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected);
        }

        /** Asserts there are at least {@code minExpected} completed delta commits. */
        static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
            HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
            LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
            int numDeltaCommits = (int) timeline.getInstants().count();
            Assertions.assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
        }

        /**
         * Asserts the completed commits timeline has {@code totalCommits} instants and that the last one
         * records {@code expected} as its delta streamer checkpoint.
         *
         * @return timestamp of the last completed instant
         */
        static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits) throws IOException {
            HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
            HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
            HoodieInstant lastInstant = (HoodieInstant) timeline.lastInstant().get();
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                (byte[]) timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
            Assertions.assertEquals(totalCommits, timeline.countInstants());
            Assertions.assertEquals(expected, commitMetadata.getMetadata("deltastreamer.checkpoint.key"));
            return lastInstant.getTimestamp();
        }

        /**
         * Polls {@code condition} on a background thread every 3 seconds until it returns true.
         * Errors thrown by the condition are logged and treated as "not yet".
         *
         * <p>The polling thread is always stopped before this method returns. Previously the executor
         * was never shut down, and a timed-out poll kept running forever because interruption was
         * swallowed by the catch-all.
         *
         * @throws java.util.concurrent.TimeoutException if the condition does not hold within {@code timeoutInSecs}
         */
        static void waitTillCondition(Function<Boolean, Boolean> condition, long timeoutInSecs) throws Exception {
            java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                Future<Boolean> res = executor.submit(() -> {
                    while (!Thread.currentThread().isInterrupted()) {
                        try {
                            Thread.sleep(3000L);
                            if (condition.apply(true)) {
                                return true;
                            }
                        } catch (InterruptedException interrupted) {
                            // shutdownNow() below cancels the poll: stop instead of looping forever.
                            Thread.currentThread().interrupt();
                            return false;
                        } catch (Throwable error) {
                            LOG.warn("Got error :", error);
                        }
                    }
                    return false;
                });
                res.get(timeoutInSecs, TimeUnit.SECONDS);
            } finally {
                executor.shutdownNow();
            }
        }

        /** Asserts there are at least {@code minExpected} completed instants of any action. */
        static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
            HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants();
            LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
            int numCompletedInstants = (int) timeline.getInstants().count();
            Assertions.assertTrue(minExpected <= numCompletedInstants, "Got=" + numCompletedInstants + ", exp >=" + minExpected);
        }
    }
}

