/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.utils.TestWriteBase;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.TestData;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

public class TestWriteCopyOnWrite
extends TestWriteBase {
    // Batch size value 0.0016. NOTE(review): this constant is not referenced anywhere in the
    // class as shown; getBatchSize() returns the same literal directly. The unit is presumably
    // megabytes going by the name - confirm against FlinkOptions.WRITE_BATCH_SIZE.
    private static final double BATCH_SIZE_MB = 0.0016;

    /**
     * Drives one checkpoint after consuming the insert data set and a second checkpoint
     * without any new data, with {@code ALLOW_EMPTY_COMMIT} set to each boolean value.
     *
     * @param allowEmptyCommit value stored under the {@code ALLOW_EMPTY_COMMIT} key
     * @throws Exception if any harness step fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testCheckpoint(boolean allowEmptyCommit) throws Exception {
        conf.setBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), allowEmptyCommit);
        preparePipeline(conf)
                .consume(TestData.DATA_SET_INSERT)
                .emptyEventBuffer()
                // checkpoint 1: data was consumed beforehand
                .checkpoint(1L)
                .assertNextEvent(4, "par1,par2,par3,par4")
                .checkpointComplete(1L)
                // checkpoint 2: nothing was consumed since checkpoint 1
                .checkpoint(2L)
                .assertEmptyEvent()
                .emptyCheckpoint(2L)
                .end();
    }

    /**
     * Sets {@code WRITE_COMMIT_ACK_TIMEOUT} to 1 and reports checkpoint failures to the harness:
     * first for checkpoint 1 (taken without data), then for checkpoint 2 after consuming inserts.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testCheckpointFails() throws Exception {
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
        preparePipeline(conf)
                // empty checkpoint that is then reported as failed
                .checkpoint(1L)
                .assertEmptyEvent()
                .checkpointFails(1L)
                // data arrives, second failure is reported
                .consume(TestData.DATA_SET_INSERT)
                .checkpointFails(2L)
                .end();
    }

    /**
     * Exercises sub-task failures around a successful write: a failure before any data,
     * a normal write/commit of the insert data set, then a failure followed by rolling the
     * last complete instant back to inflight and a job failover.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testSubtaskFails() throws Exception {
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
        preparePipeline()
                // failure before any data was written
                .checkpoint(1L)
                .assertEmptyEvent()
                .subTaskFails(0)
                .noCompleteInstant()
                // regular write and commit
                .consume(TestData.DATA_SET_INSERT)
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkWrittenData(EXPECTED1)
                // failure after the commit, then rollback-to-inflight and failover
                .subTaskFails(0, 0)
                .assertEmptyEvent()
                .rollbackLastCompleteInstantToInflight()
                .jobFailover()
                .assertNextEvent()
                .checkLastPendingInstantCompleted()
                .end();
    }

    /**
     * With the operation set to {@code "INSERT"}, takes an empty checkpoint, fails sub-task 0,
     * and then verifies that a subsequent insert batch is written and matches {@code EXPECTED1}.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testAppendInsertAfterFailoverWithEmptyCheckpoint() throws Exception {
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 10000L);
        conf.setString(FlinkOptions.OPERATION, "INSERT");
        preparePipeline()
                .assertEmptyDataFiles()
                // empty checkpoint followed by a sub-task failure
                .checkpoint(1L)
                .assertEmptyEvent()
                .subTaskFails(0, 1)
                .assertNextEvent()
                .assertNextSubTaskEvent()
                // write after the failure
                .consume(TestData.DATA_SET_INSERT)
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkWrittenData(EXPECTED1)
                .end();
    }

    /**
     * Runs a sequence of sub-task failures (attempts 1 through 4) interleaved with
     * checkpoints 3 and 4 and a coordinator restart, asserting the event emitted (or not
     * emitted) after each step.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testPartialFailover() throws Exception {
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
        conf.setString(FlinkOptions.OPERATION, "INSERT");
        preparePipeline()
                // attempt 1
                .subTaskFails(0, 1)
                .assertNoEvent()
                .checkpoint(3L)
                .assertNextEvent()
                // coordinator restart, attempt 2: checkpoint 4 is expected to throw
                .restartCoordinator()
                .subTaskFails(0, 2)
                .checkpointThrows(4L, "Timeout(1000ms) while waiting for instant initialize")
                .assertEmptyEvent()
                // attempt 3
                .subTaskFails(0, 3)
                .assertNoEvent()
                .checkpoint(4L)
                .assertNextEvent()
                // attempt 4
                .subTaskFails(0, 4)
                .assertEmptyEvent()
                .end();
    }

    /**
     * Consumes the insert data set, checks that no data files exist before the checkpoint,
     * and verifies the written data equals {@code EXPECTED1} once checkpoint 1 completes.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testInsert() throws Exception {
        preparePipeline()
                .consume(TestData.DATA_SET_INSERT)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED1)
                .end();
    }

    /**
     * With {@code PRE_COMBINE} enabled, writes the duplicate insert data set in two
     * consecutive checkpoints and checks the result against {@code EXPECTED3} after each.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testInsertDuplicates() throws Exception {
        conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        preparePipeline(conf)
                // first round
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED3, 1)
                // second round with the same input
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkWrittenData(EXPECTED3, 1)
                .end();
    }

    /**
     * Writes the insert data set, then the update/insert data set. The written data is
     * checked against {@code EXPECTED1} before checkpoint 2 and against {@code EXPECTED2}
     * after checkpoint 2 completes.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testUpsert() throws Exception {
        preparePipeline()
                // initial inserts
                .consume(TestData.DATA_SET_INSERT)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                // updates are consumed but not yet checkpointed
                .consume(TestData.DATA_SET_UPDATE_INSERT)
                .checkWrittenData(EXPECTED1)
                // second checkpoint
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkWrittenData(EXPECTED2)
                .end();
    }

    /**
     * Writes the insert data set, then the update/delete data set. The written data is
     * checked against {@code EXPECTED1} before checkpoint 2 and against
     * {@link #getUpsertWithDeleteExpected()} after checkpoint 2 completes.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testUpsertWithDelete() throws Exception {
        preparePipeline()
                // initial inserts
                .consume(TestData.DATA_SET_INSERT)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                // updates and deletes are consumed but not yet checkpointed
                .consume(TestData.DATA_SET_UPDATE_DELETE)
                .checkWrittenData(EXPECTED1)
                // second checkpoint
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkWrittenData(getUpsertWithDeleteExpected())
                .end();
    }

    /**
     * Sets {@code WRITE_BATCH_SIZE} to {@link #getBatchSize()} and writes the duplicate
     * insert data set twice, comparing the result with {@link #getMiniBatchExpected()}
     * after each completed checkpoint.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testInsertWithMiniBatches() throws Exception {
        conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, getBatchSize());
        final Map<String, String> expectedRows = getMiniBatchExpected();
        preparePipeline(conf)
                // first round
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .assertDataBuffer(1, 2)
                .checkpoint(1L)
                .allDataFlushed()
                .handleEvents(2)
                .checkpointComplete(1L)
                .checkWrittenData(expectedRows, 1)
                // second round with the same input
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .checkpoint(2L)
                .handleEvents(2)
                .checkpointComplete(2L)
                .checkWrittenData(expectedRows, 1)
                .end();
    }

    /**
     * Sets {@code WRITE_BATCH_SIZE} to {@link #getBatchSize()}, enables {@code PRE_COMBINE},
     * and writes the same-key insert data set twice. After each completed checkpoint the
     * written data for {@code par1} must be the single row given below.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testInsertWithDeduplication() throws Exception {
        conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, getBatchSize());
        conf.setBoolean(FlinkOptions.PRE_COMBINE, true);

        final Map<String, String> expectedRows = new HashMap<>();
        expectedRows.put("par1", "[id1,par1,id1,Danny,23,4,par1]");

        preparePipeline(conf)
                // first round
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .assertDataBuffer(1, 2)
                .checkpoint(1L)
                .allDataFlushed()
                .handleEvents(2)
                .checkpointComplete(1L)
                .checkWrittenData(expectedRows, 1)
                // second round with the same input
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(2L)
                .handleEvents(2)
                .checkpointComplete(2L)
                .checkWrittenData(expectedRows, 1)
                .end();
    }

    /**
     * With the operation set to {@code "insert"}, writes the same-key data set twice.
     * The first round is checked with {@code checkWrittenAllData(EXPECTED4, 1)} and the
     * second with {@code checkWrittenDataCOW(EXPECTED5)}.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testInsertAppendMode() throws Exception {
        conf.setString(FlinkOptions.OPERATION, "insert");
        preparePipeline()
                // first round
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .checkWrittenAllData(EXPECTED4, 1)
                // second round with the same input
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkWrittenDataCOW(EXPECTED5)
                .end();
    }

    /**
     * With the operation set to {@code "insert"}, {@code INSERT_CLUSTER} enabled and
     * {@code WRITE_TASK_MAX_SIZE} set to {@code 200.0 + getBatchSize()}, writes the same-key
     * data set twice through a harness obtained from {@code TestHarness.instance()}.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testInsertClustering() throws Exception {
        conf.setString(FlinkOptions.OPERATION, "insert");
        conf.setBoolean(FlinkOptions.INSERT_CLUSTER, true);
        final double taskMaxSize = 200.0 + getBatchSize();
        conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, taskMaxSize);

        TestWriteBase.TestHarness.instance()
                .preparePipeline(this.tempFile, this.conf)
                // first round
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .assertDataBuffer(1, 2)
                .checkpoint(1L)
                .allDataFlushed()
                .handleEvents(2)
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED4, 1)
                // second round with the same input
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(2L)
                .handleEvents(2)
                .checkpointComplete(2L)
                .checkWrittenDataCOW(EXPECTED5)
                .end();
    }

    /**
     * With the operation set to {@code "insert"}, clustering scheduling and async clustering
     * enabled and {@code CLUSTERING_DELTA_COMMITS} set to 1, writes the same-key data set
     * twice and checks the written data after each completed checkpoint.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testInsertAsyncClustering() throws Exception {
        conf.setString(FlinkOptions.OPERATION, "insert");
        conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
        conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
        conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);

        preparePipeline()
                // first round
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(1L)
                .handleEvents(1)
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED4, 1)
                // second round with the same input
                .consume(TestData.DATA_SET_INSERT_SAME_KEY)
                .checkpoint(2L)
                .handleEvents(1)
                .checkpointComplete(2L)
                .checkWrittenDataCOW(EXPECTED5)
                .end();
    }

    /**
     * Sets {@code WRITE_TASK_MAX_SIZE} to {@code 200.0 + getBatchSize()} and writes the
     * duplicate insert data set twice, comparing the result with
     * {@link #getMiniBatchExpected()} after each completed checkpoint.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testInsertWithSmallBufferSize() throws Exception {
        final double taskMaxSize = 200.0 + getBatchSize();
        conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, taskMaxSize);
        final Map<String, String> expectedRows = getMiniBatchExpected();

        preparePipeline(conf)
                // first round
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .assertDataBuffer(1, 2)
                .checkpoint(1L)
                .allDataFlushed()
                .handleEvents(2)
                .checkpointComplete(1L)
                .checkWrittenData(expectedRows, 1)
                // second round with the same input
                .consume(TestData.DATA_SET_INSERT_DUPLICATES)
                .checkpoint(2L)
                .handleEvents(2)
                .checkpointComplete(2L)
                .checkWrittenData(expectedRows, 1)
                .end();
    }

    /**
     * With {@code ALLOW_EMPTY_COMMIT} enabled, runs four checkpoints of which the second
     * and fourth have no new input, and checks that the completed instant count grows by
     * one after every checkpoint.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testCommitOnEmptyBatch() throws Exception {
        conf.setBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), true);
        preparePipeline()
                // checkpoint 1: inserts
                .consume(TestData.DATA_SET_INSERT)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .checkCompletedInstantCount(1)
                // checkpoint 2: no new input
                .checkpoint(2L)
                .assertNextEvent()
                .checkpointComplete(2L)
                .checkCompletedInstantCount(2)
                // checkpoint 3: updates and inserts
                .consume(TestData.DATA_SET_UPDATE_INSERT)
                .checkWrittenData(EXPECTED1)
                .checkpoint(3L)
                .assertNextEvent()
                .checkpointComplete(3L)
                .checkCompletedInstantCount(3)
                .checkWrittenData(EXPECTED2)
                // checkpoint 4: no new input
                .checkpoint(4L)
                .assertNextEvent()
                .checkpointComplete(4L)
                .checkCompletedInstantCount(4)
                .checkWrittenData(EXPECTED2)
                .end();
    }

    /**
     * Returns the batch size the tests in this class write into {@code WRITE_BATCH_SIZE}
     * and add to {@code WRITE_TASK_MAX_SIZE}.
     *
     * @return the value {@code 0.0016}
     */
    protected double getBatchSize() {
        final double batchSize = 0.0016;
        return batchSize;
    }

    /**
     * Builds the expected written data used by the mini-batch and small-buffer tests.
     *
     * @return a new mutable map with a single {@code par1} entry listing three identical rows
     */
    protected Map<String, String> getMiniBatchExpected() {
        final Map<String, String> rowsByPartition = new HashMap<>();
        rowsByPartition.put(
                "par1",
                "[id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,1,par1, id1,par1,id1,Danny,23,1,par1]");
        return rowsByPartition;
    }

    /**
     * Builds the expected written data that {@code testUpsertWithDelete} checks after its
     * second checkpoint completes.
     *
     * @return a new mutable map from partition name ({@code par1}..{@code par4}) to the
     *         expected rows of that partition
     */
    protected Map<String, String> getUpsertWithDeleteExpected() {
        final Map<String, String> rowsByPartition = new HashMap<>();
        rowsByPartition.put(
                "par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
        rowsByPartition.put(
                "par2", "[id4,par2,id4,Fabian,31,4,par2]");
        rowsByPartition.put(
                "par3", "[id6,par3,id6,Emma,20,6,par3]");
        rowsByPartition.put(
                "par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
        return rowsByPartition;
    }

    /**
     * Returns the data {@code validateIndexLoaded} expects to be written after the
     * checkpoint is taken but before it is marked complete.
     *
     * @return {@code EXPECTED2}
     */
    protected Map<String, String> getExpectedBeforeCheckpointComplete() {
        final Map<String, String> expectedRows = EXPECTED2;
        return expectedRows;
    }

    /**
     * Writes the insert data set with a first pipeline, then enables
     * {@code INDEX_BOOTSTRAP_ENABLED} and delegates to {@link #validateIndexLoaded()},
     * which starts a second pipeline on the same configuration.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testIndexStateBootstrap() throws Exception {
        // first pipeline: write the initial data set
        preparePipeline()
                .consume(TestData.DATA_SET_INSERT)
                .assertEmptyDataFiles()
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED1, 4)
                .end();

        // second pipeline runs with index bootstrap switched on
        conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
        validateIndexLoaded();
    }

    /**
     * Starts a pipeline on the current configuration, consumes the update/insert data set
     * and checks that the index contains the eleven keys listed below. It then takes
     * checkpoint 1, asserts the bootstrap, and compares the written data before
     * ({@link #getExpectedBeforeCheckpointComplete()}) and after ({@code EXPECTED2}) the
     * checkpoint completes.
     *
     * @throws Exception if any harness step fails
     */
    protected void validateIndexLoaded() throws Exception {
        preparePipeline(conf)
                .consume(TestData.DATA_SET_UPDATE_INSERT)
                .checkIndexLoaded(
                        new HoodieKey("id1", "par1"),
                        new HoodieKey("id2", "par1"),
                        new HoodieKey("id3", "par2"),
                        new HoodieKey("id4", "par2"),
                        new HoodieKey("id5", "par3"),
                        new HoodieKey("id6", "par3"),
                        new HoodieKey("id7", "par4"),
                        new HoodieKey("id8", "par4"),
                        new HoodieKey("id9", "par3"),
                        new HoodieKey("id10", "par4"),
                        new HoodieKey("id11", "par4"))
                .checkpoint(1L)
                .assertBootstrapped()
                .assertNextEvent()
                .checkWrittenData(getExpectedBeforeCheckpointComplete())
                .checkpointComplete(1L)
                .checkWrittenData(EXPECTED2)
                .end();
    }

    /**
     * With {@code WRITE_COMMIT_ACK_TIMEOUT} set to 1 and {@code WRITE_TASK_MAX_SIZE} set to
     * 200.0006, commits a first batch, consumes a second batch, takes checkpoint 2 without
     * completing it, and expects a further consume to throw the instant-initialize timeout.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testWriteExactlyOnce() throws Exception {
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
        conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006);
        preparePipeline(conf)
                // first batch, committed
                .consume(TestData.DATA_SET_INSERT)
                .emptyEventBuffer()
                .checkpoint(1L)
                .assertConfirming()
                .handleEvents(4)
                .checkpointComplete(1L)
                // second batch, checkpoint 2 is never completed
                .consume(TestData.DATA_SET_INSERT)
                .assertNotConfirming()
                .checkpoint(2L)
                .assertConsumeThrows(
                        TestData.DATA_SET_INSERT,
                        "Timeout(1000ms) while waiting for instant initialize")
                .end();
    }

    /**
     * Runs two writers against the same table where the second writer commits while the
     * first one still has uncommitted data, then validates the first writer's commit via
     * {@code validateConcurrentCommit}.
     *
     * <p>When the configuration resolves to a COW table with non-blocking concurrency
     * control, only {@link #validateNonBlockingConcurrencyControlConditions()} is run.
     *
     * <p>Both harnesses are ended in a {@code finally} block so they are released even when
     * an assertion fails part-way, matching {@code testWriteMultiWriterPartialOverlapping}.
     *
     * @param writeConcurrencyMode concurrency mode stored under {@code WRITE_CONCURRENCY_MODE}
     * @throws Exception if any harness step fails
     */
    @ParameterizedTest
    @EnumSource(value=WriteConcurrencyMode.class, names={"OPTIMISTIC_CONCURRENCY_CONTROL", "NON_BLOCKING_CONCURRENCY_CONTROL"})
    public void testWriteMultiWriterInvolved(WriteConcurrencyMode writeConcurrencyMode) throws Exception {
        this.conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), writeConcurrencyMode.name());
        this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        this.conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        if (OptionsResolver.isCowTable((Configuration)this.conf) && OptionsResolver.isNonBlockingConcurrencyControl((Configuration)this.conf)) {
            this.validateNonBlockingConcurrencyControlConditions();
            return;
        }
        TestWriteBase.TestHarness pipeline1 = null;
        TestWriteBase.TestHarness pipeline2 = null;
        try {
            // writer 1 buffers data but does not checkpoint yet
            pipeline1 = this.preparePipeline(this.conf).consume(TestData.DATA_SET_INSERT_DUPLICATES).assertEmptyDataFiles();
            // writer 2 uses a distinct client id and commits first
            Configuration conf2 = this.conf.clone();
            conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
            pipeline2 = this.preparePipeline(conf2).consume(TestData.DATA_SET_INSERT_DUPLICATES).assertEmptyDataFiles().checkpoint(1L).assertNextEvent().checkpointComplete(1L).checkWrittenData(EXPECTED3, 1);
            // writer 1 now tries to commit
            this.validateConcurrentCommit(pipeline1);
        }
        finally {
            // nested finally: pipeline2 is ended even if ending pipeline1 throws
            try {
                if (pipeline1 != null) {
                    pipeline1.end();
                }
            }
            finally {
                if (pipeline2 != null) {
                    pipeline2.end();
                }
            }
        }
    }

    /**
     * Asserts that preparing a pipeline with the current configuration throws an
     * {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): the String passed to {@code assertThrows} is the failure message
     * JUnit reports when no such exception is thrown; it is not compared with the
     * message of the thrown exception.
     */
    protected void validateNonBlockingConcurrencyControlConditions() {
        Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> {
                    this.preparePipeline(this.conf);
                },
                "Non-blocking concurrency control requires the MOR table with simple bucket index");
    }

    /**
     * Takes checkpoint 1 on the given harness and validates the outcome of completing it.
     * Under non-blocking concurrency control the commit must succeed and the written data
     * must equal {@code EXPECTED3}; otherwise completing the checkpoint must throw a
     * {@link HoodieWriteConflictException} with the message "Cannot resolve conflicts".
     *
     * @param pipeline harness of the writer whose commit is validated
     * @throws Exception if any harness step fails
     */
    private void validateConcurrentCommit(TestWriteBase.TestHarness pipeline) throws Exception {
        pipeline.checkpoint(1L).assertNextEvent();

        final boolean nonBlocking = OptionsResolver.isNonBlockingConcurrencyControl((Configuration) this.conf);
        if (!nonBlocking) {
            // any other mode: the commit is expected to be rejected as a conflict
            pipeline.checkpointCompleteThrows(1L, HoodieWriteConflictException.class, "Cannot resolve conflicts");
            return;
        }
        pipeline.checkpointComplete(1L).checkWrittenData(EXPECTED3, 1);
    }

    /**
     * Runs two writers against the same table where both have buffered data before either
     * commits: writer 1 commits first, then writer 2's commit is validated via
     * {@code validateConcurrentCommit}.
     *
     * <p>When the configuration resolves to a COW table with non-blocking concurrency
     * control, only {@link #validateNonBlockingConcurrencyControlConditions()} is run.
     *
     * <p>Both harnesses are ended in a {@code finally} block; the nested {@code finally}
     * guarantees the second harness is ended even if ending the first one throws.
     *
     * @param writeConcurrencyMode concurrency mode stored under {@code WRITE_CONCURRENCY_MODE}
     * @throws Exception if any harness step fails
     */
    @ParameterizedTest
    @EnumSource(value=WriteConcurrencyMode.class, names={"OPTIMISTIC_CONCURRENCY_CONTROL", "NON_BLOCKING_CONCURRENCY_CONTROL"})
    public void testWriteMultiWriterPartialOverlapping(WriteConcurrencyMode writeConcurrencyMode) throws Exception {
        this.conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), writeConcurrencyMode.name());
        this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        this.conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
        if (OptionsResolver.isCowTable((Configuration)this.conf) && OptionsResolver.isNonBlockingConcurrencyControl((Configuration)this.conf)) {
            this.validateNonBlockingConcurrencyControlConditions();
            return;
        }
        TestWriteBase.TestHarness pipeline1 = null;
        TestWriteBase.TestHarness pipeline2 = null;
        try {
            // both writers buffer data before any commit happens
            pipeline1 = this.preparePipeline(this.conf).consume(TestData.DATA_SET_INSERT_DUPLICATES).assertEmptyDataFiles();
            Configuration conf2 = this.conf.clone();
            conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
            pipeline2 = this.preparePipeline(conf2).consume(TestData.DATA_SET_INSERT_DUPLICATES).assertEmptyDataFiles();
            // writer 1 commits first
            pipeline1.checkpoint(1L).assertNextEvent().checkpointComplete(1L).checkWrittenData(EXPECTED3, 1);
            // writer 2 now tries to commit
            this.validateConcurrentCommit(pipeline2);
        }
        finally {
            // nested finally: pipeline2 is ended even if ending pipeline1 throws
            try {
                if (pipeline1 != null) {
                    pipeline1.end();
                }
            }
            finally {
                if (pipeline2 != null) {
                    pipeline2.end();
                }
            }
        }
    }

    /**
     * Creates two write clients from the same configuration and checks that both use the
     * {@code REMOTE_FIRST} view storage type, that they report the same remote view server
     * port, and that the remote timeline client timeout equals the configured 500 seconds.
     *
     * <p>Assertions pass the expected value first, as JUnit's {@code assertSame} and
     * {@code assertEquals} require, so that failure messages label the values correctly.
     * Both clients are closed in a {@code finally} block; the nested {@code finally}
     * guarantees the second client is closed even if closing the first one throws.
     *
     * @throws IOException declared by the original test signature
     */
    @Test
    public void testReuseEmbeddedServer() throws IOException {
        this.conf.setInteger("hoodie.filesystem.view.remote.timeout.secs", 500);
        this.conf.setString("hoodie.metadata.enable", "true");
        this.conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false");
        HoodieFlinkWriteClient writeClient = null;
        HoodieFlinkWriteClient writeClient2 = null;
        try {
            writeClient = FlinkWriteClients.createWriteClient((Configuration)this.conf);
            FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig();
            Assertions.assertSame(FileSystemViewStorageType.REMOTE_FIRST, viewStorageConfig.getStorageType());
            // the second client is created from the very same configuration
            writeClient2 = FlinkWriteClients.createWriteClient((Configuration)this.conf);
            Assertions.assertSame(FileSystemViewStorageType.REMOTE_FIRST, writeClient2.getConfig().getViewStorageConfig().getStorageType());
            // both clients must point at the same server port
            Assertions.assertEquals((Integer)viewStorageConfig.getRemoteViewServerPort(), (Integer)writeClient2.getConfig().getViewStorageConfig().getRemoteViewServerPort());
            Assertions.assertEquals(500, (Integer)viewStorageConfig.getRemoteTimelineClientTimeoutSecs());
        }
        finally {
            // nested finally: writeClient2 is closed even if closing writeClient throws
            try {
                if (writeClient != null) {
                    writeClient.close();
                }
            }
            finally {
                if (writeClient2 != null) {
                    writeClient2.close();
                }
            }
        }
    }

    /**
     * With the failed-writes cleaner policy set to {@code LAZY}, commits the insert data
     * set, fails sub-task 0, rolls the last complete instant back to inflight, performs a
     * job failover and fails sub-task 0 again, expecting an event afterwards.
     *
     * @throws Exception if any harness step fails
     */
    @Test
    public void testRollbackFailedWritesWithLazyCleanPolicy() throws Exception {
        conf.setString(
                HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
                HoodieFailedWritesCleaningPolicy.LAZY.name());
        preparePipeline()
                // committed write
                .consume(TestData.DATA_SET_INSERT)
                .checkpoint(1L)
                .assertNextEvent()
                .checkpointComplete(1L)
                // failure, rollback-to-inflight and failover
                .subTaskFails(0, 0)
                .assertEmptyEvent()
                .rollbackLastCompleteInstantToInflight()
                .jobFailover()
                // second failure attempt after the failover
                .subTaskFails(0, 1)
                .assertNextEvent()
                .end();
    }
}

