package org.apache.hudi.table;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.internal.schema.action.TableChange;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({FlinkMiniCluster.class})
/* loaded from: input_file:org/apache/hudi/table/ITTestSchemaEvolution.class */
public class ITTestSchemaEvolution {

    // Temporary directory injected by JUnit; its absolute path is passed to
    // defaultTableOptions(...) as the table path by every test in this class.
    @TempDir
    File tempFile;
    // Table environment used to run all SQL statements; re-created in setUp() before each test.
    private StreamTableEnvironment tEnv;
    private static final ExpectedResult EXPECTED_MERGED_RESULT = new ExpectedResult(new String[]{"+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]", "+I[Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]", "+I[Stephen, null, 33, +I[2, null, s2, 2, null, null], {Stephen=3333.0}, [33.0], null, null, null]", "+I[Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]", "+I[Fabian, null, 31, +I[4, null, s4, 4, null, null], {Fabian=3131.0}, [31.0], null, null, null]", "+I[Sophia, null, 18, +I[5, null, s5, 5, null, null], {Sophia=1818.0}, [18.0, 18.0], null, null, null]", "+I[Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0}, [20.0], null, null, null]", "+I[Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0}, [44.0, 44.0], null, null, null]", "+I[Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]", "+I[Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]"}, new String[]{"+I[uuid:id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]", "+I[uuid:id1, Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]", "+I[uuid:id2, Stephen, null, 33, +I[2, null, s2, 2, null, null], {Stephen=3333.0}, [33.0], null, null, null]", "+I[uuid:id3, Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]", "+I[uuid:id4, Fabian, null, 31, +I[4, null, s4, 4, null, null], {Fabian=3131.0}, [31.0], null, null, null]", "+I[uuid:id5, Sophia, null, 18, +I[5, null, s5, 5, null, null], {Sophia=1818.0}, [18.0, 18.0], null, null, null]", "+I[uuid:id6, Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0}, [20.0], null, null, null]", "+I[uuid:id7, Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0}, [44.0, 
44.0], null, null, null]", "+I[uuid:id8, Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]", "+I[uuid:id9, Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]"}, new String[]{"+I[1]", "+U[2]", "+U[3]", "+U[4]", "+U[5]", "+U[6]", "+U[7]", "+U[8]", "+U[9]", "+U[10]", "-U[1]", "-U[2]", "-U[3]", "-U[4]", "-U[5]", "-U[6]", "-U[7]", "-U[8]", "-U[9]"});
    /**
     * Expected query results when records are NOT merged per key: the rows for id1 and id3
     * appear twice, once as written with the first schema and once as rewritten with the
     * second schema.
     *
     * <p>The three arrays are, in order: rows of the evolved-column projection, the same
     * rows prefixed with {@code _hoodie_record_key}, and the changelog of {@code count(*)}.
     */
    private static final ExpectedResult EXPECTED_UNMERGED_RESULT = new ExpectedResult(
        new String[]{
            "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]",
            "+I[Danny, null, 23, +I[1, null, s1, 1, null, null], {Danny=2323.0}, [23.0, 23.0], null, null, null]",
            "+I[Stephen, null, 33, +I[2, null, s2, 2, null, null], {Stephen=3333.0}, [33.0], null, null, null]",
            "+I[Julian, null, 53, +I[3, null, s3, 3, null, null], {Julian=5353.0}, [53.0, 53.0], null, null, null]",
            "+I[Fabian, null, 31, +I[4, null, s4, 4, null, null], {Fabian=3131.0}, [31.0], null, null, null]",
            "+I[Sophia, null, 18, +I[5, null, s5, 5, null, null], {Sophia=1818.0}, [18.0, 18.0], null, null, null]",
            "+I[Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0}, [20.0], null, null, null]",
            "+I[Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0}, [44.0, 44.0], null, null, null]",
            "+I[Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
            "+I[Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
            "+I[Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
            "+I[Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]"
        },
        new String[]{
            "+I[uuid:id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]",
            "+I[uuid:id1, Danny, null, 23, +I[1, null, s1, 1, null, null], {Danny=2323.0}, [23.0, 23.0], null, null, null]",
            "+I[uuid:id2, Stephen, null, 33, +I[2, null, s2, 2, null, null], {Stephen=3333.0}, [33.0], null, null, null]",
            "+I[uuid:id3, Julian, null, 53, +I[3, null, s3, 3, null, null], {Julian=5353.0}, [53.0, 53.0], null, null, null]",
            "+I[uuid:id4, Fabian, null, 31, +I[4, null, s4, 4, null, null], {Fabian=3131.0}, [31.0], null, null, null]",
            "+I[uuid:id5, Sophia, null, 18, +I[5, null, s5, 5, null, null], {Sophia=1818.0}, [18.0, 18.0], null, null, null]",
            "+I[uuid:id6, Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0}, [20.0], null, null, null]",
            "+I[uuid:id7, Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0}, [44.0, 44.0], null, null, null]",
            "+I[uuid:id8, Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
            "+I[uuid:id9, Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
            "+I[uuid:id1, Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
            "+I[uuid:id3, Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]"
        },
        new String[]{
            "+I[1]", "+U[2]", "+U[3]", "+U[4]", "+U[5]", "+U[6]", "+U[7]", "+U[8]", "+U[9]", "+U[10]",
            "-U[10]", "+U[11]", "-U[11]", "+U[12]",
            "-U[1]", "-U[2]", "-U[3]", "-U[4]", "-U[5]", "-U[6]", "-U[7]", "-U[8]", "-U[9]"
        });

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/table/ITTestSchemaEvolution$ExpectedResult.class */
    /**
     * Immutable holder for the three sets of expected rows that one schema-evolution
     * scenario is verified against.
     */
    public static final class ExpectedResult {
        /** Rows expected from the projection over the evolved columns. */
        final String[] evolvedRows;
        /** Rows expected from the projection that also selects {@code _hoodie_record_key}. */
        final String[] rowsWithMeta;
        /** Rows expected from {@code select count(*)}. */
        final String[] rowCount;

        private ExpectedResult(String[] evolved, String[] withMeta, String[] counts) {
            evolvedRows = evolved;
            rowsWithMeta = withMeta;
            rowCount = counts;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/table/ITTestSchemaEvolution$TableOptions.class */
    /**
     * Mutable bag of table options that can be rendered either as a Flink
     * {@link Configuration} or as the body of a SQL {@code WITH (...)} clause.
     */
    public static final class TableOptions {
        private final Map<String, String> map = new HashMap<>();

        /**
         * Creates the options from alternating key/value arguments.
         *
         * @param objArr key1, value1, key2, value2, ...; keys and values are stored via {@code toString()}
         * @throws IllegalArgumentException if the number of arguments is odd or a key is null/empty
         */
        TableOptions(Object... objArr) {
            Preconditions.checkArgument(objArr.length % 2 == 0,
                "options must be given as alternating key/value pairs");
            for (int i = 0; i < objArr.length; i += 2) {
                withOption(objArr[i].toString(), objArr[i + 1]);
            }
        }

        /**
         * Sets (or overwrites) a single option.
         *
         * @param str option name, must not be null or empty
         * @param obj option value, stored as {@code obj.toString()}
         * @return this instance, to allow chaining
         * @throws IllegalArgumentException if the option name is null or empty
         */
        TableOptions withOption(String str, Object obj) {
            if (StringUtils.isNullOrEmpty(str)) {
                throw new IllegalArgumentException("optionName must be presented");
            }
            this.map.put(str, obj.toString());
            return this;
        }

        /** Converts the collected options into a Flink {@link Configuration}. */
        Configuration toConfig() {
            return FlinkOptions.fromMap(this.map);
        }

        /**
         * Renders the options as {@code 'key' = 'value'} pairs joined by {@code ", "},
         * which is the form spliced into the {@code with (...)} clause of the DDL statements.
         */
        @Override
        public String toString() {
            return this.map.entrySet().stream()
                .map(entry -> String.format("'%s' = '%s'", entry.getKey(), entry.getValue()))
                .collect(Collectors.joining(", "));
        }
    }

    /** Creates a fresh stream table environment with parallelism 1 before each test. */
    @BeforeEach
    public void setUp() {
        StreamExecutionEnvironment execEnv =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        this.tEnv = StreamTableEnvironment.create(execEnv);
    }

    /** Runs the schema-evolution scenario with the unmodified default (copy-on-write, batch read) options. */
    @Test
    public void testCopyOnWriteInputFormat() throws Exception {
        String tablePath = this.tempFile.getAbsolutePath();
        TableOptions options = defaultTableOptions(tablePath);
        testSchemaEvolution(options);
    }

    /** Runs the scenario with streaming read enabled, starting from the earliest commit. */
    @Test
    public void testMergeOnReadInputFormatBaseFileOnlyIterator() throws Exception {
        TableOptions options = defaultTableOptions(this.tempFile.getAbsolutePath());
        // withOption mutates and returns the same instance, so separate calls equal a chain.
        options.withOption(FlinkOptions.READ_AS_STREAMING.key(), true);
        options.withOption(FlinkOptions.READ_START_COMMIT.key(), "earliest");
        testSchemaEvolution(options);
    }

    /** Runs the scenario with streaming read enabled and the read start commit set to {@code 1}. */
    @Test
    public void testMergeOnReadInputFormatBaseFileOnlyFilteringIterator() throws Exception {
        TableOptions options = defaultTableOptions(this.tempFile.getAbsolutePath());
        // withOption mutates and returns the same instance, so separate calls equal a chain.
        options.withOption(FlinkOptions.READ_AS_STREAMING.key(), true);
        options.withOption(FlinkOptions.READ_START_COMMIT.key(), 1);
        testSchemaEvolution(options);
    }

    /** Runs the scenario on a merge-on-read table with otherwise default options. */
    @Test
    public void testMergeOnReadInputFormatLogFileOnlyIteratorGetLogFileIterator() throws Exception {
        TableOptions options = defaultTableOptions(this.tempFile.getAbsolutePath());
        options.withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        testSchemaEvolution(options);
    }

    /**
     * Runs the scenario on a merge-on-read table read as a changelog stream from the
     * earliest commit, without compaction, and expects the unmerged result.
     */
    @Test
    public void testMergeOnReadInputFormatLogFileOnlyIteratorGetUnMergedLogFileIterator() throws Exception {
        TableOptions options = defaultTableOptions(this.tempFile.getAbsolutePath());
        // withOption mutates and returns the same instance, so separate calls equal a chain.
        options.withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        options.withOption(FlinkOptions.READ_AS_STREAMING.key(), true);
        options.withOption(FlinkOptions.READ_START_COMMIT.key(), "earliest");
        options.withOption(FlinkOptions.CHANGELOG_ENABLED.key(), true);
        testSchemaEvolution(options, false, EXPECTED_UNMERGED_RESULT);
    }

    /**
     * Runs the scenario on a merge-on-read table with compaction delta commits set to 1,
     * compacting before the schema is changed.
     */
    @Test
    public void testMergeOnReadInputFormatMergeIterator() throws Exception {
        TableOptions options = defaultTableOptions(this.tempFile.getAbsolutePath());
        // withOption mutates and returns the same instance, so separate calls equal a chain.
        options.withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        options.withOption(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1);
        testSchemaEvolution(options, true);
    }

    /**
     * Runs the scenario on a merge-on-read table with merge type {@code skip_merge},
     * compacting before the schema is changed, and expects the unmerged result.
     */
    @Test
    public void testMergeOnReadInputFormatSkipMergeIterator() throws Exception {
        TableOptions options = defaultTableOptions(this.tempFile.getAbsolutePath());
        // withOption mutates and returns the same instance, so separate calls equal a chain.
        options.withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        options.withOption(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1);
        options.withOption(FlinkOptions.MERGE_TYPE.key(), "skip_merge");
        testSchemaEvolution(options, true, EXPECTED_UNMERGED_RESULT);
    }

    /**
     * Runs the schema-evolution scenario on a merge-on-read table, then schedules and
     * executes a compaction and verifies that the evolved rows are still read correctly.
     */
    @Test
    public void testCompaction() throws Exception {
        TableOptions tableOptions = defaultTableOptions(this.tempFile.getAbsolutePath())
            .withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
            .withOption(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1);
        testSchemaEvolution(tableOptions);
        // try-with-resources closes the client exactly once, whether or not compaction fails,
        // and before the final verification runs.
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(tableOptions.toConfig())) {
            Option<String> compactionInstant = writeClient.scheduleCompaction(Option.empty());
            writeClient.compact(compactionInstant.get());
        }
        checkAnswerEvolved(EXPECTED_MERGED_RESULT.evolvedRows);
    }

    /**
     * Runs the scenario without compacting before the schema change and verifies
     * against {@link #EXPECTED_MERGED_RESULT}.
     */
    private void testSchemaEvolution(TableOptions tableOptions) throws Exception {
        testSchemaEvolution(tableOptions, false, EXPECTED_MERGED_RESULT);
    }

    /**
     * Runs the scenario and verifies against {@link #EXPECTED_MERGED_RESULT}.
     *
     * @param tableOptions options of the table under test
     * @param z whether to schedule and run a compaction before the schema is changed
     */
    private void testSchemaEvolution(TableOptions tableOptions, boolean z) throws Exception {
        ExpectedResult expected = EXPECTED_MERGED_RESULT;
        testSchemaEvolution(tableOptions, z, expected);
    }

    /**
     * Full scenario: write with the first schema, evolve the schema, write with the
     * second schema, then verify three different queries.
     *
     * @param tableOptions options of the table under test
     * @param z whether to schedule and run a compaction before the schema is changed
     * @param expectedResult rows each verification query is expected to return
     */
    private void testSchemaEvolution(TableOptions tableOptions, boolean z, ExpectedResult expectedResult) throws Exception {
        // Phase 1: produce data before and after the schema change.
        writeTableWithSchema1(tableOptions);
        changeTableSchema(tableOptions, z);
        writeTableWithSchema2(tableOptions);

        // Phase 2: verify. The metadata check goes last because it drops and re-creates t1.
        String[] evolved = expectedResult.evolvedRows;
        checkAnswerEvolved(evolved);
        String[] counts = expectedResult.rowCount;
        checkAnswerCount(counts);
        String[] withMeta = expectedResult.rowsWithMeta;
        checkAnswerWithMeta(tableOptions, withMeta);
    }

    /**
     * Creates table {@code t1} with the pre-evolution schema and inserts nine rows
     * (id0..id8), waiting for the insert job to finish.
     *
     * @param tableOptions options rendered into the {@code with (...)} clause of the DDL
     */
    private void writeTableWithSchema1(TableOptions tableOptions) throws ExecutionException, InterruptedException {
        String createTableSql = "create table t1 (  uuid string,  name string,  gender char,  age int,  ts timestamp,  f_struct row<f0 int, f1 string, drop_add string, change_type int>,  f_map map<string, int>,  f_array array<int>,  `partition` string) partitioned by (`partition`) with (" + tableOptions + ")";
        String insertSql = "insert into t1 select   cast(uuid as string),  cast(name as string),  cast(gender as char),  cast(age as int),  cast(ts as timestamp),  cast(f_struct as row<f0 int, f1 string, drop_add string, change_type int>),  cast(f_map as map<string, int>),  cast(f_array as array<int>),  cast(`partition` as string) from (values   ('id0', 'Indica', 'F', 12, '2000-01-01 00:00:00', cast(null as row<f0 int, f1 string, drop_add string, change_type int>), map['Indica', 1212], array[12], 'par0'),  ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', row(1, 's1', '', 1), cast(map['Danny', 2323] as map<string, int>), array[23, 23], 'par1'),  ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', row(2, 's2', '', 2), cast(map['Stephen', 3333] as map<string, int>), array[33], 'par1'),  ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', row(3, 's3', '', 3), cast(map['Julian', 5353] as map<string, int>), array[53, 53], 'par2'),  ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', row(4, 's4', '', 4), cast(map['Fabian', 3131] as map<string, int>), array[31], 'par2'),  ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', row(5, 's5', '', 5), cast(map['Sophia', 1818] as map<string, int>), array[18, 18], 'par3'),  ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', row(6, 's6', '', 6), cast(map['Emma', 2020] as map<string, int>), array[20], 'par3'),  ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', row(7, 's7', '', 7), cast(map['Bob', 4444] as map<string, int>), array[44, 44], 'par4'),  ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', row(8, 's8', '', 8), cast(map['Han', 5656] as map<string, int>), array[56, 56, 56], 'par4')) as A(uuid, name, gender, age, ts, f_struct, f_map, f_array, `partition`)";

        this.tEnv.executeSql(createTableSql);
        // Block until the insert job has completed so later steps see the data.
        TableResult insertJob = this.tEnv.executeSql(insertSql);
        insertJob.await();
    }

    /**
     * Applies the schema changes under test to the table through a write client:
     * adding, deleting, renaming, re-typing and re-ordering top-level and nested columns.
     *
     * @param tableOptions options used to create the write client
     * @param z whether to schedule and run a compaction before the schema is changed
     * @throws IOException if the write client reports an I/O failure
     */
    private void changeTableSchema(TableOptions tableOptions, boolean z) throws IOException {
        // The whole body is inside try-with-resources so the client is closed even when one
        // of the schema-change calls fails.
        try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(tableOptions.toConfig())) {
            if (z) {
                Option<String> compactionInstant = writeClient.scheduleCompaction(Option.empty());
                writeClient.compact(compactionInstant.get());
            }

            // Nullable primitive types used for the newly added columns.
            Schema intType = SchemaBuilder.unionOf().nullType().and().intType().endUnion();
            Schema doubleType = SchemaBuilder.unionOf().nullType().and().doubleType().endUnion();
            Schema stringType = SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
            // Complex types for the new row / array / map columns.
            Schema structType = SchemaBuilder.builder().record("new_row_col").fields()
                .name("f0").type("long").noDefault()
                .name("f1").type("string").noDefault()
                .endRecord();
            Schema arrayType = Schema.createUnion(
                SchemaBuilder.builder().array().items(stringType),
                SchemaBuilder.builder().nullType());
            Schema mapType = Schema.createUnion(
                SchemaBuilder.builder().map().values(stringType),
                SchemaBuilder.builder().nullType());

            // Top-level column changes.
            writeClient.addColumn("salary", doubleType, null, "name", TableChange.ColumnPositionChange.ColumnPositionType.AFTER);
            writeClient.deleteColumns("gender");
            writeClient.renameColumn("name", "first_name");
            writeClient.updateColumnType("age", Types.StringType.get());
            writeClient.addColumn("last_name", stringType, "empty allowed", "salary", TableChange.ColumnPositionChange.ColumnPositionType.BEFORE);
            writeClient.reOrderColPosition("age", "first_name", TableChange.ColumnPositionChange.ColumnPositionType.BEFORE);

            // Changes to fields nested inside f_struct.
            writeClient.addColumn("f_struct.f2", intType, "add field in middle of struct", "f_struct.f0", TableChange.ColumnPositionChange.ColumnPositionType.AFTER);
            writeClient.addColumn("f_struct.f3", stringType);
            // Drop and re-add a field with the same name but a different type.
            writeClient.deleteColumns("f_struct.drop_add");
            writeClient.addColumn("f_struct.drop_add", doubleType);
            writeClient.updateColumnType("f_struct.change_type", Types.LongType.get());
            writeClient.renameColumn("f_struct.change_type", "renamed_change_type");

            // Element / value type changes of the array and map columns.
            writeClient.updateColumnType("f_array.element", Types.DoubleType.get());
            writeClient.updateColumnType("f_map.value", Types.DoubleType.get());

            // Brand-new complex columns.
            writeClient.addColumn("new_row_col", structType);
            writeClient.addColumn("new_array_col", arrayType);
            writeClient.addColumn("new_map_col", mapType);
        }
    }

    /**
     * Re-creates table {@code t1} with the post-evolution schema and inserts three rows
     * (id1, id9, id3), waiting for the insert job to finish.
     *
     * <p>Side effect: overwrites the source Avro schema option in {@code tableOptions}.
     *
     * @param tableOptions options rendered into the {@code with (...)} clause of the DDL
     */
    private void writeTableWithSchema2(TableOptions tableOptions) throws ExecutionException, InterruptedException {
        // Point the writer at the evolved schema before the options are rendered into the DDL.
        String schemaOptionKey = FlinkOptions.SOURCE_AVRO_SCHEMA.key();
        tableOptions.withOption(schemaOptionKey, AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_EVOLUTION_AFTER, "hoodie.t1.t1_record"));

        this.tEnv.executeSql("drop table t1");

        String createTableSql = "create table t1 (  uuid string,  age string,  first_name string,  last_name string,  salary double,  ts timestamp,  f_struct row<f0 int, f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string>,  f_map map<string, double>,  f_array array<double>,  new_row_col row<f0 bigint, f1 string>,  new_array_col array<string>,  new_map_col map<string, string>,  `partition` string) partitioned by (`partition`) with (" + tableOptions + ")";
        this.tEnv.executeSql(createTableSql);

        String insertSql = "insert into t1 select   cast(uuid as string),  cast(age as string),  cast(first_name as string),  cast(last_name as string),  cast(salary as double),  cast(ts as timestamp),  cast(f_struct as row<f0 int, f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string>),  cast(f_map as map<string, double>),  cast(f_array as array<double>),  cast(new_row_col as row<f0 bigint, f1 string>),  cast(new_array_col as array<string>),  cast(new_map_col as map<string, string>),  cast(`partition` as string) from (values   ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', row(1, 1, 's1', 11, 't1', 'drop_add1'), cast(map['Danny', 2323.23] as map<string, double>), array[23, 23, 23],   row(1, '1'), array['1'], Map['k1','v1'], 'par1'),  ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09', row(9, 9, 's9', 99, 't9', 'drop_add9'), cast(map['Alice', 9999.99] as map<string, double>), array[9999, 9999],   row(9, '9'), array['9'], Map['k9','v9'], 'par1'),  ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03', row(3, 3, 's3', 33, 't3', 'drop_add3'), cast(map['Julian', 5353.53] as map<string, double>), array[53],   row(3, '3'), array['3'], Map['k3','v3'], 'par2')) as A(uuid, age, first_name, last_name, salary, ts, f_struct, f_map, f_array, new_row_col, new_array_col, new_map_col, `partition`)";
        // Block until the insert job has completed so the verification queries see the data.
        TableResult insertJob = this.tEnv.executeSql(insertSql);
        insertJob.await();
    }

    /**
     * Builds the baseline options shared by all tests: a copy-on-write Hudi table named
     * {@code t1} at the given path, batch snapshot reads, single-task operators and
     * schema evolution enabled. Individual tests override entries via {@code withOption}.
     *
     * @param str base path of the table
     * @return a new, mutable options instance
     */
    private TableOptions defaultTableOptions(String str) {
        TableOptions options = new TableOptions();
        // Connector / table identity.
        options.withOption(FactoryUtil.CONNECTOR.key(), "hudi");
        options.withOption(FlinkOptions.PATH.key(), str);
        options.withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
        options.withOption(HoodieTableConfig.NAME.key(), "t1");
        // Read mode.
        options.withOption(FlinkOptions.READ_AS_STREAMING.key(), false);
        options.withOption(FlinkOptions.QUERY_TYPE.key(), "snapshot");
        // Key generation and partitioning.
        options.withOption(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
        options.withOption(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition");
        options.withOption(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true);
        options.withOption(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName());
        // Writer settings.
        options.withOption(FlinkOptions.WRITE_BATCH_SIZE.key(), Double.valueOf(1.0E-6d));
        options.withOption(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_EVOLUTION_BEFORE));
        // Parallelism of the individual operators.
        options.withOption(FlinkOptions.READ_TASKS.key(), 1);
        options.withOption(FlinkOptions.WRITE_TASKS.key(), 1);
        options.withOption(FlinkOptions.INDEX_BOOTSTRAP_TASKS.key(), 1);
        options.withOption(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), 1);
        options.withOption(FlinkOptions.COMPACTION_TASKS.key(), 1);
        // Compaction scheduling, timeline server reuse and schema evolution switches.
        options.withOption(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), false);
        options.withOption(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED.key(), false);
        options.withOption(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true);
        return options;
    }

    /**
     * Verifies the projection over the evolved columns of {@code t1}.
     *
     * @param strArr expected rows, compared as a set
     */
    private void checkAnswerEvolved(String... strArr) throws Exception {
        String evolvedColumnsQuery = "select   first_name,   salary,   age,   f_struct,   f_map,   f_array,   new_row_col,   new_array_col,   new_map_col from t1";
        checkAnswer(evolvedColumnsQuery, strArr);
    }

    /**
     * Verifies the rows emitted by {@code select count(*)} over {@code t1}.
     *
     * @param strArr expected rows, compared as a set
     */
    private void checkAnswerCount(String... strArr) throws Exception {
        String countQuery = "select count(*) from t1";
        checkAnswer(countQuery, strArr);
    }

    /**
     * Re-creates {@code t1} with the Hudi metadata columns declared in addition to the
     * evolved schema, then verifies a projection that includes {@code _hoodie_record_key}.
     *
     * @param tableOptions options rendered into the {@code with (...)} clause of the DDL
     * @param strArr expected rows, compared as a set
     */
    private void checkAnswerWithMeta(TableOptions tableOptions, String... strArr) throws Exception {
        this.tEnv.executeSql("drop table t1");

        String createTableSql = "create table t1 (  `_hoodie_commit_time` string,  `_hoodie_commit_seqno` string,  `_hoodie_record_key` string,  `_hoodie_partition_path` string,  `_hoodie_file_name` string,  uuid string,  age string,  first_name string,  last_name string,  salary double,  ts timestamp,  f_struct row<f0 int, f2 int, f1 string, renamed_change_type bigint, f3 string, drop_add string>,  f_map map<string, double>,  f_array array<double>,  new_row_col row<f0 bigint, f1 string>,  new_array_col array<string>,  new_map_col map<string, string>,  `partition` string) partitioned by (`partition`) with (" + tableOptions + ")";
        this.tEnv.executeSql(createTableSql);

        String recordKeyQuery = "select   `_hoodie_record_key`,   first_name,   salary,   age,   f_struct,   f_map,   f_array,   new_row_col,   new_array_col,   new_map_col from t1";
        checkAnswer(recordKeyQuery, strArr);
    }

    /**
     * Executes a query and asserts that the set of returned rows (rendered with
     * {@code Row.toString()}) equals the set of expected rows.
     *
     * <p>Rows are collected on a separate thread for at most five seconds. A timeout is
     * tolerated: collection is cancelled and whatever was gathered so far is compared.
     * NOTE(review): this presumably exists because streaming-read queries never finish —
     * confirm.
     *
     * @param str SQL query to execute
     * @param strArr expected rows; order and duplicates are ignored
     * @throws RuntimeException if collecting the rows fails or the waiting thread is interrupted
     */
    private void checkAnswer(String str, String... strArr) {
        TableResult queryResult = this.tEnv.executeSql(str);
        HashSet<String> expected = new HashSet<>(Arrays.asList(strArr));
        // NOTE(review): written by the collector thread and read here after cancel/shutdown
        // without synchronization; kept as a plain HashSet to preserve existing behavior.
        HashSet<String> actual = new HashSet<>();

        Runnable collector = () -> {
            // try-with-resources closes the iterator even when iteration fails.
            try (CloseableIterator<Row> rows = queryResult.collect()) {
                while (rows.hasNext()) {
                    actual.add(rows.next().toString());
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> pending = executor.submit(collector);
        try {
            pending.get(5L, TimeUnit.SECONDS);
        } catch (TimeoutException timedOut) {
            // Must be caught before the generic handler below, otherwise a timeout would be
            // wrapped and reported as a failure instead of being tolerated.
            pending.cancel(true);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow();
        }
        Assertions.assertEquals(expected, actual);
    }
}
