package org.apache.hudi.sink;

import java.io.File;
import java.lang.invoke.SerializedLambda;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.table.catalog.CatalogOptions;
import org.apache.hudi.table.catalog.HoodieCatalog;
import org.apache.hudi.table.catalog.TableOptionProperties;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.hudi.util.JsonDeserializationFunction;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.apache.hudi.utils.source.ContinuousFileSource;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith({FlinkMiniCluster.class})
/* loaded from: input_file:org/apache/hudi/sink/ITTestDataStreamWrite.class */
public class ITTestDataStreamWrite extends TestLogger {
    private static final Map<String, List<String>> EXPECTED = new HashMap();
    private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap();
    private static final Map<String, List<String>> EXPECTED_CHAINED_TRANSFORMER = new HashMap();

    @TempDir
    File tempFile;

    @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
    @ParameterizedTest
    public void testWriteCopyOnWrite(String str) throws Exception {
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.toURI().toString());
        defaultConf.setString(FlinkOptions.INDEX_TYPE, str);
        defaultConf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
        defaultConf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        testWriteToHoodie(defaultConf, "cow_write", 2, EXPECTED);
    }

    @Test
    public void testWriteCopyOnWriteWithTransformer() throws Exception {
        testWriteToHoodie(dataStream -> {
            return dataStream.map(rowData -> {
                if (!(rowData instanceof GenericRowData)) {
                    throw new RuntimeException("Unrecognized row type information: " + rowData.getClass().getSimpleName());
                }
                GenericRowData genericRowData = (GenericRowData) rowData;
                genericRowData.setField(2, Integer.valueOf(genericRowData.getInt(2) + 1));
                return genericRowData;
            });
        }, "cow_write_with_transformer", EXPECTED_TRANSFORMER);
    }

    @Test
    public void testWriteCopyOnWriteWithChainedTransformer() throws Exception {
        Transformer transformer = dataStream -> {
            return dataStream.map(rowData -> {
                if (!(rowData instanceof GenericRowData)) {
                    throw new RuntimeException("Unrecognized row type : " + rowData.getClass().getSimpleName());
                }
                GenericRowData genericRowData = (GenericRowData) rowData;
                genericRowData.setField(2, Integer.valueOf(genericRowData.getInt(2) + 1));
                return genericRowData;
            });
        };
        testWriteToHoodie(new ChainedTransformer(Arrays.asList(transformer, transformer)), "cow_write_with_chained_transformer", EXPECTED_CHAINED_TRANSFORMER);
    }

    @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
    @ParameterizedTest
    public void testWriteMergeOnReadWithCompaction(String str) throws Exception {
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.toURI().toString());
        defaultConf.setString(FlinkOptions.INDEX_TYPE, str);
        defaultConf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
        defaultConf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        defaultConf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
        testWriteToHoodie(defaultConf, "mor_write_with_compact", 1, EXPECTED);
    }

    @Test
    public void testWriteCopyOnWriteWithClustering() throws Exception {
        testWriteCopyOnWriteWithClustering(false);
    }

    @Test
    public void testWriteCopyOnWriteWithSortClustering() throws Exception {
        testWriteCopyOnWriteWithClustering(true);
    }

    private void testWriteCopyOnWriteWithClustering(boolean z) throws Exception {
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.toURI().toString());
        defaultConf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        defaultConf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
        defaultConf.setString(FlinkOptions.OPERATION, "insert");
        if (z) {
            defaultConf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, "uuid");
        }
        testWriteToHoodieWithCluster(defaultConf, "cow_write_with_cluster", 1, EXPECTED);
    }

    private void testWriteToHoodie(Transformer transformer, String str, Map<String, List<String>> map) throws Exception {
        testWriteToHoodie(TestConfigurations.getDefaultConf(this.tempFile.toURI().toString()), Option.of(transformer), str, 2, map);
    }

    private void testWriteToHoodie(Configuration configuration, String str, int i, Map<String, List<String>> map) throws Exception {
        testWriteToHoodie(configuration, Option.empty(), str, i, map);
    }

    private void testWriteToHoodie(Configuration configuration, Option<Transformer> option, String str, int i, Map<String, List<String>> map) throws Exception {
        testWriteToHoodie(configuration, option, str, i, true, map);
    }

    private void testWriteToHoodie(Configuration configuration, Option<Transformer> option, String str, int i, boolean z, Map<String, List<String>> map) throws Exception {
        SingleOutputStreamOperator parallelism;
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableObjectReuse();
        executionEnvironment.setParallelism(4);
        executionEnvironment.enableCheckpointing(4000L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        if (!z) {
            executionEnvironment.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
        }
        RowType logicalType = AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(configuration)).getLogicalType();
        String url = ((URL) Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_source.data"))).toString();
        boolean equals = configuration.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
        if (equals) {
            TextInputFormat textInputFormat = new TextInputFormat(new Path(url));
            textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter());
            BasicTypeInfo basicTypeInfo = BasicTypeInfo.STRING_TYPE_INFO;
            textInputFormat.setCharsetName("UTF-8");
            parallelism = executionEnvironment.readFile(textInputFormat, url, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L, basicTypeInfo).map(JsonDeserializationFunction.getInstance(logicalType)).setParallelism(1);
        } else {
            parallelism = executionEnvironment.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(url), i)).name("continuous_file_source").setParallelism(1).map(JsonDeserializationFunction.getInstance(logicalType)).setParallelism(4);
        }
        if (option.isPresent()) {
            parallelism = ((Transformer) option.get()).apply(parallelism);
        }
        OptionsInference.setupSinkTasks(configuration, executionEnvironment.getParallelism());
        DataStream hoodieStreamWrite = Pipelines.hoodieStreamWrite(configuration, Pipelines.bootstrap(configuration, logicalType, parallelism));
        executionEnvironment.addOperator(hoodieStreamWrite.getTransformation());
        if (equals) {
            Pipelines.compact(configuration, hoodieStreamWrite);
        }
        execute(executionEnvironment, equals, str);
        TestData.checkWrittenDataCOW(this.tempFile, map);
    }

    private void testWriteToHoodieWithCluster(Configuration configuration, String str, int i, Map<String, List<String>> map) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableObjectReuse();
        executionEnvironment.setParallelism(4);
        executionEnvironment.enableCheckpointing(4000L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        RowType logicalType = AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(configuration)).getLogicalType();
        SingleOutputStreamOperator parallelism = executionEnvironment.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(((URL) Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_source.data"))).toString()), i)).name("continuous_file_source").setParallelism(1).map(JsonDeserializationFunction.getInstance(logicalType)).setParallelism(4);
        OptionsInference.setupSinkTasks(configuration, executionEnvironment.getParallelism());
        DataStream append = Pipelines.append(configuration, logicalType, parallelism);
        executionEnvironment.addOperator(append.getTransformation());
        Pipelines.cluster(configuration, logicalType, append);
        execute(executionEnvironment, false, str);
        TestData.checkWrittenDataCOW(this.tempFile, map);
    }

    public void execute(StreamExecutionEnvironment streamExecutionEnvironment, boolean z, String str) throws Exception {
        if (!z) {
            streamExecutionEnvironment.execute(str);
            return;
        }
        JobClient executeAsync = streamExecutionEnvironment.executeAsync(str);
        if (executeAsync.getJobStatus().get() != JobStatus.FAILED) {
            try {
                TimeUnit.SECONDS.sleep(20L);
                executeAsync.cancel();
            } catch (Throwable th) {
            }
        }
    }

    @Test
    public void testHoodiePipelineBuilderSource() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableObjectReuse();
        executionEnvironment.setParallelism(1);
        executionEnvironment.enableCheckpointing(4000L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.toURI().toString());
        defaultConf.setString(FlinkOptions.TABLE_NAME, "t1");
        defaultConf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
        TestData.writeData(TestData.dataSetInsert(1, 2), defaultConf);
        TestData.writeData(TestData.dataSetInsert(3, 4), defaultConf);
        TestData.writeData(TestData.dataSetInsert(5, 6), defaultConf);
        String lastCompleteInstant = TestUtils.getLastCompleteInstant(this.tempFile.toURI().toString());
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.PATH.key(), this.tempFile.toURI().toString());
        hashMap.put(FlinkOptions.READ_START_COMMIT.key(), lastCompleteInstant);
        DataStream source = HoodiePipeline.builder("test_source").column("uuid string not null").column("name string").column("age int").column("`ts` timestamp(3)").column("`partition` string").pk(new String[]{"uuid"}).partition(new String[]{"partition"}).options(hashMap).source(executionEnvironment);
        ArrayList arrayList = new ArrayList();
        CloseableIterator executeAndCollect = source.executeAndCollect();
        arrayList.getClass();
        executeAndCollect.forEachRemaining((v1) -> {
            r1.add(v1);
        });
        TimeUnit.SECONDS.sleep(2L);
        TestData.assertRowDataEquals(arrayList, TestData.dataSetInsert(5, 6));
    }

    @Test
    public void testHoodiePipelineBuilderSink() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        HashMap hashMap = new HashMap();
        executionEnvironment.getConfig().disableObjectReuse();
        executionEnvironment.setParallelism(4);
        executionEnvironment.enableCheckpointing(4000L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        hashMap.put(FlinkOptions.PATH.key(), this.tempFile.toURI().toString());
        hashMap.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), ((URL) Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc"))).toString());
        Configuration fromMap = Configuration.fromMap(hashMap);
        String url = ((URL) Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_source.data"))).toString();
        TextInputFormat textInputFormat = new TextInputFormat(new Path(url));
        textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter());
        textInputFormat.setCharsetName("UTF-8");
        HoodiePipeline.builder("test_sink").column("uuid string not null").column("name string").column("age int").column("`ts` timestamp(3)").column("`partition` string").pk(new String[]{"uuid"}).partition(new String[]{"partition"}).options(hashMap).sink(executionEnvironment.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(url), 2)).name("continuous_file_source").setParallelism(1).map(JsonDeserializationFunction.getInstance(fromMap)).setParallelism(4), false);
        execute(executionEnvironment, false, "Api_Sink_Test");
        TestData.checkWrittenDataCOW(this.tempFile, EXPECTED);
    }

    @Test
    public void testHoodiePipelineBuilderSourceWithSchemaSet() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableObjectReuse();
        executionEnvironment.setParallelism(1);
        executionEnvironment.enableCheckpointing(4000L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        String str = (String) CatalogOptions.DEFAULT_DATABASE.defaultValue();
        File file = new File(this.tempFile, str + "/t1");
        file.mkdir();
        Configuration defaultConf = TestConfigurations.getDefaultConf(file.toURI().toString());
        defaultConf.setString(FlinkOptions.TABLE_NAME, "t1");
        defaultConf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
        TestData.writeData(TestData.dataSetInsert(1, 2), defaultConf);
        TestData.writeData(TestData.dataSetInsert(3, 4), defaultConf);
        TestData.writeData(TestData.dataSetInsert(5, 6), defaultConf);
        String lastCompleteInstant = TestUtils.getLastCompleteInstant(file.toURI().toString());
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.PATH.key(), file.toURI().toString());
        hashMap.put(FlinkOptions.READ_START_COMMIT.key(), lastCompleteInstant);
        Configuration configuration = new Configuration();
        configuration.setString(CatalogOptions.CATALOG_PATH.key(), this.tempFile.toURI().toString());
        configuration.setString(CatalogOptions.DEFAULT_DATABASE.key(), (String) CatalogOptions.DEFAULT_DATABASE.defaultValue());
        HoodieCatalog hoodieCatalog = new HoodieCatalog("hudi", configuration);
        hoodieCatalog.open();
        ObjectPath objectPath = new ObjectPath(str, "t1");
        TableOptionProperties.createProperties(file.toURI().toString(), HadoopConfigurations.getHadoopConf(configuration), hashMap);
        DataStream source = HoodiePipeline.builder("test_source").schema(hoodieCatalog.getTable(objectPath).getUnresolvedSchema()).pk(new String[]{"uuid"}).partition(new String[]{"partition"}).options(hashMap).source(executionEnvironment);
        ArrayList arrayList = new ArrayList();
        CloseableIterator executeAndCollect = source.executeAndCollect();
        arrayList.getClass();
        executeAndCollect.forEachRemaining((v1) -> {
            r1.add(v1);
        });
        TimeUnit.SECONDS.sleep(2L);
        TestData.assertRowDataEquals(arrayList, TestData.dataSetInsert(5, 6));
    }

    @Test
    public void testHoodiePipelineBuilderSinkWithSchemaSet() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        HashMap hashMap = new HashMap();
        executionEnvironment.getConfig().disableObjectReuse();
        executionEnvironment.setParallelism(4);
        executionEnvironment.enableCheckpointing(4000L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        hashMap.put(FlinkOptions.PATH.key(), this.tempFile.toURI().toString());
        hashMap.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), ((URL) Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc"))).toString());
        Configuration fromMap = Configuration.fromMap(hashMap);
        String url = ((URL) Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_source.data"))).toString();
        TextInputFormat textInputFormat = new TextInputFormat(new Path(url));
        textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter());
        textInputFormat.setCharsetName("UTF-8");
        HoodiePipeline.builder("test_sink").schema(Schema.newBuilder().column("uuid", DataTypes.STRING().notNull()).column("name", DataTypes.STRING()).column("age", DataTypes.INT()).column("ts", DataTypes.TIMESTAMP(3)).column("partition", DataTypes.STRING()).primaryKey(new String[]{"uuid"}).build()).partition(new String[]{"partition"}).options(hashMap).sink(executionEnvironment.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(url), 2)).name("continuous_file_source").setParallelism(1).map(JsonDeserializationFunction.getInstance(fromMap)).setParallelism(4), false);
        execute(executionEnvironment, false, "Api_Sink_Test");
        TestData.checkWrittenDataCOW(this.tempFile, EXPECTED);
    }

    @Test
    public void testColumnDroppingIsNotAllowed() throws Exception {
        Configuration defaultConf = TestConfigurations.getDefaultConf(this.tempFile.toURI().toString());
        testWriteToHoodie(defaultConf, "initial write", 1, EXPECTED);
        defaultConf.setBoolean(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), false);
        defaultConf.setBoolean(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), false);
        defaultConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, ((URL) Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema_dropped_age.avsc"))).toString());
        try {
            testWriteToHoodie(defaultConf, Option.empty(), "failing job", 1, false, Collections.emptyMap());
        } catch (JobExecutionException e) {
            Throwable th = e;
            while (true) {
                Throwable th2 = th;
                if (th2 == null) {
                    break;
                } else if (th2.getClass() == SchemaCompatibilityException.class) {
                    return;
                } else {
                    th = th2.getCause();
                }
            }
        }
        throw new AssertionError(String.format("Excepted exception %s is not found", SchemaCompatibilityException.class));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 751028190:
                if (implMethodName.equals("lambda$null$79e42a0a$1")) {
                    z = true;
                    break;
                }
                break;
            case 751028191:
                if (implMethodName.equals("lambda$null$79e42a0a$2")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/sink/ITTestDataStreamWrite") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/table/data/RowData;)Lorg/apache/flink/table/data/RowData;")) {
                    return rowData -> {
                        if (!(rowData instanceof GenericRowData)) {
                            throw new RuntimeException("Unrecognized row type : " + rowData.getClass().getSimpleName());
                        }
                        GenericRowData genericRowData = (GenericRowData) rowData;
                        genericRowData.setField(2, Integer.valueOf(genericRowData.getInt(2) + 1));
                        return genericRowData;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/sink/ITTestDataStreamWrite") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/table/data/RowData;)Lorg/apache/flink/table/data/RowData;")) {
                    return rowData2 -> {
                        if (!(rowData2 instanceof GenericRowData)) {
                            throw new RuntimeException("Unrecognized row type information: " + rowData2.getClass().getSimpleName());
                        }
                        GenericRowData genericRowData = (GenericRowData) rowData2;
                        genericRowData.setField(2, Integer.valueOf(genericRowData.getInt(2) + 1));
                        return genericRowData;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
        EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
        EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
        EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
        EXPECTED_TRANSFORMER.put("par1", Arrays.asList("id1,par1,id1,Danny,24,1000,par1", "id2,par1,id2,Stephen,34,2000,par1"));
        EXPECTED_TRANSFORMER.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2"));
        EXPECTED_TRANSFORMER.put("par3", Arrays.asList("id5,par3,id5,Sophia,19,5000,par3", "id6,par3,id6,Emma,21,6000,par3"));
        EXPECTED_TRANSFORMER.put("par4", Arrays.asList("id7,par4,id7,Bob,45,7000,par4", "id8,par4,id8,Han,57,8000,par4"));
        EXPECTED_CHAINED_TRANSFORMER.put("par1", Arrays.asList("id1,par1,id1,Danny,25,1000,par1", "id2,par1,id2,Stephen,35,2000,par1"));
        EXPECTED_CHAINED_TRANSFORMER.put("par2", Arrays.asList("id3,par2,id3,Julian,55,3000,par2", "id4,par2,id4,Fabian,33,4000,par2"));
        EXPECTED_CHAINED_TRANSFORMER.put("par3", Arrays.asList("id5,par3,id5,Sophia,20,5000,par3", "id6,par3,id6,Emma,22,6000,par3"));
        EXPECTED_CHAINED_TRANSFORMER.put("par4", Arrays.asList("id7,par4,id7,Bob,46,7000,par4", "id8,par4,id8,Han,58,8000,par4"));
    }
}
