package org.apache.hudi.source;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
import org.apache.flink.streaming.util.CollectingSourceContext;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/hudi/source/TestStreamReadOperator.class */
public class TestStreamReadOperator {
    private static final Map<String, String> EXPECTED = new HashMap();
    private Configuration conf;

    @TempDir
    File tempFile;

    @BeforeEach
    public void before() throws Exception {
        this.conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        StreamerUtil.initTableIfNotExists(this.conf);
    }

    @Test
    void testWriteRecords() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> createReader = createReader();
        Throwable th = null;
        try {
            createReader.setup();
            createReader.open();
            SteppingMailboxProcessor createLocalMailbox = createLocalMailbox(createReader);
            StreamReadMonitoringFunction monitorFunc = TestUtils.getMonitorFunc(this.conf);
            List<MergeOnReadInputSplit> generateSplits = generateSplits(monitorFunc);
            MatcherAssert.assertThat("Should have 4 splits", Integer.valueOf(generateSplits.size()), CoreMatchers.is(4));
            Iterator<MergeOnReadInputSplit> it = generateSplits.iterator();
            while (it.hasNext()) {
                createReader.processElement(it.next(), -1L);
                MatcherAssert.assertThat("Should process 1 split", createLocalMailbox.runMailboxStep());
            }
            TestData.assertRowDataEquals((List<RowData>) createReader.extractOutputValues(), TestData.DATA_SET_INSERT);
            TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, this.conf);
            List<MergeOnReadInputSplit> generateSplits2 = generateSplits(monitorFunc);
            MatcherAssert.assertThat("Should have 4 splits", Integer.valueOf(generateSplits2.size()), CoreMatchers.is(4));
            Iterator<MergeOnReadInputSplit> it2 = generateSplits2.iterator();
            while (it2.hasNext()) {
                createReader.processElement(it2.next(), -1L);
                MatcherAssert.assertThat("Should processed 1 split", createLocalMailbox.runMailboxStep());
            }
            ArrayList arrayList = new ArrayList(TestData.DATA_SET_INSERT);
            arrayList.addAll(TestData.DATA_SET_UPDATE_INSERT);
            TestData.assertRowDataEquals((List<RowData>) createReader.extractOutputValues(), arrayList);
            if (createReader != null) {
                if (0 == 0) {
                    createReader.close();
                    return;
                }
                try {
                    createReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createReader != null) {
                if (0 != 0) {
                    try {
                        createReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createReader.close();
                }
            }
            throw th3;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v3, types: [org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
    @Test
    public void testCheckpoint() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        long j = 0;
        ?? createReader = createReader();
        Throwable th = null;
        try {
            try {
                createReader.setup();
                createReader.open();
                SteppingMailboxProcessor createLocalMailbox = createLocalMailbox(createReader);
                List<MergeOnReadInputSplit> generateSplits = generateSplits(TestUtils.getMonitorFunc(this.conf));
                MatcherAssert.assertThat("Should have 4 splits", Integer.valueOf(generateSplits.size()), CoreMatchers.is(4));
                Iterator<MergeOnReadInputSplit> it = generateSplits.iterator();
                while (it.hasNext()) {
                    long j2 = j + 1;
                    j = createReader;
                    createReader.processElement(it.next(), j2);
                }
                createLocalMailbox.getMainMailboxExecutor().execute(() -> {
                    createReader.snapshot(1L, 3L);
                }, "Trigger snapshot");
                Assertions.assertTrue(createLocalMailbox.runMailboxStep(), "Should have processed the split0");
                Assertions.assertTrue(createLocalMailbox.runMailboxStep(), "Should have processed the snapshot state action");
                MatcherAssert.assertThat(TestData.rowDataToString(createReader.extractOutputValues()), CoreMatchers.is(getSplitExpected(Collections.singletonList(generateSplits.get(0)), EXPECTED)));
                Assertions.assertTrue(createLocalMailbox.runMailboxStep(), "Should have processed the split1");
                Assertions.assertTrue(createLocalMailbox.runMailboxStep(), "Should have processed the split2");
                Assertions.assertTrue(createLocalMailbox.runMailboxStep(), "Should have processed the split3");
                TestData.assertRowDataEquals((List<RowData>) createReader.extractOutputValues(), TestData.DATA_SET_INSERT);
                if (createReader != 0) {
                    if (0 == 0) {
                        createReader.close();
                        return;
                    }
                    try {
                        createReader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createReader != 0) {
                if (th != null) {
                    try {
                        createReader.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createReader.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testCheckpointRestore() throws Exception {
        List<MergeOnReadInputSplit> generateSplits;
        OperatorSubtaskState snapshot;
        Throwable th;
        TestData.writeData(TestData.DATA_SET_INSERT, this.conf);
        OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> createReader = createReader();
        Throwable th2 = null;
        try {
            try {
                createReader.setup();
                createReader.open();
                generateSplits = generateSplits(TestUtils.getMonitorFunc(this.conf));
                MatcherAssert.assertThat("Should have 4 splits", Integer.valueOf(generateSplits.size()), CoreMatchers.is(4));
                Iterator<MergeOnReadInputSplit> it = generateSplits.iterator();
                while (it.hasNext()) {
                    createReader.processElement(it.next(), -1L);
                }
                SteppingMailboxProcessor createLocalMailbox = createLocalMailbox(createReader);
                for (int i = 0; i < 2; i++) {
                    Assertions.assertTrue(createLocalMailbox.runMailboxStep(), "Should have processed the split#" + i);
                }
                MatcherAssert.assertThat(TestData.rowDataToString(createReader.extractOutputValues()), CoreMatchers.is(getSplitExpected(generateSplits.subList(0, 2), EXPECTED)));
                snapshot = createReader.snapshot(1L, 1L);
                if (createReader != null) {
                    if (0 != 0) {
                        try {
                            createReader.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createReader.close();
                    }
                }
                createReader = createReader();
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    createReader.setup();
                    createReader.initializeState(snapshot);
                    createReader.open();
                    SteppingMailboxProcessor createLocalMailbox2 = createLocalMailbox(createReader);
                    for (int i2 = 2; i2 < 4; i2++) {
                        Assertions.assertTrue(createLocalMailbox2.runMailboxStep(), "Should have processed one split#" + i2);
                    }
                    MatcherAssert.assertThat(TestData.rowDataToString(createReader.extractOutputValues()), CoreMatchers.is(getSplitExpected(generateSplits.subList(2, 4), EXPECTED)));
                    if (createReader != null) {
                        if (0 == 0) {
                            createReader.close();
                            return;
                        }
                        try {
                            createReader.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } finally {
            }
        } finally {
        }
    }

    private static String getSplitExpected(List<MergeOnReadInputSplit> list, Map<String, String> map) {
        Stream<R> map2 = list.stream().map(TestUtils::getSplitPartitionPath);
        map.getClass();
        return ((List) map2.map((v1) -> {
            return r1.get(v1);
        }).sorted(Comparator.naturalOrder()).collect(Collectors.toList())).toString();
    }

    private List<MergeOnReadInputSplit> generateSplits(StreamReadMonitoringFunction streamReadMonitoringFunction) throws Exception {
        ArrayList arrayList = new ArrayList();
        streamReadMonitoringFunction.open(this.conf);
        streamReadMonitoringFunction.monitorDirAndForwardSplits(new CollectingSourceContext(new Object(), arrayList));
        return arrayList;
    }

    private OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> createReader() throws Exception {
        String absolutePath = this.tempFile.getAbsolutePath();
        org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
        HoodieTableMetaClient build = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(absolutePath).build();
        List singletonList = Collections.singletonList("partition");
        try {
            Schema tableAvroSchema = new TableSchemaResolver(build).getTableAvroSchema();
            DataType convertToDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
            OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(StreamReadOperator.factory(new MergeOnReadInputFormat(this.conf, FilePathUtils.toFlinkPaths(FilePathUtils.getReadPaths(new Path(absolutePath), this.conf, hadoopConf, singletonList)), new MergeOnReadTableState(convertToDataType.getLogicalType(), TestConfigurations.ROW_TYPE, tableAvroSchema.toString(), AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(), Collections.emptyList()), convertToDataType.getChildren(), "default", 1000L)), 1, 1, 0);
            oneInputStreamOperatorTestHarness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            return oneInputStreamOperatorTestHarness;
        } catch (Exception e) {
            throw new HoodieException("Get table avro schema error", e);
        }
    }

    private SteppingMailboxProcessor createLocalMailbox(OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> oneInputStreamOperatorTestHarness) {
        return new SteppingMailboxProcessor((v0) -> {
            v0.suspendDefaultAction();
        }, oneInputStreamOperatorTestHarness.getTaskMailbox(), StreamTaskActionExecutor.IMMEDIATE);
    }

    static {
        EXPECTED.put("par1", "id1,Danny,23,1970-01-01T00:00:00.001,par1, id2,Stephen,33,1970-01-01T00:00:00.002,par1");
        EXPECTED.put("par2", "id3,Julian,53,1970-01-01T00:00:00.003,par2, id4,Fabian,31,1970-01-01T00:00:00.004,par2");
        EXPECTED.put("par3", "id5,Sophia,18,1970-01-01T00:00:00.005,par3, id6,Emma,20,1970-01-01T00:00:00.006,par3");
        EXPECTED.put("par4", "id7,Bob,44,1970-01-01T00:00:00.007,par4, id8,Han,56,1970-01-01T00:00:00.008,par4");
    }
}
