package org.apache.hudi.utilities.functional;

import java.io.IOException;
import java.util.List;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
import org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.class */
public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
    private static volatile Logger log = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);

    /* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer$TestHelpers.class */
    static class TestHelpers {
        TestHelpers() {
        }

        static HoodieMultiTableDeltaStreamer.Config getConfig(String str, String str2, String str3, boolean z) {
            HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config();
            config.configFolder = str2;
            config.targetTableName = "dummy_table";
            config.basePathPrefix = TestHoodieMultiTableDeltaStreamer.dfsBasePath + "/multi_table_dataset";
            config.propsFilePath = TestHoodieMultiTableDeltaStreamer.dfsBasePath + "/" + str;
            config.tableType = "COPY_ON_WRITE";
            config.sourceClassName = str3;
            config.sourceOrderingField = "timestamp";
            config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
            config.enableHiveSync = Boolean.valueOf(z);
            return config;
        }
    }

    @Test
    public void testInvalidHiveSyncProps() throws IOException {
        HoodieMultiTableDeltaStreamer.Config config = TestHelpers.getConfig(TestHoodieDeltaStreamer.PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
        Exception exc = (Exception) Assertions.assertThrows(HoodieException.class, () -> {
            new HoodieMultiTableDeltaStreamer(config, this.jsc);
        }, "Should fail when hive sync table not provided with enableHiveSync flag");
        log.debug("Expected error when creating table execution objects", exc);
        Assertions.assertTrue(exc.getMessage().contains("Hive sync table field not provided!"));
    }

    @Test
    public void testInvalidPropsFilePath() throws IOException {
        HoodieMultiTableDeltaStreamer.Config config = TestHelpers.getConfig(TestHoodieDeltaStreamer.PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
        Exception exc = (Exception) Assertions.assertThrows(IllegalArgumentException.class, () -> {
            new HoodieMultiTableDeltaStreamer(config, this.jsc);
        }, "Should fail when invalid props file is provided");
        log.debug("Expected error when creating table execution objects", exc);
        Assertions.assertTrue(exc.getMessage().contains("Please provide valid common config file path!"));
    }

    @Test
    public void testInvalidTableConfigFilePath() throws IOException {
        HoodieMultiTableDeltaStreamer.Config config = TestHelpers.getConfig(TestHoodieDeltaStreamer.PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
        Exception exc = (Exception) Assertions.assertThrows(IllegalArgumentException.class, () -> {
            new HoodieMultiTableDeltaStreamer(config, this.jsc);
        }, "Should fail when invalid table config props file path is provided");
        log.debug("Expected error when creating table execution objects", exc);
        Assertions.assertTrue(exc.getMessage().contains("Please provide valid table config file path!"));
    }

    @Test
    public void testCustomConfigProps() throws IOException {
        HoodieMultiTableDeltaStreamer hoodieMultiTableDeltaStreamer = new HoodieMultiTableDeltaStreamer(TestHelpers.getConfig(TestHoodieDeltaStreamer.PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false), this.jsc);
        TableExecutionContext tableExecutionContext = (TableExecutionContext) hoodieMultiTableDeltaStreamer.getTableExecutionContexts().get(1);
        Assertions.assertEquals(2, hoodieMultiTableDeltaStreamer.getTableExecutionContexts().size());
        Assertions.assertEquals(dfsBasePath + "/multi_table_dataset/uber_db/dummy_table_uber", tableExecutionContext.getConfig().targetBasePath);
        Assertions.assertEquals("uber_db.dummy_table_uber", tableExecutionContext.getConfig().targetTableName);
        Assertions.assertEquals("topic1", tableExecutionContext.getProperties().getString("hoodie.deltastreamer.source.kafka.topic"));
        Assertions.assertEquals("_row_key", tableExecutionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()));
        Assertions.assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY()));
        Assertions.assertEquals("uber_hive_dummy_table", tableExecutionContext.getProperties().getString("hoodie.datasource.hive_sync.table"));
    }

    @Disabled
    @Test
    public void testInvalidIngestionProps() {
        Exception exc = (Exception) Assertions.assertThrows(Exception.class, () -> {
            new HoodieMultiTableDeltaStreamer(TestHelpers.getConfig(TestHoodieDeltaStreamer.PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true), this.jsc);
        }, "Creation of execution object should fail without kafka topic");
        log.debug("Creation of execution object failed with error: " + exc.getMessage(), exc);
        Assertions.assertTrue(exc.getMessage().contains("Please provide valid table config arguments!"));
    }

    @Test
    public void testMultiTableExecution() throws IOException {
        testUtils.createTopic("topic1", 2);
        testUtils.createTopic("topic2", 2);
        HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
        testUtils.sendMessages("topic1", UtilitiesTestBase.Helpers.jsonifyRecords(hoodieTestDataGenerator.generateInsertsAsPerSchema("000", 5, "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}")));
        testUtils.sendMessages("topic2", UtilitiesTestBase.Helpers.jsonifyRecords(hoodieTestDataGenerator.generateInsertsAsPerSchema("000", 10, "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}")));
        HoodieMultiTableDeltaStreamer hoodieMultiTableDeltaStreamer = new HoodieMultiTableDeltaStreamer(TestHelpers.getConfig(TestHoodieDeltaStreamer.PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false), this.jsc);
        List tableExecutionContexts = hoodieMultiTableDeltaStreamer.getTableExecutionContexts();
        TypedProperties properties = ((TableExecutionContext) tableExecutionContexts.get(1)).getProperties();
        properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
        properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
        ((TableExecutionContext) tableExecutionContexts.get(1)).setProperties(properties);
        TypedProperties properties2 = ((TableExecutionContext) tableExecutionContexts.get(0)).getProperties();
        properties2.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_short_trip_uber.avsc");
        properties2.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_short_trip_uber.avsc");
        ((TableExecutionContext) tableExecutionContexts.get(0)).setProperties(properties2);
        String str = ((TableExecutionContext) tableExecutionContexts.get(1)).getConfig().targetBasePath;
        String str2 = ((TableExecutionContext) tableExecutionContexts.get(0)).getConfig().targetBasePath;
        hoodieMultiTableDeltaStreamer.sync();
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5L, str + "/*/*.parquet", this.sqlContext);
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10L, str2 + "/*/*.parquet", this.sqlContext);
        testUtils.sendMessages("topic1", UtilitiesTestBase.Helpers.jsonifyRecords(hoodieTestDataGenerator.generateUpdatesAsPerSchema("001", 5, "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}")));
        testUtils.sendMessages("topic2", UtilitiesTestBase.Helpers.jsonifyRecords(hoodieTestDataGenerator.generateUpdatesAsPerSchema("001", 10, "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}")));
        hoodieMultiTableDeltaStreamer.sync();
        Assertions.assertEquals(2, hoodieMultiTableDeltaStreamer.getSuccessTables().size());
        Assertions.assertTrue(hoodieMultiTableDeltaStreamer.getFailedTables().isEmpty());
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5L, str + "/*/*.parquet", this.sqlContext);
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10L, str2 + "/*/*.parquet", this.sqlContext);
    }
}
