/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.hudi.adapter.TestHoodieCatalogs;
import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestTableEnvs;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtendWith;

@EnabledIf(value="supportAdvancedAlterTableSyntax")
@ExtendWith(value={FlinkMiniCluster.class})
public abstract class ITTestSchemaEvolutionBySQL {
    protected static final String CATALOG_NAME = "hudi_catalog";
    protected static final String DB_NAME = "hudi";
    private TableEnvironment tableEnv;
    protected Catalog catalog;
    private static final String CREATE_TABLE_DDL = "create table t1(  f_int int,  f_date date,  f_str string,  f_par string,  primary key(f_int) not enforced)partitioned by (`f_par`)with (  'connector' = 'hudi',  'hoodie.datasource.write.recordkey.field' = 'f_int',  'hoodie.schema.on.read.enable' = 'true',  'connector' = 'hudi',  'precombine.field' = 'f_date')";
    private static final String INITIALIZE_INSERT_SQL = "insert into t1 values (1, TO_DATE('2022-02-02'), 'first', '1'), (2, DATE '2022-02-02', 'second', '2')";

    @BeforeEach
    void beforeEach() {
        this.tableEnv = TestTableEnvs.getBatchTableEnv();
        this.tableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
        this.catalog = this.createCatalog();
        this.catalog.open();
        this.tableEnv.registerCatalog(CATALOG_NAME, this.catalog);
        this.tableEnv.executeSql("use catalog hudi_catalog");
        this.tableEnv.executeSql("create database if not exists hudi");
        this.tableEnv.executeSql("use hudi");
        this.tableEnv.executeSql(CREATE_TABLE_DDL);
    }

    @AfterEach
    void afterEach() {
        if (this.catalog != null) {
            this.catalog.close();
        }
    }

    @Test
    void testAddColumns() {
        this.execInsertSql(this.tableEnv, INITIALIZE_INSERT_SQL);
        String alterSql = "alter table t1 add (f_name string after f_date, f_long bigint)";
        this.tableEnv.executeSql(alterSql);
        String newInsertSql = "insert into t1 values (3, TO_DATE('2022-02-02'), 'Hi', 'third', '3', 1000), (1, TO_DATE('2022-02-02'), 'Hello', 'first', '1', 500)";
        this.execInsertSql(this.tableEnv, newInsertSql);
        List result = CollectionUtil.iterableToList(() -> this.tableEnv.sqlQuery("select * from t1").execute().collect());
        String expected = "[+I[1, 2022-02-02, Hello, first, 1, 500], +I[2, 2022-02-02, null, second, 2, null], +I[3, 2022-02-02, Hi, third, 3, 1000]]";
        TestData.assertRowsEquals((List<Row>)result, expected);
    }

    @Test
    void testDropColumns() {
        this.execInsertSql(this.tableEnv, INITIALIZE_INSERT_SQL);
        String alterSql = "alter table t1 drop f_date";
        this.tableEnv.executeSql(alterSql);
        List result = CollectionUtil.iterableToList(() -> this.tableEnv.sqlQuery("select * from t1").execute().collect());
        String expected = "[+I[1, first, 1], +I[2, second, 2]]";
        TestData.assertRowsEquals((List<Row>)result, expected);
    }

    @Test
    void testRenameColumns() {
        this.execInsertSql(this.tableEnv, INITIALIZE_INSERT_SQL);
        String alterSql = "alter table t1 rename f_str to f_string";
        this.tableEnv.executeSql(alterSql);
        List result = CollectionUtil.iterableToList(() -> this.tableEnv.sqlQuery("select f_int, f_date, f_string, f_par from t1").execute().collect());
        String expected = "[+I[1, 2022-02-02, first, 1], +I[2, 2022-02-02, second, 2]]";
        TestData.assertRowsEquals((List<Row>)result, expected);
    }

    @Test
    void testModifyColumn() {
        this.execInsertSql(this.tableEnv, INITIALIZE_INSERT_SQL);
        String alterSql = "alter table t1 modify (f_int bigint, f_str string after f_par)";
        this.tableEnv.executeSql(alterSql);
        String newInsertSql = "insert into t1 values (3, TO_DATE('2022-02-02'), '3', 'third'), (1, TO_DATE('2022-02-02'), '1', 'first1')";
        this.execInsertSql(this.tableEnv, newInsertSql);
        List result = CollectionUtil.iterableToList(() -> this.tableEnv.sqlQuery("select * from t1").execute().collect());
        String expected = "[+I[1, 2022-02-02, 1, first1], +I[2, 2022-02-02, 2, second], +I[3, 2022-02-02, 3, third]]";
        TestData.assertRowsEquals((List<Row>)result, expected);
    }

    @Test
    void testIllegalModifyColumnType() {
        this.execInsertSql(this.tableEnv, INITIALIZE_INSERT_SQL);
        String alterSql = "alter table t1 modify f_str int";
        Exception e = (Exception)Assertions.assertThrows(TableException.class, () -> this.tableEnv.executeSql(alterSql), (String)"Should throw exception when the type update is not allowed ");
        Assertions.assertTrue((boolean)(e.getCause() instanceof IllegalArgumentException));
        Assertions.assertTrue((boolean)e.getCause().getMessage().contains("cannot update origin type: string to a incompatibility type: int"));
    }

    @Test
    void testSetAndResetProperty() throws Exception {
        this.tableEnv.executeSql("alter table t1 set ('k' = 'v')");
        CatalogBaseTable table = this.catalog.getTable(new ObjectPath(DB_NAME, "t1"));
        Assertions.assertEquals(table.getOptions().get("k"), (Object)"v");
        this.tableEnv.executeSql("alter table t1 reset ('k')");
        table = this.catalog.getTable(new ObjectPath(DB_NAME, "t1"));
        Assertions.assertFalse((boolean)table.getOptions().containsKey("k"));
    }

    @Test
    void testAlterTableType() {
        Exception e = (Exception)Assertions.assertThrows(TableException.class, () -> this.tableEnv.executeSql("alter table t1 set ('table.type' = 'MERGE_ON_READ')"), (String)"Should throw exception because alter table type is not supported.");
        Assertions.assertTrue((boolean)(e.getCause() instanceof HoodieCatalogException));
        Assertions.assertTrue((boolean)e.getCause().getMessage().contains("Hoodie catalog does not support to alter table type and index type"));
    }

    @Test
    void testAlterIndexType() {
        Exception e = (Exception)Assertions.assertThrows(TableException.class, () -> this.tableEnv.executeSql("alter table t1 set ('index.type' = 'BUCKET')"), (String)"Should throw exception because alter index type is not supported.");
        Assertions.assertTrue((boolean)(e.getCause() instanceof HoodieCatalogException));
        Assertions.assertTrue((boolean)e.getCause().getMessage().contains("Hoodie catalog does not support to alter table type and index type"));
    }

    @Test
    void testAddNonPhysicalColumn() {
        Exception e = (Exception)Assertions.assertThrows(TableException.class, () -> this.tableEnv.executeSql("alter table t1 add (ts AS f_int + 1)"), (String)"Should throw exception because add non-physical column is not supported.");
        Assertions.assertTrue((boolean)(e.getCause() instanceof HoodieNotSupportedException));
        Assertions.assertTrue((boolean)e.getCause().getMessage().contains("Add non-physical column is not supported yet."));
    }

    @Test
    void testDropPrimaryKeyConstraint() {
        Exception e = (Exception)Assertions.assertThrows(TableException.class, () -> this.tableEnv.executeSql("alter table t1 drop primary key"), (String)"Should throw exception because DropConstraint is not supported.");
        Assertions.assertTrue((boolean)(e.getCause() instanceof HoodieNotSupportedException));
        Assertions.assertTrue((boolean)e.getCause().getMessage().contains("DropConstraint is not supported."));
    }

    @Test
    void testAddWatermark() {
        Exception e = (Exception)Assertions.assertThrows(TableException.class, () -> this.tableEnv.executeSql("alter table t1 add (ts timestamp(3), watermark for ts as ts - interval '1' hour)"), (String)"Should throw exception because AddWatermark is not supported.");
        Assertions.assertTrue((boolean)(e.getCause() instanceof HoodieNotSupportedException));
        Assertions.assertTrue((boolean)e.getCause().getMessage().contains("AddWatermark is not supported."));
    }

    static boolean supportAdvancedAlterTableSyntax() {
        return TestHoodieCatalogs.supportAdvancedAlterTableSyntax();
    }

    private void execInsertSql(TableEnvironment tEnv, String insert) {
        TableResult tableResult = tEnv.executeSql(insert);
        try {
            tableResult.await();
        }
        catch (InterruptedException | ExecutionException exception) {
            // empty catch block
        }
    }

    protected abstract Catalog createCatalog();
}

