package org.apache.flink.table.api;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.flink.FlinkVersion;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.table.planner.utils.JsonTestUtils;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/api/CompiledPlanITCase.class */
/**
 * Integration tests for compiling, persisting, loading and executing plans, both through the
 * {@link TableEnvironment} API ({@code compilePlanSql}, {@code loadPlan}) and through the SQL
 * statements {@code COMPILE PLAN}, {@code EXECUTE PLAN} and {@code COMPILE AND EXECUTE PLAN}.
 */
public class CompiledPlanITCase extends JsonPlanTestBase {

    /** Rows written to the CSV source table and expected verbatim in pass-through sinks. */
    private static final List<String> DATA =
            Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");

    /** Column definitions shared by the source and sink tables. */
    private static final String[] COLUMNS_DEFINITION = {"a bigint", "b int", "c varchar"};

    /** Registers the {@code MyTable} source and {@code MySink} sink on top of the base setup. */
    @Override
    @Before
    public void setup() throws Exception {
        super.setup();

        String columns = String.join(",", COLUMNS_DEFINITION);
        this.tableEnv.executeSql(
                "CREATE TABLE MyTable (\n"
                        + columns
                        + ") with (\n"
                        + "  'connector' = 'values',\n"
                        + "  'bounded' = 'false')");
        this.tableEnv.executeSql(
                "CREATE TABLE MySink (\n"
                        + columns
                        + ") with (\n"
                        + "  'connector' = 'values',\n"
                        + "  'table-sink-class' = 'DEFAULT')");
    }

    /** Compares the JSON of a compiled INSERT with the stored reference plan. */
    @Test
    public void testCompilePlanSql() throws IOException {
        String actualPlan =
                normalizeJsonPlan(
                        this.tableEnv
                                .compilePlanSql("INSERT INTO MySink SELECT * FROM MyTable")
                                .asJsonString());
        String expectedPlan =
                normalizeJsonPlan(TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out"));

        Assertions.assertThat(actualPlan).isEqualTo(expectedPlan);
    }

    /** Compiles an INSERT via {@code compilePlanSql} and executes the resulting plan. */
    @Test
    public void testExecutePlanSql() throws Exception {
        File sinkPath = createSourceSinkTables();

        this.tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src").execute().await();

        assertResult(DATA, sinkPath);
    }

    /** Expects {@code compilePlanSql} to reject a CREATE TABLE ... AS SELECT statement. */
    @Test
    public void testExecuteCtasPlanSql() throws Exception {
        createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
        File sinkDir = TEMPORARY_FOLDER.newFolder();
        String ctasSql =
                String.format(
                        "CREATE TABLE sink\n"
                                + "WITH (\n"
                                + "  'connector' = 'filesystem',\n"
                                + "  'format' = 'testcsv',\n"
                                + "  'path' = '%s'\n"
                                + ") AS SELECT * FROM src",
                        sinkDir.getAbsolutePath());

        Assertions.assertThatThrownBy(() -> this.tableEnv.compilePlanSql(ctasSql).execute())
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                TableException.class,
                                "Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT"));
    }

    /** Compiles and executes a plan built through the Table API instead of SQL. */
    @Test
    public void testExecutePlanTable() throws Exception {
        File sinkPath = createSourceSinkTables();

        this.tableEnv
                .from("src")
                .select(Expressions.$("*"))
                .insertInto("sink")
                .compilePlan()
                .execute()
                .await();

        assertResult(DATA, sinkPath);
    }

    /** Writes a compiled plan to a file and runs it with {@code EXECUTE PLAN}. */
    @Test
    public void testCompileWriteToFileAndThenExecuteSql() throws Exception {
        Path planPath = preparePlanFile();
        File sinkPath = createSourceSinkTables();

        this.tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src").writeToFile(planPath);
        this.tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();

        assertResult(DATA, sinkPath);
    }

    /**
     * Runs {@code COMPILE PLAN} twice against the same file: the first call is expected to
     * succeed and create the file, the second to fail because the file already exists.
     */
    @Test
    public void testCompilePlan() throws Exception {
        Path planPath = preparePlanFile();
        File sinkPath = createSourceSinkTables();
        String compileSql =
                String.format(
                        "COMPILE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src", planPath);

        Assertions.assertThat(this.tableEnv.executeSql(compileSql))
                .isEqualTo(TableResultInternal.TABLE_RESULT_OK);
        Assertions.assertThat(planPath.toFile()).exists();

        Assertions.assertThatThrownBy(() -> this.tableEnv.executeSql(compileSql))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                TableException.class, "Cannot overwrite the plan file"));

        this.tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();
        assertResult(DATA, sinkPath);
    }

    /** Compiles a two-INSERT statement set into one plan file and executes it. */
    @Test
    public void testCompilePlanWithStatementSet() throws Exception {
        Path planPath = preparePlanFile();
        createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
        File sinkAPath = createTestCsvSinkTable("sinkA", COLUMNS_DEFINITION);
        File sinkBPath = createTestCsvSinkTable("sinkB", COLUMNS_DEFINITION);

        TableResult compileResult =
                this.tableEnv.executeSql(
                        String.format(
                                "COMPILE PLAN '%s' FOR STATEMENT SET BEGIN INSERT INTO sinkA SELECT * FROM src;INSERT INTO sinkB SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src;END",
                                planPath));
        Assertions.assertThat(compileResult).isEqualTo(TableResultInternal.TABLE_RESULT_OK);
        Assertions.assertThat(planPath.toFile()).exists();

        this.tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();
        assertResult(DATA, sinkAPath);
        assertResult(transformedRows(), sinkBPath);
    }

    /**
     * Runs {@code COMPILE PLAN ... IF NOT EXISTS} twice with different queries. Both calls are
     * expected to return OK, and executing the file is expected to yield the rows of the first
     * query, i.e. the second call must not have replaced the plan.
     */
    @Test
    public void testCompilePlanIfNotExists() throws Exception {
        Path planPath = preparePlanFile();
        File sinkPath = createSourceSinkTables();

        TableResult firstCompile =
                this.tableEnv.executeSql(
                        String.format(
                                "COMPILE PLAN '%s' IF NOT EXISTS FOR INSERT INTO sink SELECT * FROM src",
                                planPath));
        Assertions.assertThat(firstCompile).isEqualTo(TableResultInternal.TABLE_RESULT_OK);
        Assertions.assertThat(planPath.toFile()).exists();

        TableResult secondCompile =
                this.tableEnv.executeSql(
                        String.format(
                                "COMPILE PLAN '%s' IF NOT EXISTS FOR INSERT INTO sink SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src",
                                planPath));
        Assertions.assertThat(secondCompile).isEqualTo(TableResultInternal.TABLE_RESULT_OK);

        this.tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();
        assertResult(DATA, sinkPath);
    }

    /**
     * With {@link TableConfigOptions#PLAN_FORCE_RECOMPILE} enabled, a second {@code COMPILE PLAN}
     * on the same file is expected to succeed, and executing the file is expected to yield the
     * rows of the second query.
     */
    @Test
    public void testCompilePlanOverwrite() throws Exception {
        this.tableEnv.getConfig().set(TableConfigOptions.PLAN_FORCE_RECOMPILE, true);
        Path planPath = preparePlanFile();
        List<String> expected = transformedRows();
        File sinkPath = createSourceSinkTables();

        TableResult firstCompile =
                this.tableEnv.executeSql(
                        String.format(
                                "COMPILE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src",
                                planPath));
        Assertions.assertThat(firstCompile).isEqualTo(TableResultInternal.TABLE_RESULT_OK);
        Assertions.assertThat(planPath.toFile()).exists();

        TableResult secondCompile =
                this.tableEnv.executeSql(
                        String.format(
                                "COMPILE PLAN '%s' FOR INSERT INTO sink SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src",
                                planPath));
        Assertions.assertThat(secondCompile).isEqualTo(TableResultInternal.TABLE_RESULT_OK);

        this.tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();
        assertResult(expected, sinkPath);
    }

    /** Runs {@code COMPILE AND EXECUTE PLAN} for a single INSERT. */
    @Test
    public void testCompileAndExecutePlan() throws Exception {
        Path planPath = preparePlanFile();
        File sinkPath = createSourceSinkTables();

        this.tableEnv
                .executeSql(
                        String.format(
                                "COMPILE AND EXECUTE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src",
                                planPath))
                .await();

        Assertions.assertThat(planPath.toFile()).exists();
        assertResult(DATA, sinkPath);
    }

    /** Runs {@code COMPILE AND EXECUTE PLAN} for a statement set with two sinks. */
    @Test
    public void testCompileAndExecutePlanWithStatementSet() throws Exception {
        Path planPath = preparePlanFile();
        createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
        File sinkAPath = createTestCsvSinkTable("sinkA", COLUMNS_DEFINITION);
        File sinkBPath = createTestCsvSinkTable("sinkB", COLUMNS_DEFINITION);

        this.tableEnv
                .executeSql(
                        String.format(
                                "COMPILE AND EXECUTE PLAN '%s' FOR STATEMENT SET BEGIN INSERT INTO sinkA SELECT * FROM src;INSERT INTO sinkB SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src;END",
                                planPath))
                .await();

        Assertions.assertThat(planPath.toFile()).exists();
        assertResult(DATA, sinkAPath);
        assertResult(transformedRows(), sinkBPath);
    }

    /** Loads the reference JSON plan and compares its explain output with the stored one. */
    @Test
    public void testExplainPlan() throws IOException {
        // The stored plan is stamped with the current Flink version before it is loaded.
        String planJson =
                JsonTestUtils.setFlinkVersion(
                                JsonTestUtils.readFromResource("/jsonplan/testGetJsonPlan.out"),
                                FlinkVersion.current())
                        .toString();

        String actual =
                TableTestUtil.replaceNodeIdInOperator(
                        TableTestUtil.replaceStreamNodeId(
                                this.tableEnv
                                        .loadPlan(PlanReference.fromJsonString(planJson))
                                        .explain(ExplainDetail.JSON_EXECUTION_PLAN)));

        Assertions.assertThat(actual)
                .isEqualTo(TableTestUtil.readFromResource("/explain/testExplainJsonPlan.out"));
    }

    /**
     * Compiles a plan while the sink length enforcer is {@code TRIM_PAD}, switches the
     * environment to {@code IGNORE}, and then executes the plan. The expected sink rows are
     * trimmed to the {@code varchar(11)} column length, i.e. the option value in effect at
     * compile time is the one expected to apply.
     */
    @Test
    public void testPersistedConfigOption() throws Exception {
        // NOTE(review): the plan directory prepared here is never read or written by this
        // test; the call is kept only to leave the original setup untouched - confirm and drop.
        preparePlanFile();

        List<String> sourceRows =
                Stream.concat(
                                DATA.stream(),
                                Stream.of(
                                        "4,2,This string is long",
                                        "5,3,This is an even longer string"))
                        .collect(Collectors.toList());
        createTestCsvSourceTable("src", sourceRows, COLUMNS_DEFINITION);
        File sinkPath = createTestCsvSinkTable("sink", "a bigint", "b int", "c varchar(11)");

        this.tableEnv
                .getConfig()
                .getConfiguration()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER,
                        ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD);
        CompiledPlan plan = this.tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src");

        // Changed after compilation on purpose: the result below must still be trimmed.
        this.tableEnv
                .getConfig()
                .getConfiguration()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER,
                        ExecutionConfigOptions.TypeLengthEnforcer.IGNORE);
        plan.execute().await();

        List<String> expected =
                Stream.concat(DATA.stream(), Stream.of("4,2,This string", "5,3,This is an "))
                        .collect(Collectors.toList());
        assertResult(expected, sinkPath);
    }

    /** Expects {@code compilePlanSql} to throw when the environment runs in batch mode. */
    @Test
    public void testBatchMode() {
        // Replaces the environment created by the base setup with a batch one.
        this.tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        this.tableEnv.executeSql(
                "CREATE TABLE src (\n"
                        + "  a bigint\n"
                        + ") with (\n"
                        + "  'connector' = 'values',\n"
                        + "  'bounded' = 'true')");
        this.tableEnv.executeSql(
                "CREATE TABLE sink (\n"
                        + "  a bigint\n"
                        + ") with (\n"
                        + "  'connector' = 'values',\n"
                        + "  'table-sink-class' = 'DEFAULT')");

        Assertions.assertThatThrownBy(
                        () -> this.tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src"))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessage("The compiled plan feature is not supported in batch mode.");
    }

    /**
     * Creates the CSV source table {@code src} filled with {@link #DATA} and the CSV sink table
     * {@code sink}, both using {@link #COLUMNS_DEFINITION}.
     *
     * @return the file returned by {@code createTestCsvSinkTable} for the sink
     */
    private File createSourceSinkTables() throws IOException {
        createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
        return createTestCsvSinkTable("sink", COLUMNS_DEFINITION);
    }

    /**
     * Resolves {@code plan.json} inside the {@code plan} temp directory and creates its parent
     * directories.
     *
     * @return the absolute path of the (not yet written) plan file
     */
    private Path preparePlanFile() throws IOException {
        Path planFile =
                Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
                        .toAbsolutePath();
        FileUtils.createParentDirectories(planFile.toFile());
        return planFile;
    }

    /**
     * Rows expected from {@code SELECT a + 1, b + 1, CONCAT(c, '-something')} over
     * {@link #DATA}. A new list is returned on every call so no test shares it.
     */
    private static List<String> transformedRows() {
        return Arrays.asList(
                "2,2,hi-something", "3,2,hello-something", "4,3,hello world-something");
    }

    /**
     * Formats a JSON plan and replaces the Flink version and exec node ids via
     * {@link TableTestUtil}, so that two plans can be compared as strings.
     */
    private static String normalizeJsonPlan(String jsonPlan) throws IOException {
        return TableTestUtil.replaceExecNodeId(
                TableTestUtil.replaceFlinkVersion(TableTestUtil.getFormattedJson(jsonPlan)));
    }
}
