/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.spark.api.java.function.Function;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;

public class TestUpdateSchemaEvolution
extends HoodieSparkClientTestHarness
implements Serializable {
    @BeforeEach
    public void setUp() throws Exception {
        this.initPath();
        HoodieTestUtils.init((StorageConfiguration)HoodieTestUtils.getDefaultStorageConf(), (String)this.basePath);
        this.initSparkContexts("TestUpdateSchemaEvolution");
        this.initHoodieStorage();
        this.initTimelineService();
    }

    @AfterEach
    public void tearDown() throws IOException {
        this.cleanupResources();
    }

    private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException {
        HoodieWriteConfig config = this.makeHoodieClientConfig("/exampleSchema.avsc");
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context);
        List statuses = this.jsc.parallelize(Arrays.asList(1)).map((Function & Serializable)x -> {
            ArrayList<HoodieAvroRecord> insertRecords = new ArrayList<HoodieAvroRecord>();
            for (String recordStr : recordsStrs) {
                RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
                insertRecords.add(new HoodieAvroRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), (HoodieRecordPayload)rowChange));
            }
            Map insertRecordMap = insertRecords.stream().collect(Collectors.toMap(r -> r.getRecordKey(), java.util.function.Function.identity()));
            HoodieWriteHandle createHandle = new CreateHandleFactory(false).create(config, "100", (HoodieTable)table, ((HoodieRecord)insertRecords.get(0)).getPartitionPath(), "f1-0", (TaskContextSupplier)this.supplier);
            for (HoodieRecord record : insertRecordMap.values()) {
                createHandle.write(record, createHandle.getWriterSchemaWithMetaFields(), createHandle.getConfig().getProps());
            }
            return (WriteStatus)createHandle.close().get(0);
        }).collect();
        Path commitFile = new Path(config.getBasePath() + "/.hoodie/timeline/" + HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.makeCommitFileName("100_" + InProcessTimeGenerator.createNewInstantTime()));
        HadoopFSUtils.getFs((String)this.basePath, (StorageConfiguration)HoodieTestUtils.getDefaultStorageConf()).create(commitFile);
        return (WriteStatus)statuses.get(0);
    }

    private List<String> generateMultipleRecordsForExampleSchema() {
        ArrayList<String> recordsStrs = new ArrayList<String>();
        String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
        String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
        String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
        recordsStrs.add(recordStr1);
        recordsStrs.add(recordStr2);
        recordsStrs.add(recordStr3);
        return recordsStrs;
    }

    private List<String> generateOneRecordForExampleSchema() {
        ArrayList<String> recordsStrs = new ArrayList<String>();
        String recordStr = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
        recordsStrs.add(recordStr);
        return recordsStrs;
    }

    private void assertSchemaEvolutionOnUpdateResult(WriteStatus insertResult, HoodieSparkTable updateTable, List<HoodieRecord> updateRecords, String assertMsg, boolean isAssertThrow, Class expectedExceptionType) {
        this.jsc.parallelize(Arrays.asList(1)).map((Function & Serializable)x -> {
            Executable executable = () -> {
                HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", (HoodieTable)updateTable, updateRecords.iterator(), ((HoodieRecord)updateRecords.get(0)).getPartitionPath(), insertResult.getFileId(), (TaskContextSupplier)this.supplier, Option.empty());
                List oldRecords = HoodieIOFactory.getIOFactory((HoodieStorage)updateTable.getStorage()).getFileFormatUtils(updateTable.getBaseFileFormat()).readAvroRecords(updateTable.getStorage(), new StoragePath(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath()), mergeHandle.getWriterSchemaWithMetaFields());
                for (GenericRecord rec : oldRecords) {
                    mergeHandle.write((HoodieRecord)new HoodieAvroIndexedRecord((IndexedRecord)rec));
                }
                mergeHandle.close();
            };
            if (isAssertThrow) {
                Assertions.assertThrows((Class)expectedExceptionType, (Executable)executable, (String)assertMsg);
            } else {
                Assertions.assertDoesNotThrow((Executable)executable, (String)assertMsg);
            }
            return 1;
        }).collect();
    }

    private List<HoodieRecord> buildUpdateRecords(String recordStr, String insertFileId) throws IOException {
        ArrayList<HoodieRecord> updateRecords = new ArrayList<HoodieRecord>();
        RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
        HoodieAvroRecord record = new HoodieAvroRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), (HoodieRecordPayload)rowChange);
        record.setCurrentLocation(new HoodieRecordLocation("101", insertFileId));
        record.seal();
        updateRecords.add((HoodieRecord)record);
        return updateRecords;
    }

    @Test
    public void testSchemaEvolutionOnUpdateSuccessWithAddColumnHaveDefault() throws Exception {
        WriteStatus insertResult = this.prepareFirstRecordCommit(this.generateMultipleRecordsForExampleSchema());
        HoodieWriteConfig config = this.makeHoodieClientConfig("/exampleEvolvedSchema.avsc");
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context);
        String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
        List<HoodieRecord> updateRecords = this.buildUpdateRecords(recordStr, insertResult.getFileId());
        String assertMsg = "UpdateFunction could not read records written with exampleSchema.avsc using the exampleEvolvedSchema.avsc";
        this.assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, false, null);
    }

    @Test
    public void testSchemaEvolutionOnUpdateSuccessWithChangeColumnOrder() throws Exception {
        WriteStatus insertResult = this.prepareFirstRecordCommit(this.generateMultipleRecordsForExampleSchema());
        HoodieWriteConfig config = this.makeHoodieClientConfig("/exampleEvolvedSchemaChangeOrder.avsc");
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context);
        String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"added_field\":1,\"number\":12}";
        List<HoodieRecord> updateRecords = this.buildUpdateRecords(recordStr, insertResult.getFileId());
        String assertMsg = "UpdateFunction could not read records written with exampleSchema.avsc using the exampleEvolvedSchemaChangeOrder.avsc as column order change";
        this.assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, false, null);
    }

    @Test
    public void testSchemaEvolutionOnUpdateMisMatchWithDeleteColumn() throws Exception {
        WriteStatus insertResult = this.prepareFirstRecordCommit(this.generateOneRecordForExampleSchema());
        HoodieWriteConfig config = this.makeHoodieClientConfig("/exampleEvolvedSchemaDeleteColumn.avsc");
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context);
        String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\"}";
        List<HoodieRecord> updateRecords = this.buildUpdateRecords(recordStr, insertResult.getFileId());
        String assertMsg = "UpdateFunction when delete column, Parquet/Avro schema mismatch: Avro field 'xxx' not found";
        this.assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, true, InvalidRecordException.class);
    }

    @Test
    public void testSchemaEvolutionOnUpdateMisMatchWithAddColumnNotHaveDefault() throws Exception {
        WriteStatus insertResult = this.prepareFirstRecordCommit(this.generateOneRecordForExampleSchema());
        HoodieWriteConfig config = this.makeHoodieClientConfig("/exampleEvolvedSchemaColumnRequire.avsc");
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context);
        String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
        List<HoodieRecord> updateRecords = this.buildUpdateRecords(recordStr, insertResult.getFileId());
        String assertMsg = "UpdateFunction could not read records written with exampleSchema.avsc using the exampleEvolvedSchemaColumnRequire.avsc, because old records do not have required column added_field";
        this.assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, true, HoodieUpsertException.class);
    }

    @Test
    public void testSchemaEvolutionOnUpdateMisMatchWithChangeColumnType() throws Exception {
        WriteStatus insertResult = this.prepareFirstRecordCommit(this.generateOneRecordForExampleSchema());
        HoodieWriteConfig config = this.makeHoodieClientConfig("/exampleEvolvedSchemaColumnType.avsc");
        HoodieSparkTable table = HoodieSparkTable.create((HoodieWriteConfig)config, (HoodieEngineContext)this.context);
        String recordStr = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":\"12\"}";
        List<HoodieRecord> updateRecords = this.buildUpdateRecords(recordStr, insertResult.getFileId());
        String assertMsg = "UpdateFunction when change column type, org.apache.parquet.avro.AvroConverters$FieldUTF8Converter";
        this.assertSchemaEvolutionOnUpdateResult(insertResult, table, updateRecords, assertMsg, true, ParquetDecodingException.class);
    }

    private HoodieWriteConfig makeHoodieClientConfig(String name) {
        Schema schema = SchemaTestUtil.getSchemaFromResource(this.getClass(), (String)name);
        return HoodieWriteConfig.newBuilder().withPath(this.basePath).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).build()).withSchema(schema.toString()).build();
    }
}

