package org.apache.hudi.sink;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.slf4j.Logger;

/**
 * Test cases for {@link StreamWriteOperatorCoordinator}.
 *
 * <p>Loaded from: input_file:org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.class
 */
public class TestStreamWriteOperatorCoordinator {
    private StreamWriteOperatorCoordinator coordinator;

    @TempDir
    File tempFile;

    /**
     * Creates and starts a coordinator rooted at the temp directory, swaps in a
     * mock executor, and delivers an empty bootstrap event for task 0 and task 1.
     *
     * @throws Exception if the coordinator fails to start
     */
    @BeforeEach
    public void before() throws Exception {
        // The mock context is built with the value 2 (presumably the parallelism — confirm),
        // matching the two bootstrap events sent below.
        final MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 2);
        final String basePath = tempFile.getAbsolutePath();
        coordinator = new StreamWriteOperatorCoordinator(TestConfigurations.getDefaultConf(basePath), context);
        coordinator.start();
        coordinator.setExecutor(new MockCoordinatorExecutor(context));
        for (int taskId = 0; taskId < 2; taskId++) {
            coordinator.handleEventFromOperator(taskId, WriteMetadataEvent.emptyBootstrap(taskId));
        }
    }

    /**
     * Closes the coordinator currently held by this test instance.
     *
     * @throws Exception if closing the coordinator fails
     */
    @AfterEach
    public void after() throws Exception {
        coordinator.close();
    }

    /**
     * Sends one write-metadata event per task for the current instant, acknowledges
     * checkpoint 1, and checks that the instant became the last completed one while a
     * different, non-empty pending instant exists.
     */
    @Test
    void testInstantState() {
        final String basePath = tempFile.getAbsolutePath();
        final String currentInstant = coordinator.getInstant();
        Assertions.assertNotEquals("", currentInstant);

        // Each task reports a write status for its own partition.
        coordinator.handleEventFromOperator(0, createOperatorEvent(0, currentInstant, "par1", true, 0.1d));
        coordinator.handleEventFromOperator(1, createOperatorEvent(1, currentInstant, "par2", false, 0.2d));
        coordinator.notifyCheckpointComplete(1L);

        final String pendingInstant = TestUtils.getLastPendingInstant(basePath);
        MatcherAssert.assertThat(
                "Instant should be complete",
                TestUtils.getLastCompleteInstant(basePath),
                CoreMatchers.is(currentInstant));
        Assertions.assertNotEquals("", pendingInstant, "Should start a new instant");
        Assertions.assertNotEquals(currentInstant, pendingInstant, "Should start a new instant");
    }

    /**
     * Checks that the {@code .hoodie} folder exists under the table base path after
     * the coordinator has been started in {@code before()}.
     *
     * @throws IOException if the file system cannot be queried or closed
     */
    @Test
    public void testTableInitialized() throws IOException {
        Configuration hadoopConf = HadoopConfigurations.getHadoopConf(new org.apache.flink.configuration.Configuration());
        String basePath = tempFile.getAbsolutePath();
        // try-with-resources closes the file system on both the success and the failure path.
        try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
            Assertions.assertTrue(fs.exists(new Path(basePath, ".hoodie")));
        }
    }

    /**
     * Takes a coordinator checkpoint and feeds the resulting bytes straight back
     * into {@code resetToCheckpoint}.
     *
     * @throws Exception if the checkpoint future fails or the reset throws
     */
    @Test
    public void testCheckpointAndRestore() throws Exception {
        // Parameterized future: no raw type and no unchecked cast on the result.
        CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
        coordinator.checkpointCoordinator(1L, checkpointResult);
        coordinator.resetToCheckpoint(1L, checkpointResult.get());
    }

    /**
     * Sends an event whose instant time ({@code "abc"}) does not match the coordinator's
     * instant and expects the job to be marked as failed.
     */
    @Test
    public void testReceiveInvalidEvent() {
        coordinator.checkpointCoordinator(1L, new CompletableFuture<>());
        WriteMetadataEvent invalidEvent = WriteMetadataEvent.builder()
                .taskID(0)
                .instantTime("abc")
                .writeStatus(Collections.emptyList())
                .build();
        assertError(
                () -> coordinator.handleEventFromOperator(0, invalidEvent),
                "Receive an unexpected event for instant abc from task 0");
    }

    /**
     * Exercises checkpoint completion when only some tasks have reported:
     * first with an empty write result from task 0 (nothing is committed and the
     * buffered event for task 0 is cleared), then with a real write result from
     * task 1 (the instant is committed).
     */
    @Test
    public void testCheckpointCompleteWithPartialEvents() {
        final String basePath = tempFile.getAbsolutePath();
        coordinator.checkpointCoordinator(1L, new CompletableFuture<>());
        final String currentInstant = coordinator.getInstant();

        // Task 0 reports an empty list of write statuses; task 1 stays silent for now.
        WriteMetadataEvent emptyEvent = WriteMetadataEvent.builder()
                .taskID(0)
                .instantTime(currentInstant)
                .writeStatus(Collections.emptyList())
                .build();
        coordinator.handleEventFromOperator(0, emptyEvent);
        Assertions.assertDoesNotThrow(() -> {
            coordinator.notifyCheckpointComplete(1L);
        }, "Returns early for empty write results");
        Assertions.assertNull(TestUtils.getLastCompleteInstant(basePath), "Returns early for empty write results");
        Assertions.assertNull(coordinator.getEventBuffer()[0]);

        // Task 1 now reports a non-empty write result for the same instant.
        coordinator.handleEventFromOperator(1, createOperatorEvent(1, currentInstant, "par2", false, 0.2d));
        Assertions.assertDoesNotThrow(() -> {
            coordinator.notifyCheckpointComplete(2L);
        }, "Commits the instant with partial events anyway");
        MatcherAssert.assertThat(
                "Commits the instant with partial events anyway",
                TestUtils.getLastCompleteInstant(basePath),
                CoreMatchers.is(currentInstant));
    }

    /**
     * Sends a bootstrap-flagged write event from task 0 together with an empty
     * bootstrap event from task 1 and expects the instant carried by task 0's
     * event to end up as the last completed instant.
     */
    @Test
    public void testRecommitWithPartialUncommittedEvents() {
        final String basePath = tempFile.getAbsolutePath();
        coordinator.checkpointCoordinator(1L, new CompletableFuture<>());
        final String currentInstant = coordinator.getInstant();
        Assertions.assertNull(TestUtils.getLastCompleteInstant(basePath), "Returns early for empty write results");

        // Task 0 carries write metadata for the current instant, marked as a bootstrap event.
        WriteMetadataEvent bootstrapWithData = createOperatorEvent(0, currentInstant, "par1", false, 0.2d);
        bootstrapWithData.setBootstrap(true);
        WriteMetadataEvent emptyBootstrap = WriteMetadataEvent.emptyBootstrap(1);

        coordinator.handleEventFromOperator(0, bootstrapWithData);
        coordinator.handleEventFromOperator(1, emptyBootstrap);
        MatcherAssert.assertThat(
                "Recommits the instant with partial uncommitted events",
                TestUtils.getLastCompleteInstant(basePath),
                CoreMatchers.is(currentInstant));
    }

    /**
     * Rebuilds the coordinator with {@code HIVE_SYNC_ENABLED} set to true, performs one
     * mock write, and checks that acknowledging checkpoint 1 does not throw.
     *
     * @throws Exception if resetting the directory or starting the coordinator fails
     */
    @Test
    public void testHiveSyncInvoked() throws Exception {
        reset();

        org.apache.flink.configuration.Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true);

        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        coordinator = new StreamWriteOperatorCoordinator(conf, context);
        coordinator.start();
        coordinator.setExecutor(new MockCoordinatorExecutor(context));
        coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));

        String writtenInstant = mockWriteWithMetadata();
        Assertions.assertNotEquals("", writtenInstant);
        Assertions.assertDoesNotThrow(() -> {
            coordinator.notifyCheckpointComplete(1L);
        });
    }

    /**
     * Rebuilds the coordinator with the metadata table enabled and
     * {@code METADATA_COMPACTION_DELTA_COMMITS} set to 5, then performs a series of
     * mock writes and checks, after each step, how many instants the metadata table
     * timeline holds and what its most recent instants look like.
     *
     * @throws Exception if resetting the directory or starting the coordinator fails
     */
    @Test
    void testSyncMetadataTable() throws Exception {
        reset();

        org.apache.flink.configuration.Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
        conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);

        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        coordinator = new StreamWriteOperatorCoordinator(conf, context);
        coordinator.start();
        coordinator.setExecutor(new MockCoordinatorExecutor(context));
        coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));

        String instant = coordinator.getInstant();
        Assertions.assertNotEquals("", instant);

        // Meta client that points at the metadata table's own base path.
        HoodieTableMetaClient metadataMetaClient = StreamerUtil.createMetaClient(
                HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()),
                HadoopConfigurations.getHadoopConf(conf));

        // Right after start-up exactly one completed instant with timestamp 00000000000000 is expected.
        HoodieTimeline timeline = metadataMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat("One instant need to sync to metadata table", Long.valueOf(timeline.getInstants().count()), CoreMatchers.is(1L));
        MatcherAssert.assertThat(((HoodieInstant) timeline.lastInstant().get()).getTimestamp(), CoreMatchers.is("00000000000000"));

        // Four more writes: the completed count is expected to grow from 2 to 5, one per write.
        for (long expectedCount = 2L; expectedCount <= 5L; expectedCount++) {
            instant = mockWriteWithMetadata();
            metadataMetaClient.reloadActiveTimeline();
            timeline = metadataMetaClient.getActiveTimeline().filterCompletedInstants();
            MatcherAssert.assertThat("One instant need to sync to metadata table", Long.valueOf(timeline.getInstants().count()), CoreMatchers.is(Long.valueOf(expectedCount)));
            MatcherAssert.assertThat(((HoodieInstant) timeline.lastInstant().get()).getTimestamp(), CoreMatchers.is(instant));
        }

        // One more write: the second-to-last instant is expected to be a "commit" stamped with the
        // previous write's instant plus "001" (presumably the metadata-table compaction — confirm).
        mockWriteWithMetadata();
        metadataMetaClient.reloadActiveTimeline();
        timeline = metadataMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
        MatcherAssert.assertThat("One instant need to sync to metadata table", Long.valueOf(timeline.getInstants().count()), CoreMatchers.is(7L));
        MatcherAssert.assertThat(((HoodieInstant) timeline.nthFromLastInstant(1).get()).getTimestamp(), CoreMatchers.is(instant + "001"));
        MatcherAssert.assertThat(((HoodieInstant) timeline.nthFromLastInstant(1).get()).getAction(), CoreMatchers.is("commit"));

        // A single further write brings the completed count to 8.
        String nextInstant = mockWriteWithMetadata();
        metadataMetaClient.reloadActiveTimeline();
        timeline = metadataMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat("One instant need to sync to metadata table", Long.valueOf(timeline.getInstants().count()), CoreMatchers.is(8L));
        MatcherAssert.assertThat(((HoodieInstant) timeline.lastInstant().get()).getTimestamp(), CoreMatchers.is(nextInstant));

        // The following write is expected to leave a "clean" action, stamped with that write's
        // instant plus "002", as the last instant on the timeline.
        String cleanTriggerInstant = mockWriteWithMetadata();
        metadataMetaClient.reloadActiveTimeline();
        timeline = metadataMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
        MatcherAssert.assertThat("One instant need to sync to metadata table", Long.valueOf(timeline.getInstants().count()), CoreMatchers.is(10L));
        MatcherAssert.assertThat(((HoodieInstant) timeline.lastInstant().get()).getTimestamp(), CoreMatchers.is(cleanTriggerInstant + "002"));
        MatcherAssert.assertThat(((HoodieInstant) timeline.lastInstant().get()).getAction(), CoreMatchers.is("clean"));

        // Three more writes; only the middle one's instant is needed for the final check.
        mockWriteWithMetadata();
        String middleInstant = mockWriteWithMetadata();
        mockWriteWithMetadata();
        metadataMetaClient.reloadActiveTimeline();
        timeline = metadataMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
        MatcherAssert.assertThat("One instant need to sync to metadata table", Long.valueOf(timeline.getInstants().count()), CoreMatchers.is(14L));
        MatcherAssert.assertThat(((HoodieInstant) timeline.nthFromLastInstant(1).get()).getTimestamp(), CoreMatchers.is(middleInstant + "001"));
        MatcherAssert.assertThat(((HoodieInstant) timeline.nthFromLastInstant(1).get()).getAction(), CoreMatchers.is("commit"));
    }

    /**
     * Rebuilds the coordinator with the metadata table enabled, manually places an
     * inflight {@code deltacommit} on the metadata timeline under the coordinator's
     * current instant time, and checks that the next mock write still completes on
     * the metadata timeline under that instant.
     *
     * @throws Exception if resetting the directory or starting the coordinator fails
     */
    @Test
    void testSyncMetadataTableWithReusedInstant() throws Exception {
        reset();

        org.apache.flink.configuration.Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);

        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        coordinator = new StreamWriteOperatorCoordinator(conf, context);
        coordinator.start();
        coordinator.setExecutor(new MockCoordinatorExecutor(context));
        coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));
        Assertions.assertNotEquals("", coordinator.getInstant());

        HoodieTableMetaClient metadataMetaClient = StreamerUtil.createMetaClient(
                HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()),
                HadoopConfigurations.getHadoopConf(conf));
        HoodieTimeline initialTimeline = metadataMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat("One instant need to sync to metadata table", Long.valueOf(initialTimeline.getInstants().count()), CoreMatchers.is(1L));
        MatcherAssert.assertThat(((HoodieInstant) initialTimeline.lastInstant().get()).getTimestamp(), CoreMatchers.is("00000000000000"));

        mockWriteWithMetadata();

        // Pre-create a requested -> inflight deltacommit on the metadata timeline that uses
        // the same instant time the coordinator will write next.
        String reusedInstant = coordinator.getInstant();
        metadataMetaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, "deltacommit", reusedInstant));
        metadataMetaClient.getActiveTimeline().transitionRequestedToInflight("deltacommit", reusedInstant);
        metadataMetaClient.reloadActiveTimeline();

        String writtenInstant = mockWriteWithMetadata();
        metadataMetaClient.reloadActiveTimeline();
        HoodieTimeline finalTimeline = metadataMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat("One instant need to sync to metadata table", Long.valueOf(finalTimeline.getInstants().count()), CoreMatchers.is(3L));
        MatcherAssert.assertThat(((HoodieInstant) finalTimeline.lastInstant().get()).getTimestamp(), CoreMatchers.is(writtenInstant));
    }

    /**
     * Floods a dedicated coordinator with 20000 write events followed by one end-input
     * event, closes the executor, and checks that the event buffer slot for task 0 is
     * empty afterwards.
     *
     * <p>The coordinator under test is a local instance, separate from the
     * {@code coordinator} field, and uses a {@link NonThrownExecutor} built with
     * {@code waitForTasksFinish(true)} instead of the mock executor.
     *
     * @throws Exception if starting or closing the coordinator or executor fails
     */
    @Test
    public void testEndInputIsTheLastEvent() throws Exception {
        org.apache.flink.configuration.Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        NonThrownExecutor executor = NonThrownExecutor.builder(Mockito.mock(Logger.class)).waitForTasksFinish(true).build();

        // try-with-resources closes the local coordinator on both the success and the failure path.
        try (StreamWriteOperatorCoordinator endInputCoordinator = new StreamWriteOperatorCoordinator(conf, context)) {
            endInputCoordinator.start();
            endInputCoordinator.setExecutor(executor);
            endInputCoordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));

            // NOTE(review): presumably gives the executor time to process the bootstrap event — confirm.
            TimeUnit.SECONDS.sleep(5L);

            for (int i = 0; i < 20000; i++) {
                endInputCoordinator.handleEventFromOperator(
                        0, createOperatorEvent(0, endInputCoordinator.getInstant(), "par1", true, 0.1d));
            }

            WriteMetadataEvent endInputEvent = WriteMetadataEvent.builder()
                    .taskID(0)
                    .instantTime(endInputCoordinator.getInstant())
                    .writeStatus(Collections.emptyList())
                    .endInput(true)
                    .build();
            endInputCoordinator.handleEventFromOperator(0, endInputEvent);

            // Close the executor before inspecting the buffer.
            executor.close();
            Assertions.assertNull(endInputCoordinator.getEventBuffer()[0]);
        }
    }

    /**
     * Rebuilds the coordinator with the metadata table enabled, the write concurrency
     * mode set to {@code optimistic_concurrency_control} and the lock client retries
     * set to 1, then checks that one mock write adds one completed instant to the
     * metadata table timeline.
     *
     * @throws Exception if resetting the directory or starting the coordinator fails
     */
    @Test
    void testLockForMetadataTable() throws Exception {
        reset();

        org.apache.flink.configuration.Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
        conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), "optimistic_concurrency_control");
        conf.setInteger("hoodie.write.lock.client.num_retries", 1);

        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        coordinator = new StreamWriteOperatorCoordinator(conf, context);
        coordinator.start();
        coordinator.setExecutor(new MockCoordinatorExecutor(context));
        coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));
        Assertions.assertNotEquals("", coordinator.getInstant());

        HoodieTableMetaClient metadataMetaClient = StreamerUtil.createMetaClient(
                HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()),
                HadoopConfigurations.getHadoopConf(conf));

        // Before any write: a single completed instant with timestamp 00000000000000.
        HoodieTimeline beforeWrite = metadataMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat("One instant need to sync to metadata table", Integer.valueOf(beforeWrite.countInstants()), CoreMatchers.is(1));
        MatcherAssert.assertThat(((HoodieInstant) beforeWrite.lastInstant().get()).getTimestamp(), CoreMatchers.is("00000000000000"));

        // After one write: two completed instants, the last one being the written instant.
        String writtenInstant = mockWriteWithMetadata();
        metadataMetaClient.reloadActiveTimeline();
        HoodieTimeline afterWrite = metadataMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat("One instant need to sync to metadata table", Integer.valueOf(afterWrite.countInstants()), CoreMatchers.is(2));
        MatcherAssert.assertThat(((HoodieInstant) afterWrite.lastInstant().get()).getTimestamp(), CoreMatchers.is(writtenInstant));
    }

    /**
     * Simulates one write round on task 0: sends a write-metadata event for the
     * coordinator's current instant and then acknowledges checkpoint 0.
     *
     * @return the instant time that was current when the event was sent
     */
    private String mockWriteWithMetadata() {
        final String writtenInstant = coordinator.getInstant();
        WriteMetadataEvent writeEvent = createOperatorEvent(0, writtenInstant, "par1", true, 0.1d);
        coordinator.handleEventFromOperator(0, writeEvent);
        coordinator.notifyCheckpointComplete(0L);
        return writtenInstant;
    }

    /**
     * Builds a last-batch {@link WriteMetadataEvent} that carries a single
     * {@link WriteStatus} with a fixed, fake write stat.
     *
     * @param i    task id stored on the event
     * @param str  instant time stored on the event
     * @param str2 partition path set on both the write status and its stat
     * @param z    first {@link WriteStatus} constructor argument
     * @param d    second {@link WriteStatus} constructor argument
     * @return the assembled event
     */
    private static WriteMetadataEvent createOperatorEvent(int i, String str, String str2, boolean z, double d) {
        // Fixed dummy statistics; only the partition path varies between callers.
        HoodieWriteStat stat = new HoodieWriteStat();
        stat.setPartitionPath(str2);
        stat.setFileId("fileId123");
        stat.setPath("path123");
        stat.setFileSizeInBytes(123L);
        stat.setTotalWriteBytes(123L);
        stat.setNumWrites(1L);

        WriteStatus status = new WriteStatus(Boolean.valueOf(z), Double.valueOf(d));
        status.setPartitionPath(str2);
        status.setStat(stat);

        return WriteMetadataEvent.builder()
                .taskID(i)
                .instantTime(str)
                .writeStatus(Collections.singletonList(status))
                .lastBatch(true)
                .build();
    }

    /**
     * Empties the temp directory so a test can build a coordinator on a clean base path.
     *
     * @throws Exception if the directory contents cannot be removed
     */
    private void reset() throws Exception {
        FileUtils.cleanDirectory(tempFile);
    }

    /**
     * Runs the given action and asserts that the coordinator's context reports a failed job.
     *
     * @param runnable action expected to make the coordinator fail the job
     * @param str      failure message used if the job was not marked as failed
     */
    private void assertError(Runnable runnable, String str) {
        runnable.run();
        MatcherAssert.assertThat(coordinator.getContext(), CoreMatchers.instanceOf(MockOperatorCoordinatorContext.class));
        // Narrow explicitly: isJobFailed() is queried on the mock context type, which the
        // assertion above has just verified.
        MockOperatorCoordinatorContext mockContext = (MockOperatorCoordinatorContext) coordinator.getContext();
        Assertions.assertTrue(mockContext.isJobFailed(), str);
    }
}
