/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class HiveTableSinkITCase {
    private static HiveCatalog hiveCatalog;

    @BeforeClass
    public static void createCatalog() throws IOException {
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog.open();
    }

    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
    }

    @Test
    public void testHiveTableSinkWithParallelismInBatch() {
        TableEnvironment tEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        this.testHiveTableSinkWithParallelismBase(tEnv, "/explain/testHiveTableSinkWithParallelismInBatch.out");
    }

    @Test
    public void testHiveTableSinkWithParallelismInStreaming() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
        this.testHiveTableSinkWithParallelismBase((TableEnvironment)tEnv, "/explain/testHiveTableSinkWithParallelismInStreaming.out");
    }

    private void testHiveTableSinkWithParallelismBase(TableEnvironment tEnv, String expectedResourceFileName) {
        tEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        tEnv.useCatalog(hiveCatalog.getName());
        tEnv.executeSql("create database db1");
        tEnv.useDatabase("db1");
        tEnv.executeSql(String.format("CREATE TABLE test_table ( id int, real_col int) TBLPROPERTIES ( 'sink.parallelism' = '8')", new Object[0]));
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        String actual = tEnv.explainSql("insert into test_table select 1, 1", new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN});
        String expected = TableTestUtil.readFromResource((String)expectedResourceFileName);
        Assert.assertEquals((Object)TableTestUtil.replaceStreamNodeId((String)TableTestUtil.replaceStageId((String)expected)), (Object)TableTestUtil.replaceStreamNodeId((String)TableTestUtil.replaceStageId((String)actual)));
        tEnv.executeSql("drop database db1 cascade");
    }

    @Test
    public void testBatchAppend() throws Exception {
        TableEnvironment tEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        tEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        tEnv.useCatalog(hiveCatalog.getName());
        tEnv.executeSql("create database db1");
        tEnv.useDatabase("db1");
        try {
            tEnv.executeSql("create table append_table (i int, j int)");
            tEnv.executeSql("insert into append_table select 1, 1").await();
            tEnv.executeSql("insert into append_table select 2, 2").await();
            List rows = CollectionUtil.iteratorToList((Iterator)tEnv.executeSql("select * from append_table").collect());
            rows.sort(Comparator.comparingInt(o -> (Integer)o.getField(0)));
            Assert.assertEquals(Arrays.asList(Row.of((Object[])new Object[]{1, 1}), Row.of((Object[])new Object[]{2, 2})), (Object)rows);
        }
        finally {
            tEnv.executeSql("drop database db1 cascade");
        }
    }

    @Test
    public void testDefaultSerPartStreamingWrite() throws Exception {
        this.testStreamingWrite(true, false, "textfile", this::checkSuccessFiles);
    }

    @Test
    public void testPartStreamingWrite() throws Exception {
        this.testStreamingWrite(true, false, "parquet", this::checkSuccessFiles);
        if (!hiveCatalog.getHiveVersion().startsWith("2.")) {
            this.testStreamingWrite(true, false, "orc", this::checkSuccessFiles);
        }
    }

    @Test
    public void testNonPartStreamingWrite() throws Exception {
        this.testStreamingWrite(false, false, "parquet", p -> {});
        if (!hiveCatalog.getHiveVersion().startsWith("2.")) {
            this.testStreamingWrite(false, false, "orc", p -> {});
        }
    }

    @Test
    public void testPartStreamingMrWrite() throws Exception {
        this.testStreamingWrite(true, true, "parquet", this::checkSuccessFiles);
        if (!hiveCatalog.getHiveVersion().startsWith("2.0")) {
            this.testStreamingWrite(true, true, "orc", this::checkSuccessFiles);
        }
    }

    @Test
    public void testNonPartStreamingMrWrite() throws Exception {
        this.testStreamingWrite(false, true, "parquet", p -> {});
        if (!hiveCatalog.getHiveVersion().startsWith("2.0")) {
            this.testStreamingWrite(false, true, "orc", p -> {});
        }
    }

    @Test
    public void testStreamingAppend() throws Exception {
        this.testStreamingWrite(false, false, "parquet", p -> {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvInStreamingMode(env);
            tEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
            tEnv.useCatalog(hiveCatalog.getName());
            try {
                tEnv.executeSql("insert into db1.sink_table select 6,'a','b','2020-05-03','12'").await();
            }
            catch (Exception e) {
                Assert.fail((String)("Failed to execute sql: " + e.getMessage()));
            }
            this.assertBatch("db1.sink_table", Arrays.asList("+I[1, a, b, 2020-05-03, 7]", "+I[1, a, b, 2020-05-03, 7]", "+I[2, p, q, 2020-05-03, 8]", "+I[2, p, q, 2020-05-03, 8]", "+I[3, x, y, 2020-05-03, 9]", "+I[3, x, y, 2020-05-03, 9]", "+I[4, x, y, 2020-05-03, 10]", "+I[4, x, y, 2020-05-03, 10]", "+I[5, x, y, 2020-05-03, 11]", "+I[5, x, y, 2020-05-03, 11]", "+I[6, a, b, 2020-05-03, 12]"));
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStreamingSinkWithTimestampLtzWatermark() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(100L);
        StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvInStreamingMode(env);
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        tEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        tEnv.useCatalog(hiveCatalog.getName());
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        try {
            tEnv.executeSql("create database db1");
            tEnv.useDatabase("db1");
            tEnv.executeSql("create external table source_table ( a int, b string, c string, epoch_ts bigint) partitioned by ( pt_day string, pt_hour string) TBLPROPERTIES('partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00','streaming-source.enable'='true','streaming-source.monitor-interval'='1s','streaming-source.consume-order'='partition-time')");
            tEnv.executeSql("create external table sink_table ( a int, b string, c string) partitioned by ( d string, e string) TBLPROPERTIES( 'partition.time-extractor.timestamp-pattern' = '$d $e:00:00', 'auto-compaction'='true', 'compaction.file-size' = '128MB', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='30min', 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', 'sink.partition-commit.policy.kind'='metastore,success-file', 'sink.partition-commit.success-file.name'='_MY_SUCCESS', 'streaming-source.enable'='true', 'streaming-source.monitor-interval'='1s', 'streaming-source.consume-order'='partition-time')");
            tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            DataStream dataStream = tEnv.toDataStream(tEnv.sqlQuery("select a, b, c, epoch_ts, pt_day, pt_hour from source_table"));
            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder().column("a", (AbstractDataType)DataTypes.INT()).column("b", (AbstractDataType)DataTypes.STRING()).column("c", (AbstractDataType)DataTypes.STRING()).column("epoch_ts", (AbstractDataType)DataTypes.BIGINT()).column("pt_day", (AbstractDataType)DataTypes.STRING()).column("pt_hour", (AbstractDataType)DataTypes.STRING()).columnByExpression("ts_ltz", (Expression)Expressions.callSql((String)"TO_TIMESTAMP_LTZ(epoch_ts, 3)")).watermark("ts_ltz", "ts_ltz - INTERVAL '1' SECOND").build());
            tEnv.createTemporaryView("my_table", table);
            HashMap<Integer, Object[]> testData = new HashMap<Integer, Object[]>();
            testData.put(1, new Object[]{1, "a", "b", 1588461300000L});
            testData.put(2, new Object[]{1, "a", "b", 1588463100000L});
            testData.put(3, new Object[]{2, "p", "q", 1588464300000L});
            testData.put(4, new Object[]{2, "p", "q", 1588466400000L});
            testData.put(5, new Object[]{3, "x", "y", 1588468800000L});
            testData.put(6, new Object[]{3, "x", "y", 1588470900000L});
            testData.put(7, new Object[]{4, "x", "y", 1588471800000L});
            testData.put(8, new Object[]{4, "x", "y", 1588473300000L});
            testData.put(9, new Object[]{5, "x", "y", 1588476300000L});
            testData.put(10, new Object[]{5, "x", "y", 1588477800000L});
            HashMap<Integer, String> testPartition = new HashMap<Integer, String>();
            testPartition.put(1, "pt_day='2020-05-03',pt_hour='7'");
            testPartition.put(2, "pt_day='2020-05-03',pt_hour='8'");
            testPartition.put(3, "pt_day='2020-05-03',pt_hour='9'");
            testPartition.put(4, "pt_day='2020-05-03',pt_hour='10'");
            testPartition.put(5, "pt_day='2020-05-03',pt_hour='11'");
            HashMap<Integer, Object[]> expectedData = new HashMap<Integer, Object[]>();
            expectedData.put(1, new Object[]{1, "a", "b", "2020-05-03", "7"});
            expectedData.put(2, new Object[]{2, "p", "q", "2020-05-03", "8"});
            expectedData.put(3, new Object[]{3, "x", "y", "2020-05-03", "9"});
            expectedData.put(4, new Object[]{4, "x", "y", "2020-05-03", "10"});
            expectedData.put(5, new Object[]{5, "x", "y", "2020-05-03", "11"});
            tEnv.executeSql("insert into sink_table select a, b, c, pt_day, pt_hour from my_table");
            CloseableIterator iter = tEnv.executeSql("select * from sink_table").collect();
            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table").addRow((Object[])testData.get(1)).addRow((Object[])testData.get(2)).commit((String)testPartition.get(1));
            for (int i = 2; i < 7; ++i) {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                Assert.assertEquals(Arrays.asList(Row.of((Object[])((Object[])expectedData.get(i - 1))).toString(), Row.of((Object[])((Object[])expectedData.get(i - 1))).toString()), HiveTableSinkITCase.fetchRows((Iterator<Row>)iter, 2));
                if (i >= 6) continue;
                HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table").addRow((Object[])testData.get(2 * i - 1)).addRow((Object[])testData.get(2 * i)).commit((String)testPartition.get(i));
            }
            this.checkSuccessFiles(URI.create(hiveCatalog.getHiveTable(ObjectPath.fromString((String)"db1.sink_table")).getSd().getLocation()).getPath());
        }
        finally {
            tEnv.executeSql("drop database db1 cascade");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStreamingSinkWithoutCommitPolicy() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = HiveTestUtils.createTableEnvInStreamingMode(env);
        tableEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        tableEnv.executeSql("create database db1");
        try {
            tableEnv.useDatabase("db1");
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            tableEnv.executeSql("create table dest(x int) partitioned by (p string)");
            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            tableEnv.executeSql("create table src (i int, p string) with ('connector'='datagen','number-of-rows'='5')");
            tableEnv.executeSql("insert into dest select * from src").await();
            Assert.fail((String)"Streaming write partitioned table without commit policy should fail");
        }
        catch (FlinkHiveException e) {
            Assert.assertTrue((boolean)e.getMessage().contains(String.format("Streaming write to partitioned hive table `%s`.`%s`.`%s` without providing a commit policy", hiveCatalog.getName(), "db1", "dest")));
        }
        finally {
            tableEnv.executeSql("drop database db1 cascade");
        }
    }

    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        ArrayList<String> strings = new ArrayList<String>(size);
        for (int i = 0; i < size; ++i) {
            Assert.assertTrue((boolean)iter.hasNext());
            strings.add(iter.next().toString());
        }
        strings.sort(String::compareTo);
        return strings;
    }

    private void checkSuccessFiles(String path) {
        File basePath = new File(path, "d=2020-05-03");
        Assert.assertEquals((long)5L, (long)basePath.list().length);
        Assert.assertTrue((boolean)new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists());
        Assert.assertTrue((boolean)new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists());
        Assert.assertTrue((boolean)new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists());
        Assert.assertTrue((boolean)new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists());
        Assert.assertTrue((boolean)new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testStreamingWrite(boolean part, boolean useMr, String format, Consumer<String> pathConsumer) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(100L);
        StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvInStreamingMode(env);
        tEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        tEnv.useCatalog(hiveCatalog.getName());
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        if (useMr) {
            tEnv.getConfig().getConfiguration().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, (Object)true);
        } else {
            tEnv.getConfig().getConfiguration().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, (Object)false);
        }
        try {
            tEnv.executeSql("create database db1");
            tEnv.useDatabase("db1");
            List<Row> data = Arrays.asList(Row.of((Object[])new Object[]{1, "a", "b", "2020-05-03", "7"}), Row.of((Object[])new Object[]{2, "p", "q", "2020-05-03", "8"}), Row.of((Object[])new Object[]{3, "x", "y", "2020-05-03", "9"}), Row.of((Object[])new Object[]{4, "x", "y", "2020-05-03", "10"}), Row.of((Object[])new Object[]{5, "x", "y", "2020-05-03", "11"}));
            DataStreamSource stream = env.addSource((SourceFunction)new FiniteTestSource(data), (TypeInformation)new RowTypeInfo(new TypeInformation[]{Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING}));
            tEnv.createTemporaryView("my_table", (DataStream)stream, new Expression[]{Expressions.$((String)"a"), Expressions.$((String)"b"), Expressions.$((String)"c"), Expressions.$((String)"d"), Expressions.$((String)"e")});
            tEnv.executeSql("create external table sink_table (a int,b string,c string" + (part ? "" : ",d string,e string") + ") " + (part ? "partitioned by (d string,e string) " : "") + " stored as " + format + " TBLPROPERTIES ('" + FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key() + "'='$d $e:00:00','" + FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY.key() + "'='1h','" + FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key() + "'='metastore,success-file','" + FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key() + "'='_MY_SUCCESS')");
            tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await();
            this.assertBatch("db1.sink_table", Arrays.asList("+I[1, a, b, 2020-05-03, 7]", "+I[1, a, b, 2020-05-03, 7]", "+I[2, p, q, 2020-05-03, 8]", "+I[2, p, q, 2020-05-03, 8]", "+I[3, x, y, 2020-05-03, 9]", "+I[3, x, y, 2020-05-03, 9]", "+I[4, x, y, 2020-05-03, 10]", "+I[4, x, y, 2020-05-03, 10]", "+I[5, x, y, 2020-05-03, 11]", "+I[5, x, y, 2020-05-03, 11]"));
            pathConsumer.accept(URI.create(hiveCatalog.getHiveTable(ObjectPath.fromString((String)"db1.sink_table")).getSd().getLocation()).getPath());
        }
        finally {
            tEnv.executeSql("drop database db1 cascade");
        }
    }

    private void assertBatch(String table, List<String> expected) {
        ArrayList results = new ArrayList();
        TableEnvironment batchTEnv = HiveTestUtils.createTableEnvInBatchMode();
        batchTEnv.registerCatalog(hiveCatalog.getName(), (Catalog)hiveCatalog);
        batchTEnv.useCatalog(hiveCatalog.getName());
        batchTEnv.executeSql("select * from " + table).collect().forEachRemaining(r -> results.add(r.toString()));
        results.sort(String::compareTo);
        expected.sort(String::compareTo);
        Assert.assertEquals(expected, results);
    }
}

