package org.apache.hudi.sink;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.class */
public class TestStreamWriteOperatorCoordinator {
    private StreamWriteOperatorCoordinator coordinator;

    @TempDir
    File tempFile;

    @BeforeEach
    public void before() throws Exception {
        this.coordinator = new StreamWriteOperatorCoordinator(TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath()), 2);
        this.coordinator.start();
    }

    @AfterEach
    public void after() {
        this.coordinator.close();
    }

    @Test
    void testInstantState() {
        String instant = this.coordinator.getInstant();
        Assertions.assertNotEquals("", instant);
        WriteStatus writeStatus = new WriteStatus(true, Double.valueOf(0.1d));
        writeStatus.setPartitionPath("par1");
        writeStatus.setStat(new HoodieWriteStat());
        BatchWriteSuccessEvent build = BatchWriteSuccessEvent.builder().taskID(0).instantTime(instant).writeStatus(Collections.singletonList(writeStatus)).isLastBatch(true).build();
        WriteStatus writeStatus2 = new WriteStatus(false, Double.valueOf(0.2d));
        writeStatus2.setPartitionPath("par2");
        writeStatus2.setStat(new HoodieWriteStat());
        BatchWriteSuccessEvent build2 = BatchWriteSuccessEvent.builder().taskID(1).instantTime(instant).writeStatus(Collections.singletonList(writeStatus2)).isLastBatch(true).build();
        this.coordinator.handleEventFromOperator(0, build);
        this.coordinator.handleEventFromOperator(1, build2);
        this.coordinator.checkpointComplete(1L);
        String inflightAndRequestedInstant = this.coordinator.getWriteClient().getInflightAndRequestedInstant("COPY_ON_WRITE");
        MatcherAssert.assertThat("Instant should be complete", this.coordinator.getWriteClient().getLastCompletedInstant("COPY_ON_WRITE"), CoreMatchers.is(instant));
        Assertions.assertNotEquals("", inflightAndRequestedInstant, "Should start a new instant");
        Assertions.assertNotEquals(instant, inflightAndRequestedInstant, "Should start a new instant");
    }

    @Test
    public void testTableInitialized() throws IOException {
        Configuration hadoopConf = StreamerUtil.getHadoopConf();
        String absolutePath = this.tempFile.getAbsolutePath();
        FileSystem fs = FSUtils.getFs(absolutePath, hadoopConf);
        Throwable th = null;
        try {
            Assertions.assertTrue(fs.exists(new Path(absolutePath, ".hoodie")));
            if (fs != null) {
                if (0 == 0) {
                    fs.close();
                    return;
                }
                try {
                    fs.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (fs != null) {
                if (0 != 0) {
                    try {
                        fs.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fs.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCheckpointAndRestore() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, completableFuture);
        this.coordinator.resetToCheckpoint((byte[]) completableFuture.get());
    }

    @Test
    public void testReceiveInvalidEvent() {
        this.coordinator.checkpointCoordinator(1L, new CompletableFuture());
        BatchWriteSuccessEvent build = BatchWriteSuccessEvent.builder().taskID(0).instantTime("abc").writeStatus(Collections.emptyList()).build();
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.coordinator.handleEventFromOperator(0, build);
        }, "Receive an unexpected event for instant abc from task 0");
    }

    @Test
    public void testCheckpointCompleteWithRetry() {
        this.coordinator.checkpointCoordinator(1L, new CompletableFuture());
        this.coordinator.handleEventFromOperator(0, BatchWriteSuccessEvent.builder().taskID(0).instantTime(this.coordinator.getInstant()).writeStatus(Collections.emptyList()).build());
        Assertions.assertThrows(HoodieException.class, () -> {
            this.coordinator.checkpointComplete(1L);
        }, "Try 3 to commit instant");
    }
}
