/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.AWSDmsAvroPayload;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestHoodieMergedReadHandle
extends SparkClientFunctionalTestHarness {
    private static Stream<Arguments> avroPayloadClasses() {
        return Stream.of(Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, OverwriteWithLatestAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, OverwriteNonDefaultsWithLatestAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, PartialUpdateAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, DefaultHoodieRecordPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, AWSDmsAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, MySqlDebeziumAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, PostgresDebeziumAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, OverwriteWithLatestAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, OverwriteNonDefaultsWithLatestAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, PartialUpdateAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, DefaultHoodieRecordPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, AWSDmsAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, MySqlDebeziumAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, PostgresDebeziumAvroPayload.class}));
    }

    @ParameterizedTest
    @MethodSource(value={"avroPayloadClasses"})
    public void testReadLatestRecordsWithDeletes(HoodieTableType tableType, Class<?> payloadClass) throws IOException {
        HoodieWriteConfig writeConfig = this.getWriteConfig(payloadClass);
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(tableType, (Properties)writeConfig.getProps());
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(writeConfig);){
            int totalRecords = 4;
            String partition = "foo";
            String commitTimeAtEpoch0 = HoodieTestDataGenerator.getCommitTimeAtUTC((long)0L);
            List insertsAtEpoch0 = HoodieAdaptablePayloadDataGenerator.getInserts((int)4, (String)"foo", (long)0L, payloadClass);
            client.startCommitWithTime(commitTimeAtEpoch0);
            Assertions.assertNoWriteErrors((List)client.upsert(this.jsc().parallelize(insertsAtEpoch0, 1), commitTimeAtEpoch0).collect());
            this.doMergedReadAndValidate(metaClient, writeConfig, 4, "foo", 0L, payloadClass);
            String commitTimeAtEpoch5 = HoodieTestDataGenerator.getCommitTimeAtUTC((long)5L);
            List updatesAtEpoch5 = HoodieAdaptablePayloadDataGenerator.getUpdates((List)insertsAtEpoch0, (long)5L, payloadClass);
            client.startCommitWithTime(commitTimeAtEpoch5);
            Assertions.assertNoWriteErrors((List)client.upsert(this.jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5).collect());
            this.doMergedReadAndValidate(metaClient, writeConfig, 4, "foo", 5L, payloadClass);
            String commitTimeAtEpoch6 = HoodieTestDataGenerator.getCommitTimeAtUTC((long)6L);
            client.startCommitWithTime(commitTimeAtEpoch6);
            List deletesAtEpoch6 = HoodieAdaptablePayloadDataGenerator.getDeletes(updatesAtEpoch5.subList(3, 4), (long)6L, payloadClass);
            Assertions.assertNoWriteErrors((List)client.upsert(this.jsc().parallelize(deletesAtEpoch6, 1), commitTimeAtEpoch6).collect());
            this.doMergedReadAndValidate(metaClient, writeConfig, 3, "foo", 5L, payloadClass);
            String commitTimeAtEpoch7 = HoodieTestDataGenerator.getCommitTimeAtUTC((long)7L);
            client.startCommitWithTime(commitTimeAtEpoch7);
            List deletesAtEpoch7 = HoodieAdaptablePayloadDataGenerator.getDeletesWithEmptyPayload(updatesAtEpoch5.subList(2, 3));
            Assertions.assertNoWriteErrors((List)client.upsert(this.jsc().parallelize(deletesAtEpoch7, 1), commitTimeAtEpoch7).collect());
            this.doMergedReadAndValidate(metaClient, writeConfig, 2, "foo", 5L, payloadClass);
            String commitTimeAtEpoch9 = HoodieTestDataGenerator.getCommitTimeAtUTC((long)9L);
            List updatesAtEpoch9 = HoodieAdaptablePayloadDataGenerator.getUpdates((List)updatesAtEpoch5, (long)9L, payloadClass);
            client.startCommitWithTime(commitTimeAtEpoch9);
            Assertions.assertNoWriteErrors((List)client.upsert(this.jsc().parallelize(updatesAtEpoch9, 1), commitTimeAtEpoch9).collect());
            this.doMergedReadAndValidate(metaClient, writeConfig, 4, "foo", 9L, payloadClass);
        }
    }

    private static Stream<Arguments> avroPayloadClassesThatHonorOrdering() {
        return Stream.of(Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, PartialUpdateAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, DefaultHoodieRecordPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, MySqlDebeziumAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.COPY_ON_WRITE, PostgresDebeziumAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, PartialUpdateAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, DefaultHoodieRecordPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, MySqlDebeziumAvroPayload.class}), Arguments.of((Object[])new Object[]{HoodieTableType.MERGE_ON_READ, PostgresDebeziumAvroPayload.class}));
    }

    @ParameterizedTest
    @MethodSource(value={"avroPayloadClassesThatHonorOrdering"})
    public void testReadLatestRecordsWithLateArrivedRecords(HoodieTableType tableType, Class<?> payloadClass) throws IOException {
        HoodieWriteConfig writeConfig = this.getWriteConfig(payloadClass);
        HoodieTableMetaClient metaClient = this.getHoodieMetaClient(tableType, (Properties)writeConfig.getProps());
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(writeConfig);){
            int totalRecords = 4;
            String partition = "foo";
            String commitTimeAtEpoch0 = HoodieTestDataGenerator.getCommitTimeAtUTC((long)0L);
            List insertsAtEpoch0 = HoodieAdaptablePayloadDataGenerator.getInserts((int)4, (String)"foo", (long)0L, payloadClass);
            client.startCommitWithTime(commitTimeAtEpoch0);
            Assertions.assertNoWriteErrors((List)client.upsert(this.jsc().parallelize(insertsAtEpoch0, 1), commitTimeAtEpoch0).collect());
            this.doMergedReadAndValidate(metaClient, writeConfig, 4, "foo", 0L, payloadClass);
            String commitTimeAtEpoch5 = HoodieTestDataGenerator.getCommitTimeAtUTC((long)5L);
            List updatesAtEpoch5 = HoodieAdaptablePayloadDataGenerator.getUpdates((List)insertsAtEpoch0, (long)5L, payloadClass);
            client.startCommitWithTime(commitTimeAtEpoch5);
            Assertions.assertNoWriteErrors((List)client.upsert(this.jsc().parallelize(updatesAtEpoch5, 1), commitTimeAtEpoch5).collect());
            this.doMergedReadAndValidate(metaClient, writeConfig, 4, "foo", 5L, payloadClass);
            String commitTimeAtEpoch6 = HoodieTestDataGenerator.getCommitTimeAtUTC((long)6L);
            List updatesAtEpoch1 = HoodieAdaptablePayloadDataGenerator.getUpdates((List)insertsAtEpoch0, (long)1L, payloadClass);
            client.startCommitWithTime(commitTimeAtEpoch6);
            Assertions.assertNoWriteErrors((List)client.upsert(this.jsc().parallelize(updatesAtEpoch1, 1), commitTimeAtEpoch6).collect());
            this.doMergedReadAndValidate(metaClient, writeConfig, 4, "foo", 5L, payloadClass);
            String commitTimeAtEpoch9 = HoodieTestDataGenerator.getCommitTimeAtUTC((long)9L);
            List updatesAtEpoch9 = HoodieAdaptablePayloadDataGenerator.getUpdates((List)updatesAtEpoch5, (long)9L, payloadClass);
            client.startCommitWithTime(commitTimeAtEpoch9);
            Assertions.assertNoWriteErrors((List)client.upsert(this.jsc().parallelize(updatesAtEpoch9, 1), commitTimeAtEpoch9).collect());
            this.doMergedReadAndValidate(metaClient, writeConfig, 4, "foo", 9L, payloadClass);
        }
    }

    private void doMergedReadAndValidate(HoodieTableMetaClient metaClient, HoodieWriteConfig writeConfig, int totalRecords, String partition, long timestamp, Class<?> payloadClass) throws IOException {
        String orderingField = new HoodieAdaptablePayloadDataGenerator.RecordGen(payloadClass).getOrderingField();
        metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)writeConfig, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient);
        List partitionPathAndFileIDPairs = table.getHoodieView().getLatestBaseFiles(partition).map(baseFile -> Pair.of((Object)partition, (Object)baseFile.getFileId())).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)partitionPathAndFileIDPairs.size());
        String latestCommitTime = ((HoodieInstant)table.getActiveTimeline().lastInstant().get()).requestedTime();
        HoodieMergedReadHandle mergedReadHandle = new HoodieMergedReadHandle(writeConfig, Option.of((Object)latestCommitTime), (HoodieTable)table, (Pair)partitionPathAndFileIDPairs.get(0));
        List mergedRecords = mergedReadHandle.getMergedRecords();
        org.junit.jupiter.api.Assertions.assertEquals((int)totalRecords, (int)mergedRecords.size());
        List sortedMergedRecords = mergedRecords.stream().sorted(Comparator.comparing(HoodieRecord::getRecordKey)).collect(Collectors.toList());
        for (int i = 0; i < sortedMergedRecords.size(); ++i) {
            HoodieRecord r = (HoodieRecord)sortedMergedRecords.get(i);
            org.junit.jupiter.api.Assertions.assertEquals((int)i, (int)Integer.parseInt(r.getRecordKey()));
            org.junit.jupiter.api.Assertions.assertEquals((Object)partition, (Object)r.getPartitionPath());
            org.junit.jupiter.api.Assertions.assertEquals((Object)payloadClass.getName(), (Object)r.getData().getClass().getName());
            Option valueOpt = ((HoodieRecordPayload)r.getData()).getInsertValue(HoodieAdaptablePayloadDataGenerator.SCHEMA_WITH_METAFIELDS);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)valueOpt.isPresent());
            GenericRecord avroValue = (GenericRecord)valueOpt.get();
            org.junit.jupiter.api.Assertions.assertEquals((int)i, (int)Integer.parseInt(avroValue.get("id").toString()));
            org.junit.jupiter.api.Assertions.assertEquals((Object)partition, (Object)avroValue.get("pt").toString());
            org.junit.jupiter.api.Assertions.assertEquals((long)timestamp, (long)Long.parseLong(avroValue.get(orderingField).toString()));
        }
    }

    private HoodieWriteConfig getWriteConfig(Class<?> payloadClass) {
        return this.getConfigBuilder(true).withProperties(HoodieAdaptablePayloadDataGenerator.getKeyGenProps(payloadClass)).withParallelism(2, 2).withBulkInsertParallelism(2).withDeleteParallelism(1).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).withSchema(HoodieAdaptablePayloadDataGenerator.SCHEMA_STR).withPayloadConfig(HoodiePayloadConfig.newBuilder().fromProperties(HoodieAdaptablePayloadDataGenerator.getPayloadProps(payloadClass)).build()).withRecordMergeMode(RecordMergeMode.CUSTOM).build();
    }
}

