/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.examples.quickstart;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.examples.quickstart.factory.CollectSinkTableFactory;
import org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations;
import org.jetbrains.annotations.NotNull;

/**
 * Quickstart example that drives a Hudi table through Flink SQL.
 *
 * <p>The flow executed by {@link #main(String[])} is: build a table environment,
 * register a file-backed table named {@code source}, create the Hudi table, copy
 * the source rows into it, read them back, and finally run the same copy again as
 * an "update".
 *
 * <p>Instances are created through {@link #instance()}. {@link #initEnv()} must be
 * called before any method that executes SQL on the stream table environment.
 */
public final class HoodieFlinkQuickstart {
    /** Settings used to build the most recently created table environment. */
    private EnvironmentSettings settings = null;
    /** Table environment populated by {@link #initEnv()}; {@code null} until then. */
    private TableEnvironment streamTableEnv = null;
    /** Name of the Hudi table, recorded by {@link #createHudiTable}. */
    private String tableName;

    private HoodieFlinkQuickstart() {
    }

    /**
     * Creates a new, un-initialised quickstart instance.
     *
     * @return a fresh instance; call {@link #initEnv()} before using it
     */
    public static HoodieFlinkQuickstart instance() {
        return new HoodieFlinkQuickstart();
    }

    /**
     * Command line entry point.
     *
     * @param args {@code <tablePath> <tableName> <tableType>}; {@code tableType} must be the
     *             exact name of a {@link HoodieTableType} constant
     * @throws TableNotExistException declared by the query helpers
     * @throws InterruptedException if the thread is interrupted while waiting for query results
     */
    public static void main(String[] args) throws TableNotExistException, InterruptedException {
        if (args.length < 3) {
            System.err.println("Usage: HoodieFlinkQuickstart <tablePath> <tableName> <tableType>");
            System.exit(1);
        }
        String tablePath = args[0];
        String tableName = args[1];
        String tableType = args[2];
        HoodieFlinkQuickstart flinkQuickstart = HoodieFlinkQuickstart.instance();
        flinkQuickstart.initEnv();
        flinkQuickstart.createFileSource();
        flinkQuickstart.createHudiTable(tablePath, tableName, HoodieTableType.valueOf(tableType));
        flinkQuickstart.insertData();
        flinkQuickstart.queryData();
        flinkQuickstart.updateData();
    }

    /**
     * Lazily builds the stream table environment. Subsequent calls are no-ops.
     *
     * <p>The environment is configured with a default parallelism of 1, a checkpoint
     * interval of 2 seconds and a fixed-delay restart strategy with 0 attempts.
     */
    public void initEnv() {
        if (this.streamTableEnv != null) {
            return;
        }
        this.settings = EnvironmentSettings.newInstance().build();
        TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(this.settings);
        Configuration execConf = tableEnv.getConfig().getConfiguration();
        execConf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        execConf.setString("execution.checkpointing.interval", "2s");
        execConf.setString("restart-strategy", "fixed-delay");
        execConf.setString("restart-strategy.fixed-delay.attempts", "0");
        this.streamTableEnv = tableEnv;
    }

    /**
     * Returns the stream table environment.
     *
     * @return the environment built by {@link #initEnv()}, or {@code null} if it has not run yet
     */
    public TableEnvironment getStreamTableEnv() {
        return this.streamTableEnv;
    }

    /**
     * Builds a new batch-mode table environment on every call.
     *
     * <p>Sorted inputs and the batch state backend are disabled and the default
     * parallelism is set to 1. Note that this also replaces the stored
     * {@code settings} field with the batch settings.
     *
     * @return a newly created batch table environment
     */
    public TableEnvironment getBatchTableEnv() {
        Configuration conf = new Configuration();
        conf.setBoolean("execution.sorted-inputs.enabled", false);
        conf.setBoolean("execution.batch-state-backend.enabled", false);
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        this.settings = EnvironmentSettings.newInstance().inBatchMode().build();
        StreamTableEnvironment batchTableEnv = StreamTableEnvironment.create(execEnv, this.settings);
        batchTableEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        return batchTableEnv;
    }

    /**
     * Creates the Hudi table in the stream table environment and remembers its name
     * for the later insert/query/update steps.
     *
     * <p>The table is declared with streaming read enabled and empty commits disallowed.
     *
     * @param tablePath value of the table's {@code path} option
     * @param tableName name of the table to create
     * @param tableType Hudi table type to create
     */
    public void createHudiTable(String tablePath, String tableName, HoodieTableType tableType) {
        this.tableName = tableName;
        String hoodieTableDDL = QuickstartConfigurations.sql(tableName)
                .option(FlinkOptions.PATH, tablePath)
                .option(FlinkOptions.READ_AS_STREAMING, true)
                .option(FlinkOptions.TABLE_TYPE, tableType)
                .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
                .end();
        this.streamTableEnv.executeSql(hoodieTableDDL);
    }

    /** Registers the file-backed input table under the name {@code source}. */
    public void createFileSource() {
        String createSource = QuickstartConfigurations.getFileSourceDDL("source");
        this.streamTableEnv.executeSql(createSource);
    }

    /**
     * Copies every row of {@code source} into the Hudi table, then reads the table back.
     *
     * @return the rows collected by {@link #queryData()}
     * @throws InterruptedException if interrupted while waiting for query results
     * @throws TableNotExistException declared by the query helpers
     */
    @NotNull
    List<Row> insertData() throws InterruptedException, TableNotExistException {
        return this.copySourceIntoTableAndQuery();
    }

    /**
     * Runs {@code select *} on the Hudi table, collecting results for 10 seconds.
     *
     * @return the collected rows
     * @throws InterruptedException if interrupted while waiting for query results
     * @throws TableNotExistException declared by the query helpers
     */
    List<Row> queryData() throws InterruptedException, TableNotExistException {
        return HoodieFlinkQuickstart.execSelectSql(this.streamTableEnv, String.format("select * from %s", this.tableName), 10L);
    }

    /**
     * Writes the {@code source} rows into the Hudi table a second time, then reads
     * the table back. The statement executed is the same as in {@link #insertData()}.
     *
     * @return the rows collected by {@link #queryData()}
     * @throws InterruptedException if interrupted while waiting for query results
     * @throws TableNotExistException declared by the query helpers
     */
    @NotNull
    List<Row> updateData() throws InterruptedException, TableNotExistException {
        return this.copySourceIntoTableAndQuery();
    }

    /** Shared body of {@link #insertData()} and {@link #updateData()}. */
    private List<Row> copySourceIntoTableAndQuery() throws InterruptedException, TableNotExistException {
        String insertInto = String.format("insert into %s select * from source", this.tableName);
        HoodieFlinkQuickstart.execInsertSql(this.streamTableEnv, insertInto);
        return this.queryData();
    }

    /**
     * Executes an insert statement and blocks until the resulting job finishes.
     *
     * <p>This is best-effort: failures are reported on {@code System.err} rather than
     * thrown. If the wait is interrupted, the thread's interrupt status is restored so
     * that callers and later blocking calls can observe it.
     *
     * @param tEnv   environment used to execute the statement
     * @param insert the insert statement to execute
     */
    public static void execInsertSql(TableEnvironment tEnv, String insert) {
        TableResult tableResult = tEnv.executeSql(insert);
        try {
            tableResult.await();
        } catch (InterruptedException interrupted) {
            // Do not lose the interrupt: re-assert it for whoever runs next on this thread.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while waiting for statement to finish: " + insert);
        } catch (ExecutionException failure) {
            // Deliberately not rethrown (best-effort), but never silently dropped.
            System.err.println("Statement failed: " + insert + " (" + failure.getCause() + ")");
        }
    }

    /**
     * Runs a select statement into the collect sink using the default sink schema.
     *
     * @param tEnv    environment used to execute the statements
     * @param select  the select statement
     * @param timeout seconds to let the job run before cancelling it
     * @return the collected rows
     * @throws InterruptedException if interrupted while waiting
     * @throws TableNotExistException declared by the schema-aware overload
     */
    public static List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException, TableNotExistException {
        return HoodieFlinkQuickstart.execSelectSql(tEnv, select, timeout, null);
    }

    /**
     * Runs a select statement into the collect sink, optionally deriving the sink
     * schema from an existing table.
     *
     * @param tEnv        environment used to execute the statements
     * @param select      the select statement
     * @param timeout     seconds to let the job run before cancelling it
     * @param sourceTable table in the current catalog/database whose resolved schema the
     *                    sink should use, or {@code null} for the default sink DDL
     * @return the collected rows
     * @throws InterruptedException if interrupted while waiting
     * @throws TableNotExistException if {@code sourceTable} is not found in the catalog
     */
    public static List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable) throws InterruptedException, TableNotExistException {
        if (sourceTable == null) {
            return HoodieFlinkQuickstart.execSelectSql(tEnv, select, QuickstartConfigurations.getCollectSinkDDL("sink"), timeout);
        }
        ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
        String currentCatalog = tEnv.getCurrentCatalog();
        // The name comes from getCurrentCatalog(), so the lookup is expected to succeed.
        Catalog catalog = tEnv.getCatalog(currentCatalog).get();
        ResolvedCatalogTable table = (ResolvedCatalogTable) catalog.getTable(objectPath);
        ResolvedSchema schema = table.getResolvedSchema();
        String sinkDDL = QuickstartConfigurations.getCollectSinkDDL("sink", schema);
        return HoodieFlinkQuickstart.execSelectSql(tEnv, select, sinkDDL, timeout);
    }

    /**
     * Recreates the {@code sink} table from the given DDL, runs
     * {@code insert into sink <select>}, lets the job run for {@code timeout} seconds,
     * cancels it, drops the sink table and returns everything held in
     * {@link CollectSinkTableFactory#RESULT}.
     *
     * <p>The job is cancelled and the sink dropped even when the wait is interrupted,
     * so an interrupt does not leave a running job or a stale table behind.
     *
     * @param tEnv    environment used to execute the statements
     * @param select  the select statement
     * @param sinkDDL DDL that creates the {@code sink} table
     * @param timeout seconds to let the job run before cancelling it
     * @return all rows from {@link CollectSinkTableFactory#RESULT}, flattened into one list
     * @throws InterruptedException if interrupted while waiting
     */
    public static List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout) throws InterruptedException {
        tEnv.executeSql("DROP TABLE IF EXISTS sink");
        tEnv.executeSql(sinkDDL);
        TableResult tableResult = tEnv.executeSql("insert into sink " + select);
        try {
            TimeUnit.SECONDS.sleep(timeout);
        } finally {
            tableResult.getJobClient().ifPresent(JobClient::cancel);
            tEnv.executeSql("DROP TABLE IF EXISTS sink");
        }
        return CollectSinkTableFactory.RESULT.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
    }
}

