package org.apache.hudi.sink;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.TestWriteBase;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/hudi/sink/TestWriteCopyOnWrite.class */
/**
 * Test cases that drive the write pipeline through a {@link TestWriteBase.TestHarness}
 * for the table type returned by {@link #getTableType()} (COPY_ON_WRITE in this class).
 *
 * <p>Subclasses can customise the run by overriding {@link #getTableType()},
 * {@link #setUp(Configuration)} and the {@code get...Expected()} hooks.
 */
public class TestWriteCopyOnWrite extends TestWriteBase {
    /** Configuration shared by all tests; rebuilt from the defaults before every test. */
    protected Configuration conf;

    /** Per-test temporary directory injected by JUnit; used as the table base path. */
    @TempDir
    File tempFile;

    /**
     * Builds the default configuration rooted at the temporary directory, pins the
     * table type and then gives subclasses a chance to tweak it via {@link #setUp(Configuration)}.
     */
    @BeforeEach
    public void before() {
        this.conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name());
        setUp(this.conf);
    }

    /**
     * Extension hook invoked at the end of {@link #before()}; a no-op in this class.
     *
     * @param configuration the freshly created configuration of the current test
     */
    protected void setUp(Configuration configuration) {
    }

    /** Checkpoints once with data and once without, asserting the events seen for each. */
    @Test
    public void testCheckpoint() throws Exception {
        preparePipeline()
                .consume(TestData.DATA_SET_INSERT)
                .emptyEventBuffer()
                .checkpoint(1L)
                .assertNextEvent(4, "par1,par2,par3,par4")
                .checkpointComplete(1L)
                .checkpoint(2L)
                .assertEmptyEvent()
                .emptyCheckpoint(2L)
                .end();
    }

    /** Runs two failing checkpoints with the commit-ack timeout set to {@code 1}. */
    @Test
    public void testCheckpointFails() throws Exception {
        this.conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
        preparePipeline(this.conf)
                .checkpoint(1L)
                .assertEmptyEvent()
                .checkpointFails(1L)
                .consume(TestData.DATA_SET_INSERT)
                .checkpointFails(2L)
                .end();
    }

    /** Fails sub-task 0 after an empty checkpoint and asserts that no instant completed. */
    @Test
    public void testSubtaskFails() throws Exception {
        preparePipeline()
                .checkpoint(1L)
                .assertEmptyEvent()
                .subTaskFails(0)
                .noCompleteInstant()
                .end();
    }

    /** Inserts one data set and verifies the written data after the checkpoint completes. */
    @Test
    public void testInsert() throws Exception {
        preparePipeline()
                .consume(TestData.DATA_SET_INSERT)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED1)
                .end();
    }

    /** Writes the duplicates data set twice with pre-combine enabled; the result must not change. */
    @Test
    public void testInsertDuplicates() throws Exception {
        this.conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        preparePipeline(this.conf)
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED3, 1)
                // second round with the very same input
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkWrittenData(EXPECTED3, 1)
                .end();
    }

    /** Inserts, then upserts; the update only shows up once the second checkpoint completes. */
    @Test
    public void testUpsert() throws Exception {
        preparePipeline()
                .consume(TestData.DATA_SET_INSERT)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .consume(TestData.DATA_SET_UPDATE_INSERT)
                // written data is still the first batch before checkpoint 2
                .checkWrittenData(EXPECTED1)
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkWrittenData(EXPECTED2)
                .end();
    }

    /** Inserts, then applies the update/delete data set and checks {@link #getUpsertWithDeleteExpected()}. */
    @Test
    public void testUpsertWithDelete() throws Exception {
        preparePipeline()
                .consume(TestData.DATA_SET_INSERT)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .consume(TestData.DATA_SET_UPDATE_DELETE)
                // written data is still the first batch before checkpoint 2
                .checkWrittenData(EXPECTED1)
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkWrittenData(getUpsertWithDeleteExpected())
                .end();
    }

    /** Writes the duplicates data set twice with a very small write batch size. */
    @Test
    public void testInsertWithMiniBatches() throws Exception {
        // NOTE(review): tiny batch size, presumably to force mini-batch flushes - confirm the option's unit
        this.conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 8.0E-4d);
        Map<String, String> expected = getMiniBatchExpected();
        preparePipeline(this.conf)
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .assertDataBuffer(1, 2)
                .checkpoint(1L)
                .allDataFlushed()
                .handleEvents(2)
                .checkpointComplete(1L)
                .checkWrittenData(expected, 1)
                // second round with the very same input
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .checkpoint(2L)
                .handleEvents(2)
                .checkpointComplete(2L)
                .checkWrittenData(expected, 1)
                .end();
    }

    /** Same-key input with a small batch size and pre-combine on: a single record is expected. */
    @Test
    public void testInsertWithDeduplication() throws Exception {
        this.conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 8.0E-4d);
        this.conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        Map<String, String> expected = new HashMap<>();
        expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
        preparePipeline(this.conf)
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .assertDataBuffer(1, 2)
                .checkpoint(1L)
                .allDataFlushed()
                .handleEvents(2)
                .checkpointComplete(1L)
                .checkWrittenData(expected, 1)
                // second round with the very same input
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(2L)
                .handleEvents(2)
                .checkpointComplete(2L)
                .checkWrittenData(expected, 1)
                .end();
    }

    /** Writes same-key input twice through the insert pipeline from {@link #prepareInsertPipeline()}. */
    @Test
    public void testInsertAppendMode() throws Exception {
        prepareInsertPipeline()
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .checkWrittenAllData(EXPECTED4, 1)
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkWrittenDataCOW(EXPECTED5)
                .end();
    }

    /** Insert operation with {@code INSERT_CLUSTER} enabled and a reduced write task max size. */
    @Test
    public void testInsertClustering() throws Exception {
        this.conf.setString(FlinkOptions.OPERATION, "insert");
        this.conf.setBoolean(FlinkOptions.INSERT_CLUSTER, true);
        this.conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008d);
        // The harness is created directly here rather than through preparePipeline(Configuration).
        TestWriteBase.TestHarness.instance()
                .preparePipeline(this.tempFile, this.conf)
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .assertDataBuffer(1, 2)
                .checkpoint(1L)
                .allDataFlushed()
                .handleEvents(2)
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED4, 1)
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(2L)
                .handleEvents(2)
                .checkpointComplete(2L)
                .checkWrittenDataCOW(EXPECTED5)
                .end();
    }

    /** Insert operation with clustering scheduling and async clustering enabled. */
    @Test
    public void testInsertAsyncClustering() throws Exception {
        this.conf.setString(FlinkOptions.OPERATION, "insert");
        this.conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        this.conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
        this.conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
        prepareInsertPipeline(this.conf)
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(1L)
                .handleEvents(1)
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED4, 1)
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(2L)
                .handleEvents(1)
                .checkpointComplete(2L)
                .checkWrittenDataCOW(EXPECTED5)
                .end();
    }

    /** Writes the duplicates data set twice with a reduced write task max size. */
    @Test
    public void testInsertWithSmallBufferSize() throws Exception {
        // NOTE(review): presumably leaves only a tiny buffer so flushes trigger early - confirm
        this.conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008d);
        Map<String, String> expected = getMiniBatchExpected();
        preparePipeline(this.conf)
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .assertDataBuffer(1, 2)
                .checkpoint(1L)
                .allDataFlushed()
                .handleEvents(2)
                .checkpointComplete(1L)
                .checkWrittenData(expected, 1)
                // second round with the very same input
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .checkpoint(2L)
                .handleEvents(2)
                .checkpointComplete(2L)
                .checkWrittenData(expected, 1)
                .end();
    }

    /**
     * Expected partition contents for the mini-batch and small-buffer tests.
     *
     * @return a new mutable map from partition name to the expected rows rendered as a string
     */
    protected Map<String, String> getMiniBatchExpected() {
        Map<String, String> expected = new HashMap<>();
        expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,1,par1]");
        return expected;
    }

    /**
     * Expected partition contents for {@link #testUpsertWithDelete()}.
     *
     * @return a new mutable map from partition name to the expected rows rendered as a string
     */
    protected Map<String, String> getUpsertWithDeleteExpected() {
        Map<String, String> expected = new HashMap<>();
        expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
        expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
        expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
        expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
        return expected;
    }

    /**
     * Data that {@link #validateIndexLoaded()} expects to be written after the checkpoint
     * was taken but before it is completed.
     *
     * @return {@code EXPECTED2} in this class
     */
    protected Map<String, String> getExpectedBeforeCheckpointComplete() {
        return EXPECTED2;
    }

    /** Writes an initial batch, enables index bootstrap and runs {@link #validateIndexLoaded()}. */
    @Test
    public void testIndexStateBootstrap() throws Exception {
        preparePipeline()
                .consume(TestData.DATA_SET_INSERT)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED1, 4)
                .end();

        // A new pipeline is prepared on the same configuration, now with bootstrap enabled.
        this.conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
        validateIndexLoaded();
    }

    /**
     * Prepares a new pipeline on the current configuration, consumes the update/insert
     * data set and asserts which keys the index holds and what data is written around
     * checkpoint 1.
     *
     * @throws Exception if any pipeline step fails
     */
    public void validateIndexLoaded() throws Exception {
        preparePipeline(this.conf)
                .consume(TestData.DATA_SET_UPDATE_INSERT)
                .checkIndexLoaded(
                        new HoodieKey("id1", "par1"),
                        new HoodieKey("id2", "par1"),
                        new HoodieKey("id3", "par2"),
                        new HoodieKey("id4", "par2"),
                        new HoodieKey("id5", "par3"),
                        new HoodieKey("id6", "par3"),
                        new HoodieKey("id7", "par4"),
                        new HoodieKey("id8", "par4"),
                        new HoodieKey("id9", "par3"),
                        new HoodieKey("id10", "par4"),
                        new HoodieKey("id11", "par4"))
                .checkpoint(1L)
                .assertBootstrapped()
                .assertNextEvent()
                .checkWrittenData(getExpectedBeforeCheckpointComplete())
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED2)
                .end();
    }

    /** Expects consuming after an uncompleted checkpoint 2 to throw the instant-initialize timeout. */
    @Test
    public void testWriteExactlyOnce() throws Exception {
        this.conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
        this.conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006d);
        preparePipeline(this.conf)
                .consume(TestData.DATA_SET_INSERT)
                .emptyEventBuffer()
                .checkpoint(1L)
                .assertConfirming()
                .handleEvents(4)
                .checkpointComplete(1L)
                .consume(TestData.DATA_SET_INSERT)
                .assertNotConfirming()
                .checkpoint(2L)
                // checkpoint 2 is never completed before the next consume
                .assertConsumeThrows(TestData.DATA_SET_INSERT, "Timeout(1000ms) while waiting for instant initialize")
                .end();
    }

    /**
     * Creates two write clients from the same configuration and asserts that both report
     * the REMOTE_FIRST view storage type and the same remote view server port, and that the
     * configured remote timeout is picked up.
     */
    @Test
    public void testReuseEmbeddedServer() throws IOException {
        this.conf.setInteger("hoodie.filesystem.view.remote.timeout.secs", 500);

        FileSystemViewStorageConfig firstViewConfig =
                FlinkWriteClients.createWriteClient(this.conf).getConfig().getViewStorageConfig();
        Assertions.assertSame(FileSystemViewStorageType.REMOTE_FIRST, firstViewConfig.getStorageType());

        // a second client built from the very same configuration
        HoodieFlinkWriteClient secondClient = FlinkWriteClients.createWriteClient(this.conf);
        Assertions.assertSame(
                FileSystemViewStorageType.REMOTE_FIRST,
                secondClient.getConfig().getViewStorageConfig().getStorageType());
        Assertions.assertEquals(
                firstViewConfig.getRemoteViewServerPort(),
                secondClient.getConfig().getViewStorageConfig().getRemoteViewServerPort());
        Assertions.assertEquals(500, firstViewConfig.getRemoteTimelineClientTimeoutSecs());
    }

    /** Shorthand for {@link #preparePipeline(Configuration)} with the shared {@link #conf}. */
    private TestWriteBase.TestHarness preparePipeline() throws Exception {
        return preparePipeline(this.conf);
    }

    /**
     * Creates a new test harness and prepares its pipeline on the temporary directory.
     *
     * @param configuration the configuration the pipeline is prepared with
     * @return the prepared harness
     * @throws Exception if the pipeline cannot be prepared
     */
    public TestWriteBase.TestHarness preparePipeline(Configuration configuration) throws Exception {
        return TestWriteBase.TestHarness.instance().preparePipeline(this.tempFile, configuration);
    }

    /** Shorthand for {@link #prepareInsertPipeline(Configuration)} with the shared {@link #conf}. */
    protected TestWriteBase.TestHarness prepareInsertPipeline() throws Exception {
        return prepareInsertPipeline(this.conf);
    }

    /**
     * Like {@link #preparePipeline(Configuration)} but passes {@code true} as the third
     * harness argument.
     * NOTE(review): the flag presumably selects the append/insert write path - confirm in TestHarness.
     *
     * @param configuration the configuration the pipeline is prepared with
     * @return the prepared harness
     * @throws Exception if the pipeline cannot be prepared
     */
    protected TestWriteBase.TestHarness prepareInsertPipeline(Configuration configuration) throws Exception {
        return TestWriteBase.TestHarness.instance().preparePipeline(this.tempFile, configuration, true);
    }

    /**
     * Table type written into the configuration by {@link #before()}.
     *
     * @return {@link HoodieTableType#COPY_ON_WRITE}
     */
    protected HoodieTableType getTableType() {
        return HoodieTableType.COPY_ON_WRITE;
    }
}
