/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.catalog.hive;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.functions.hive.util.TestHiveGenericUDF;
import org.apache.flink.table.functions.hive.util.TestHiveSimpleUDF;
import org.apache.flink.table.functions.hive.util.TestHiveUDTF;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.hive.ql.udf.UDFMonth;
import org.apache.hadoop.hive.ql.udf.UDFYear;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.collection.Seq;

/**
 * Integration tests that register user-defined functions in a {@link HiveCatalog} and invoke
 * them from Flink SQL.
 *
 * <p>A single catalog instance is created once for the class and shared by all test methods, so
 * objects created in one test remain visible to the others unless they are dropped explicitly.
 */
public class HiveCatalogUdfITCase
extends AbstractTestBase {
    /** Per-test scratch directory; the batch variant of {@link #testUdf} writes its CSV output here. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /** Catalog shared by every test in this class; opened in {@link #createCatalog()}. */
    private static HiveCatalog hiveCatalog;

    private final String sourceTableName = "csv_source";
    private final String sinkTableName = "csv_sink";

    /** Creates and opens the shared catalog before any test runs. */
    @BeforeClass
    public static void createCatalog() {
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog.open();
    }

    /** Closes the shared catalog; tolerates a catalog that was never created. */
    @AfterClass
    public static void closeCatalog() {
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
    }

    /**
     * Registers a CSV-backed source table plus four functions ({@code myudf},
     * {@code mygenericudf}, {@code myudtf}, {@code myudaf}) in the catalog, then runs the same
     * query in batch mode and in streaming mode via {@link #testUdf(boolean)}.
     *
     * @throws Exception if catalog registration or query execution fails
     */
    @Test
    public void testFlinkUdf() throws Exception {
        TableSchema schema =
                TableSchema.builder()
                        .field("name", DataTypes.STRING())
                        .field("age", DataTypes.INT())
                        .build();

        HashMap<String, String> sourceOptions = new HashMap<>();
        sourceOptions.put("connector.type", "filesystem");
        sourceOptions.put("connector.path", getClass().getResource("/csv/test.csv").getPath());
        sourceOptions.put("format.type", "csv");

        CatalogTableImpl source = new CatalogTableImpl(schema, sourceOptions, "Comment.");
        hiveCatalog.createTable(new ObjectPath("default", sourceTableName), source, false);

        hiveCatalog.createFunction(
                new ObjectPath("default", "myudf"),
                new CatalogFunctionImpl(TestHiveSimpleUDF.class.getCanonicalName()),
                false);
        hiveCatalog.createFunction(
                new ObjectPath("default", "mygenericudf"),
                new CatalogFunctionImpl(TestHiveGenericUDF.class.getCanonicalName()),
                false);
        hiveCatalog.createFunction(
                new ObjectPath("default", "myudtf"),
                new CatalogFunctionImpl(TestHiveUDTF.class.getCanonicalName()),
                false);
        hiveCatalog.createFunction(
                new ObjectPath("default", "myudaf"),
                new CatalogFunctionImpl(GenericUDAFSum.class.getCanonicalName()),
                false);

        testUdf(true);
        testUdf(false);
    }

    /**
     * Runs the UDF query against the source table and checks the sorted result rows.
     *
     * <p>In batch mode the query result is inserted into a CSV sink table and read back from the
     * files under the sink path. In streaming mode the result is converted to a retract stream
     * and collected through a {@link TestingRetractSink}.
     *
     * @param batch {@code true} to execute in batch mode, {@code false} for streaming mode
     * @throws Exception if the environment cannot be set up or the job fails
     */
    private void testUdf(boolean batch) throws Exception {
        StreamExecutionEnvironment env = null;
        TableEnvironment tEnv;
        EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
        if (batch) {
            settingsBuilder.inBatchMode();
            tEnv = TableEnvironment.create(settingsBuilder.build());
        } else {
            settingsBuilder.inStreamingMode();
            env = StreamExecutionEnvironment.getExecutionEnvironment();
            tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
        }

        BatchTestBase.configForMiniCluster(tEnv.getConfig());
        tEnv.registerCatalog("myhive", hiveCatalog);
        tEnv.useCatalog("myhive");

        String innerSql =
                String.format(
                        "select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s from %s, lateral table(myudtf(name, 1)) as T(s)",
                        sourceTableName);
        String selectSql =
                String.format("select a, s, sum(b), myudaf(b) from (%s) group by a, s", innerSql);

        List<String> results;
        if (batch) {
            Path sinkPath = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv");

            TableSchema sinkSchema =
                    TableSchema.builder()
                            .field("name1", Types.STRING())
                            .field("name2", Types.STRING())
                            .field("sum1", Types.INT())
                            .field("sum2", Types.LONG())
                            .build();

            HashMap<String, String> sinkOptions = new HashMap<>();
            sinkOptions.put("connector.type", "filesystem");
            sinkOptions.put("connector.path", sinkPath.toAbsolutePath().toString());
            sinkOptions.put("format.type", "csv");

            CatalogTableImpl sink = new CatalogTableImpl(sinkSchema, sinkOptions, "Comment.");
            hiveCatalog.createTable(new ObjectPath("default", sinkTableName), sink, false);

            // Pass the query as a format argument rather than splicing it into the format
            // string, so a '%' inside the generated SQL can never be read as a format specifier.
            tEnv.executeSql(String.format("insert into %s %s", sinkTableName, selectSql)).await();

            // The sink path may end up as a single file or as a directory of part files, so
            // walk it and concatenate every non-empty regular file.
            StringBuilder builder = new StringBuilder();
            try (Stream<Path> paths = Files.walk(sinkPath.toAbsolutePath())) {
                paths.filter(Files::isRegularFile)
                        .forEach(
                                file -> {
                                    try {
                                        String content = FileUtils.readFileUtf8(file.toFile());
                                        if (!content.isEmpty()) {
                                            builder.append(content);
                                        }
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                });
            }
            results =
                    Arrays.stream(builder.toString().split("\n"))
                            .filter(line -> !line.isEmpty())
                            .collect(Collectors.toList());
        } else {
            StreamTableEnvironment streamTEnv = (StreamTableEnvironment) tEnv;
            TestingRetractSink sink = new TestingRetractSink();
            // Raw SinkFunction cast kept from the original code. NOTE(review): presumably the
            // sink's declared element type does not line up with the Java generic signature
            // of addSink - confirm before tightening.
            streamTEnv
                    .toRetractStream(tEnv.sqlQuery(selectSql), Row.class)
                    .map(new JavaToScala())
                    .addSink((SinkFunction) sink);
            env.execute("");
            results = JavaScalaConversionUtil.toJava(sink.getRetractResults());
        }

        // Copy into a mutable list before sorting; the collected list may not support sort().
        results = new ArrayList<>(results);
        results.sort(String::compareTo);
        Assert.assertEquals(Arrays.asList("1,1,2,2", "2,2,4,4", "3,3,6,6"), results);
    }

    /**
     * Registers {@link UDFYear} as {@code myyear} and applies it to a timestamp column.
     *
     * @throws Exception if table setup or query execution fails
     */
    @Test
    public void testTimestampUDF() throws Exception {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        tableEnv.executeSql(
                String.format("create function myyear as '%s'", UDFYear.class.getName()));
        tableEnv.executeSql("create table src(ts timestamp)");
        try {
            HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "src")
                    .addRow(new Object[] {Timestamp.valueOf("2013-07-15 10:00:00")})
                    .addRow(new Object[] {Timestamp.valueOf("2019-05-23 17:32:55")})
                    .commit();

            // The assertion below compares the rows in a fixed order, so the query must impose
            // that order itself (as testDateUDF does) instead of relying on scan order.
            List<Row> results =
                    CollectionUtil.iteratorToList(
                            tableEnv.sqlQuery("select myyear(ts) as y from src order by y")
                                    .execute()
                                    .collect());
            Assert.assertEquals(2, results.size());
            Assert.assertEquals("[+I[2013], +I[2019]]", results.toString());
        } finally {
            tableEnv.executeSql("drop table src");
        }
    }

    /**
     * Registers {@link UDFMonth} as {@code mymonth} and applies it to a date column.
     *
     * @throws Exception if table setup or query execution fails
     */
    @Test
    public void testDateUDF() throws Exception {
        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        tableEnv.executeSql(
                String.format("create function mymonth as '%s'", UDFMonth.class.getName()));
        tableEnv.executeSql("create table src(dt date)");
        try {
            HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "src")
                    .addRow(new Object[] {Date.valueOf("2019-01-19")})
                    .addRow(new Object[] {Date.valueOf("2019-03-02")})
                    .commit();

            List<Row> results =
                    CollectionUtil.iteratorToList(
                            tableEnv.sqlQuery("select mymonth(dt) as m from src order by m")
                                    .execute()
                                    .collect());
            Assert.assertEquals(2, results.size());
            Assert.assertEquals("[+I[1], +I[3]]", results.toString());
        } finally {
            tableEnv.executeSql("drop table src");
        }
    }

    /** Repackages a Flink Java {@link Tuple2} as a {@link scala.Tuple2} with the same two fields. */
    private static class JavaToScala
    implements MapFunction<Tuple2<Boolean, Row>, scala.Tuple2<Boolean, Row>> {

        @Override
        public scala.Tuple2<Boolean, Row> map(Tuple2<Boolean, Row> value) throws Exception {
            return new scala.Tuple2<>(value.f0, value.f1);
        }
    }
}

