package org.apache.hudi.source;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/hudi/source/TestStreamReadMonitoringFunction.class */
public class TestStreamReadMonitoringFunction {
    private static final long WAIT_TIME_MILLIS = 5000;
    private Configuration conf;

    @TempDir
    File tempFile;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/source/TestStreamReadMonitoringFunction$CollectingSourceContext.class */
    public static class CollectingSourceContext implements SourceFunction.SourceContext<MergeOnReadInputSplit> {
        private final List<MergeOnReadInputSplit> splits = new ArrayList();
        private final Object checkpointLock = new Object();
        private volatile CountDownLatch latch;

        CollectingSourceContext(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        public void collect(MergeOnReadInputSplit mergeOnReadInputSplit) {
            this.splits.add(mergeOnReadInputSplit);
            this.latch.countDown();
        }

        public void collectWithTimestamp(MergeOnReadInputSplit mergeOnReadInputSplit, long j) {
            collect(mergeOnReadInputSplit);
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markAsTemporarilyIdle() {
        }

        public Object getCheckpointLock() {
            return this.checkpointLock;
        }

        public void close() {
        }

        public void reset(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
            this.splits.clear();
        }

        public String getPartitionPaths() {
            return (String) this.splits.stream().map(TestUtils::getSplitPartitionPath).distinct().sorted(Comparator.naturalOrder()).collect(Collectors.joining(","));
        }
    }

    /**
     * Builds the per-test configuration rooted at the temporary directory, switches the table
     * type to merge-on-read, sets the streaming check interval option to {@code 2} and
     * initializes the table if it does not exist yet.
     *
     * @throws Exception if building the configuration or initializing the table fails
     */
    @BeforeEach
    public void before() throws Exception {
        final String tablePath = this.tempFile.getAbsolutePath();
        final Configuration tableConf = TestConfigurations.getDefaultConf(tablePath);
        this.conf = tableConf;

        tableConf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        tableConf.setInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2);

        StreamerUtil.initTableIfNotExists(tableConf);
    }

    /**
     * Writes two data sets, then starts the monitoring function with the default start commit
     * and expects four splits (one per partition {@code par1..par4}), each carrying an instant
     * range and referencing the last completed instant as its latest commit.
     *
     * @throws Exception if the harness, the writers or the monitoring function fail
     */
    @Test
    public void testConsumeFromLatestCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
        // try-with-resources closes the harness on every exit path, including failed assertions.
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1,par2,par3,par4"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
                    "All the instants should have range limit");

            String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(latestCommit)),
                    "All the splits should be with latestCommit instant time");

            // Stop the monitoring loop before the harness is closed.
            function.close();
        }
    }

    /**
     * Starts the monitoring function after a first write and expects four splits with an
     * instant range; then resets the collecting context, writes a second data set while the
     * function is still running and expects another four splits with an instant range.
     *
     * @throws Exception if the harness, the writers or the monitoring function fail
     */
    @Test
    public void testConsumeFromLastCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
        // try-with-resources closes the harness on every exit path, including failed assertions.
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch firstLatch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(firstLatch);
            runAsync(sourceContext, function);

            Assertions.assertTrue(firstLatch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1,par2,par3,par4"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
                    "All instants should have range limit");

            Thread.sleep(1000L);

            // Second round: start from an empty context and a fresh latch, then write new data.
            CountDownLatch secondLatch = new CountDownLatch(4);
            sourceContext.reset(secondLatch);
            TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);

            Assertions.assertTrue(secondLatch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1,par2,par3,par4"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
                    "All the instants should have range limit");

            // Stop the monitoring loop before the harness is closed.
            function.close();
        }
    }

    /**
     * Writes two data sets, configures the last completed instant as the read start commit and
     * expects four splits (partitions {@code par1..par4}), each carrying an instant range and
     * referencing that instant as its latest commit.
     *
     * @throws Exception if the harness, the writers or the monitoring function fail
     */
    @Test
    public void testConsumeFromSpecifiedCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
        String specifiedCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
        conf.setString(FlinkOptions.READ_START_COMMIT, specifiedCommit);
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
        // try-with-resources closes the harness on every exit path and keeps the original
        // failure as the primary exception if close() fails as well.
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1,par2,par3,par4"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
                    "All the instants should have range limit");
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(specifiedCommit)),
                    "All the splits should be with specified instant time");

            // Stop the monitoring loop before the harness is closed.
            function.close();
        }
    }

    /**
     * Writes two data sets, sets the read start commit to {@code "earliest"} and expects four
     * splits (partitions {@code par1..par4}) that carry no instant range and reference the last
     * completed instant as their latest commit.
     *
     * @throws Exception if the harness, the writers or the monitoring function fail
     */
    @Test
    public void testConsumeFromEarliestCommit() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
        String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
        conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
        // try-with-resources closes the harness on every exit path and keeps the original
        // failure as the primary exception if close() fails as well.
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1,par2,par3,par4"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
                    "No instants should have range limit");
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(latestCommit)),
                    "All the splits should be with specified instant time");

            // Stop the monitoring loop before the harness is closed.
            function.close();
        }
    }

    /**
     * Verifies that instants which complete late ("hollow" instants) are still consumed.
     *
     * <p>Four commits are written, then the instant files of the second and third commit are
     * deleted so only two completed instants remain. Consuming from {@code "earliest"} must first
     * produce two splits without instant range at the latest remaining commit. Each deleted
     * instant is then saved as complete again, one at a time, and must produce one split whose
     * instant range starts and ends at exactly that instant.
     *
     * @throws Exception if the harness, the writers or the monitoring function fail
     */
    @Test
    public void testConsumingHollowInstants() throws Exception {
        // NOTE(review): presumably disables small-file handling so every write produces new
        // files; confirm against the Hudi option documentation.
        conf.setString("hoodie.parquet.small.file.limit", "0");
        // Four separate writes, expected below to yield four completed instants.
        for (int i = 0; i < 8; i += 2) {
            TestData.writeData(TestData.dataSetInsert(i + 1, i + 2), conf);
        }

        HoodieTableMetaClient metaClient =
                StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(conf));
        List<HoodieInstant> originalInstants = metaClient.getCommitsTimeline().filterCompletedInstants().getInstants();
        MatcherAssert.assertThat(originalInstants.size(), CoreMatchers.is(4));

        // Remove the second and third commit, keeping their metadata to restore them later.
        List<HoodieCommitMetadata> removedMetadata = new ArrayList<>();
        for (int i = 1; i <= 2; i++) {
            removedMetadata.add(TestUtils.deleteInstantFile(metaClient, originalInstants.get(i)));
        }

        List<HoodieInstant> remainingInstants =
                metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
        MatcherAssert.assertThat(remainingInstants.size(), CoreMatchers.is(2));

        String firstHollowTime = originalInstants.get(1).getTimestamp();
        String secondHollowTime = originalInstants.get(2).getTimestamp();
        String latestVisibleTime = remainingInstants.get(1).getTimestamp();

        conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");
        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
        // try-with-resources closes the harness on every exit path, including failed assertions.
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            // Round 1: only the two remaining completed instants are visible.
            CountDownLatch latch = new CountDownLatch(2);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
                    "No instants should have range limit");
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(latestVisibleTime)),
                    "All the splits should be with specified instant time");

            // Round 2: the first removed instant completes.
            latch = new CountDownLatch(1);
            sourceContext.reset(latch);
            TestUtils.saveInstantAsComplete(metaClient, originalInstants.get(1), removedMetadata.get(0));

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
                    "All instants should have range limit");
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> isPointInstantRange(split.getInstantRange().get(), firstHollowTime)),
                    "All the splits should have point instant range");
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(firstHollowTime)),
                    "All the splits should be with specified instant time");

            // Round 3: the second removed instant completes.
            latch = new CountDownLatch(1);
            sourceContext.reset(latch);
            TestUtils.saveInstantAsComplete(metaClient, originalInstants.get(2), removedMetadata.get(1));

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
                    "All instants should have range limit");
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> isPointInstantRange(split.getInstantRange().get(), secondHollowTime)),
                    "All the splits should have point instant range");
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(secondHollowTime)),
                    "All the splits should be with specified instant time");

            // Stop the monitoring loop before the harness is closed.
            function.close();
        }
    }

    /**
     * Verifies that a monitoring function restored from a snapshot keeps producing splits.
     *
     * <p>Phase 1 runs a function against the first write, takes a harness snapshot and closes the
     * function. Phase 2 writes a second data set, initializes a new harness and function from
     * that snapshot and expects four splits (partitions {@code par1..par4}) with an instant range.
     *
     * @throws Exception if the harness, the writers or the monitoring function fail
     */
    @Test
    public void testCheckpointRestore() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, conf);

        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
        OperatorSubtaskState state;
        // try-with-resources closes the harness even when an assertion in this phase fails.
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            Thread.sleep(1000L);

            state = harness.snapshot(1L, 1L);

            // Stop the first function once its state has been captured.
            function.close();

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1,par2,par3,par4"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
                    "All instants should have range limit");
        }

        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
        StreamReadMonitoringFunction restoredFunction = TestUtils.getMonitorFunc(conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(restoredFunction)) {
            harness.setup();
            // Restore from the snapshot taken in phase 1 before opening the operator.
            harness.initializeState(state);
            harness.open();

            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, restoredFunction);

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1,par2,par3,par4"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
                    "All the instants should have range limit");

            // Stop the restored function only after the assertions, as the other tests do.
            // The previous code closed the already-closed phase-1 function here instead.
            restoredFunction.close();
        }
    }

    /**
     * Verifies that a monitoring function cancelled before the snapshot can be restored.
     *
     * <p>Phase 1 consumes from {@code "earliest"}, cancels the function, takes a harness snapshot
     * and closes the function; the collected splits must not carry an instant range. Phase 2
     * writes a second data set, initializes a new harness and function from that snapshot and
     * expects four splits (partitions {@code par1..par4}) with an instant range.
     *
     * @throws Exception if the harness, the writers or the monitoring function fail
     */
    @Test
    public void testStopWithSavepointAndRestore() throws Exception {
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        conf.setString(FlinkOptions.READ_START_COMMIT, "earliest");

        StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
        OperatorSubtaskState state;
        // try-with-resources closes the harness even when an assertion in this phase fails.
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
            harness.setup();
            harness.open();

            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, function);

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            Thread.sleep(1000L);

            // Cancel first, then snapshot: the state is captured from a cancelled function.
            function.cancel();
            state = harness.snapshot(1L, 1L);
            function.close();

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1,par2,par3,par4"));
            // The message now matches the noneMatch check; it previously claimed the opposite.
            Assertions.assertTrue(
                    sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
                    "No instants should have range limit");
        }

        TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
        StreamReadMonitoringFunction restoredFunction = TestUtils.getMonitorFunc(conf);
        try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(restoredFunction)) {
            harness.setup();
            // Restore from the snapshot taken in phase 1 before opening the operator.
            harness.initializeState(state);
            harness.open();

            CountDownLatch latch = new CountDownLatch(4);
            CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
            runAsync(sourceContext, restoredFunction);

            Assertions.assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
            MatcherAssert.assertThat("Should produce the expected splits", sourceContext.getPartitionPaths(), CoreMatchers.is("par1,par2,par3,par4"));
            Assertions.assertTrue(
                    sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
                    "All the instants should have range limit");

            // Stop the restored function only after the assertions, as the other tests do.
            // The previous code closed the already-closed phase-1 function here instead.
            restoredFunction.close();
        }
    }

    /**
     * Tells whether the given range starts and ends at the same given instant.
     *
     * @param instantRange range to inspect, may be {@code null}
     * @param str          instant time both range bounds are compared against
     * @return {@code true} when the range is non-null and both its start and end instant
     *         equal {@code str}; {@code false} otherwise
     */
    public static boolean isPointInstantRange(InstantRange instantRange, String str) {
        if (instantRange == null) {
            return false;
        }
        // Check the start first so the end is only read when the start already matches.
        if (!Objects.equals(str, instantRange.getStartInstant())) {
            return false;
        }
        return Objects.equals(str, instantRange.getEndInstant());
    }

    /**
     * Wraps the monitoring function in a {@link StreamSource} operator and creates a test
     * harness for it, passing {@code 1, 1, 0} as the remaining harness constructor arguments.
     *
     * @param streamReadMonitoringFunction the function under test
     * @return a new harness; the caller is responsible for setting it up, opening and closing it
     * @throws Exception if the harness cannot be created
     */
    private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> createHarness(StreamReadMonitoringFunction streamReadMonitoringFunction) throws Exception {
        // Parameterized instead of raw so the operator's output type is checked by the compiler.
        StreamSource<MergeOnReadInputSplit, StreamReadMonitoringFunction> streamSource =
                new StreamSource<>(streamReadMonitoringFunction);
        return new AbstractStreamOperatorTestHarness<>(streamSource, 1, 1, 0);
    }

    /**
     * Starts the monitoring function on a new thread so the calling test can keep going while
     * splits are being emitted into the given context.
     *
     * <p>A checked exception raised by {@code run} is rethrown on that thread wrapped in a
     * {@link RuntimeException}; it is not propagated to the caller of this method.
     *
     * @param collectingSourceContext      context that receives the emitted splits
     * @param streamReadMonitoringFunction function whose {@code run} method is invoked
     */
    private void runAsync(CollectingSourceContext collectingSourceContext, StreamReadMonitoringFunction streamReadMonitoringFunction) {
        final Runnable monitorTask = () -> {
            try {
                streamReadMonitoringFunction.run(collectingSourceContext);
            } catch (Exception cause) {
                throw new RuntimeException(cause);
            }
        };
        final Thread monitorThread = new Thread(monitorTask);
        monitorThread.start();
    }
}
