/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.functional;

import java.io.IOException;
import java.util.List;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
import org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class TestHoodieMultiTableDeltaStreamer
extends TestHoodieDeltaStreamer {
    private static volatile Logger log = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);

    @Test
    public void testInvalidHiveSyncProps() throws IOException {
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig("test-invalid-hive-sync-source1.properties", dfsBasePath + "/config", TestDataSource.class.getName(), true);
        Exception e = (Exception)Assertions.assertThrows(HoodieException.class, () -> new HoodieMultiTableDeltaStreamer(cfg, this.jsc), (String)"Should fail when hive sync table not provided with enableHiveSync flag");
        log.debug((Object)"Expected error when creating table execution objects", (Throwable)e);
        Assertions.assertTrue((boolean)e.getMessage().contains("Hive sync table field not provided!"));
    }

    @Test
    public void testInvalidPropsFilePath() throws IOException {
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig("test-invalid-props.properties", dfsBasePath + "/config", TestDataSource.class.getName(), true);
        Exception e = (Exception)Assertions.assertThrows(IllegalArgumentException.class, () -> new HoodieMultiTableDeltaStreamer(cfg, this.jsc), (String)"Should fail when invalid props file is provided");
        log.debug((Object)"Expected error when creating table execution objects", (Throwable)e);
        Assertions.assertTrue((boolean)e.getMessage().contains("Please provide valid common config file path!"));
    }

    @Test
    public void testInvalidTableConfigFilePath() throws IOException {
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig("test-invalid-table-config.properties", dfsBasePath + "/config", TestDataSource.class.getName(), true);
        Exception e = (Exception)Assertions.assertThrows(IllegalArgumentException.class, () -> new HoodieMultiTableDeltaStreamer(cfg, this.jsc), (String)"Should fail when invalid table config props file path is provided");
        log.debug((Object)"Expected error when creating table execution objects", (Throwable)e);
        Assertions.assertTrue((boolean)e.getMessage().contains("Please provide valid table config file path!"));
    }

    @Test
    public void testCustomConfigProps() throws IOException {
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig("test-source1.properties", dfsBasePath + "/config", TestDataSource.class.getName(), false);
        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, this.jsc);
        TableExecutionContext executionContext = (TableExecutionContext)streamer.getTableExecutionContexts().get(1);
        Assertions.assertEquals((int)2, (int)streamer.getTableExecutionContexts().size());
        Assertions.assertEquals((Object)(dfsBasePath + "/multi_table_dataset/uber_db/dummy_table_uber"), (Object)executionContext.getConfig().targetBasePath);
        Assertions.assertEquals((Object)"uber_db.dummy_table_uber", (Object)executionContext.getConfig().targetTableName);
        Assertions.assertEquals((Object)"topic1", (Object)executionContext.getProperties().getString("hoodie.deltastreamer.source.kafka.topic"));
        Assertions.assertEquals((Object)"_row_key", (Object)executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()));
        Assertions.assertEquals((Object)TestHoodieDeltaStreamer.TestGenerator.class.getName(), (Object)executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY()));
        Assertions.assertEquals((Object)"uber_hive_dummy_table", (Object)executionContext.getProperties().getString("hoodie.datasource.hive_sync.table"));
    }

    @Test
    @Disabled
    public void testInvalidIngestionProps() {
        Exception e = (Exception)Assertions.assertThrows(Exception.class, () -> {
            HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig("test-source1.properties", dfsBasePath + "/config", TestDataSource.class.getName(), true);
            new HoodieMultiTableDeltaStreamer(cfg, this.jsc);
        }, (String)"Creation of execution object should fail without kafka topic");
        log.debug((Object)("Creation of execution object failed with error: " + e.getMessage()), (Throwable)e);
        Assertions.assertTrue((boolean)e.getMessage().contains("Please provide valid table config arguments!"));
    }

    @Test
    public void testMultiTableExecution() throws IOException {
        testUtils.createTopic("topic1", 2);
        testUtils.createTopic("topic2", 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        testUtils.sendMessages("topic1", UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", Integer.valueOf(5), "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}")));
        testUtils.sendMessages("topic2", UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", Integer.valueOf(10), "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}")));
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig("test-source1.properties", dfsBasePath + "/config", JsonKafkaSource.class.getName(), false);
        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, this.jsc);
        List executionContexts = streamer.getTableExecutionContexts();
        TypedProperties properties = ((TableExecutionContext)executionContexts.get(1)).getProperties();
        properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
        properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
        ((TableExecutionContext)executionContexts.get(1)).setProperties(properties);
        TypedProperties properties1 = ((TableExecutionContext)executionContexts.get(0)).getProperties();
        properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_short_trip_uber.avsc");
        properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_short_trip_uber.avsc");
        ((TableExecutionContext)executionContexts.get(0)).setProperties(properties1);
        String targetBasePath1 = ((TableExecutionContext)executionContexts.get((int)1)).getConfig().targetBasePath;
        String targetBasePath2 = ((TableExecutionContext)executionContexts.get((int)0)).getConfig().targetBasePath;
        streamer.sync();
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5L, targetBasePath1 + "/*/*.parquet", this.sqlContext);
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10L, targetBasePath2 + "/*/*.parquet", this.sqlContext);
        testUtils.sendMessages("topic1", UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", Integer.valueOf(5), "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}")));
        testUtils.sendMessages("topic2", UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", Integer.valueOf(10), "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}")));
        streamer.sync();
        Assertions.assertEquals((int)2, (int)streamer.getSuccessTables().size());
        Assertions.assertTrue((boolean)streamer.getFailedTables().isEmpty());
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5L, targetBasePath1 + "/*/*.parquet", this.sqlContext);
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10L, targetBasePath2 + "/*/*.parquet", this.sqlContext);
    }

    static class TestHelpers {
        TestHelpers() {
        }

        static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync) {
            HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config();
            config.configFolder = configFolder;
            config.targetTableName = "dummy_table";
            config.basePathPrefix = dfsBasePath + "/multi_table_dataset";
            config.propsFilePath = dfsBasePath + "/" + fileName;
            config.tableType = "COPY_ON_WRITE";
            config.sourceClassName = sourceClassName;
            config.sourceOrderingField = "timestamp";
            config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
            config.enableHiveSync = enableHiveSync;
            return config;
        }
    }
}

