package org.apache.hudi.table.upgrade;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/hudi/table/upgrade/TestUpgradeDowngrade.class */
public class TestUpgradeDowngrade extends HoodieClientTestBase {
    private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with deletePartialMarkerFiles={0} and TableType = {1}";

    public static Stream<Arguments> configParams() {
        return Stream.of(new Object[]{true, HoodieTableType.COPY_ON_WRITE}, new Object[]{false, HoodieTableType.COPY_ON_WRITE}, new Object[]{true, HoodieTableType.MERGE_ON_READ}, new Object[]{false, HoodieTableType.MERGE_ON_READ}).map(Arguments::of);
    }

    @Override // org.apache.hudi.testutils.HoodieClientTestBase
    @BeforeEach
    public void setUp() throws Exception {
        initSparkContexts();
        initDFS();
        initTestDataGenerator();
        initDFSMetaClient();
    }

    @AfterEach
    public void cleanUp() throws Exception {
        cleanupResources();
    }

    @Test
    public void testLeftOverUpdatedPropFileCleanup() throws IOException {
        testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ);
    }

    @MethodSource({"configParams"})
    @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
    public void testUpgrade(boolean z, HoodieTableType hoodieTableType) throws IOException {
        testUpgradeInternal(false, z, hoodieTableType);
    }

    public void testUpgradeInternal(boolean z, boolean z2, HoodieTableType hoodieTableType) throws IOException {
        HashMap hashMap = new HashMap();
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            hashMap.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
            this.metaClient = HoodieTestUtils.init(this.dfs.getConf(), this.dfsBasePath, HoodieTableType.MERGE_ON_READ);
        }
        HoodieWriteConfig build = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(hashMap).build();
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(build);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        Pair<List<HoodieRecord>, List<HoodieRecord>> twoUpsertCommitDataWithTwoPartitions = twoUpsertCommitDataWithTwoPartitions(arrayList, arrayList2, build, hoodieWriteClient, false);
        HoodieSparkTable hoodieTable = getHoodieTable(this.metaClient, build);
        HoodieInstant hoodieInstant = (HoodieInstant) hoodieTable.getPendingCommitTimeline().lastInstant().get();
        List allMarkerFilePaths = new MarkerFiles(hoodieTable, hoodieInstant.getTimestamp()).allMarkerFilePaths();
        if (z2) {
            String str = (String) allMarkerFilePaths.get(0);
            hoodieTable.getMetaClient().getFs().delete(new Path(hoodieTable.getMetaClient().getTempFolderPath() + "/" + hoodieInstant.getTimestamp() + "/" + str));
            allMarkerFilePaths.remove(str);
        }
        this.metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ZERO);
        if (z) {
            createResidualFile();
        }
        new SparkUpgradeDowngrade(this.metaClient, build, this.context).run(this.metaClient, HoodieTableVersion.ONE, build, this.context, (String) null);
        assertMarkerFilesForUpgrade(hoodieTable, hoodieInstant, arrayList, arrayList2);
        Assertions.assertEquals(this.metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ONE.versionCode());
        assertTableVersionFromPropertyFile(HoodieTableVersion.ONE);
        assertRows((List) twoUpsertCommitDataWithTwoPartitions.getKey(), triggerCommit("003", hoodieTableType, true));
        if (z) {
            Assertions.assertFalse(this.dfs.exists(new Path(this.metaClient.getMetaPath(), "hoodie.properties.updated")));
        }
    }

    @MethodSource({"configParams"})
    @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
    public void testDowngrade(boolean z, HoodieTableType hoodieTableType) throws IOException {
        HashMap hashMap = new HashMap();
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            hashMap.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
            this.metaClient = HoodieTestUtils.init(this.hadoopConf, this.basePath, HoodieTableType.MERGE_ON_READ);
        }
        HoodieWriteConfig build = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(hashMap).build();
        Pair<List<HoodieRecord>, List<HoodieRecord>> twoUpsertCommitDataWithTwoPartitions = twoUpsertCommitDataWithTwoPartitions(new ArrayList(), new ArrayList(), build, getHoodieWriteClient(build), false);
        HoodieSparkTable hoodieTable = getHoodieTable(this.metaClient, build);
        HoodieInstant hoodieInstant = (HoodieInstant) hoodieTable.getPendingCommitTimeline().lastInstant().get();
        List allMarkerFilePaths = new MarkerFiles(hoodieTable, hoodieInstant.getTimestamp()).allMarkerFilePaths();
        if (z) {
            String str = (String) allMarkerFilePaths.get(0);
            hoodieTable.getMetaClient().getFs().delete(new Path(hoodieTable.getMetaClient().getTempFolderPath() + "/" + hoodieInstant.getTimestamp() + "/" + str));
            allMarkerFilePaths.remove(str);
        }
        prepForDowngrade();
        new SparkUpgradeDowngrade(this.metaClient, build, this.context).run(this.metaClient, HoodieTableVersion.ZERO, build, this.context, (String) null);
        assertMarkerFilesForDowngrade(hoodieTable, hoodieInstant);
        Assertions.assertEquals(this.metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode());
        assertTableVersionFromPropertyFile(HoodieTableVersion.ZERO);
        assertRows((List) twoUpsertCommitDataWithTwoPartitions.getKey(), triggerCommit("003", hoodieTableType, false));
    }

    private void assertMarkerFilesForDowngrade(HoodieTable hoodieTable, HoodieInstant hoodieInstant) throws IOException {
        Assertions.assertFalse(new MarkerFiles(hoodieTable, hoodieInstant.getTimestamp()).doesMarkerDirExist());
    }

    private void assertMarkerFilesForUpgrade(HoodieTable hoodieTable, HoodieInstant hoodieInstant, List<FileSlice> list, List<FileSlice> list2) throws IOException {
        MarkerFiles markerFiles = new MarkerFiles(hoodieTable, hoodieInstant.getTimestamp());
        Assertions.assertTrue(markerFiles.doesMarkerDirExist());
        List allMarkerFilePaths = markerFiles.allMarkerFilePaths();
        Assertions.assertEquals(2, allMarkerFilePaths.size());
        ArrayList<String> arrayList = new ArrayList();
        Iterator it = allMarkerFilePaths.iterator();
        while (it.hasNext()) {
            arrayList.add(MarkerFiles.stripMarkerSuffix((String) it.next()));
        }
        ArrayList<FileSlice> arrayList2 = new ArrayList();
        arrayList2.addAll(list);
        arrayList2.addAll(list2);
        ArrayList<String> arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        for (FileSlice fileSlice : arrayList2) {
            String partitionPath = fileSlice.getPartitionPath();
            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
                for (HoodieLogFile hoodieLogFile : (List) fileSlice.getLogFiles().collect(Collectors.toList())) {
                    arrayList4.add(Pair.of(partitionPath + "/" + hoodieLogFile.getFileId(), hoodieLogFile.getBaseCommitTime()));
                }
            }
            if (fileSlice.getBaseInstantTime().equals(hoodieInstant.getTimestamp())) {
                String path = ((HoodieBaseFile) fileSlice.getBaseFile().get()).getPath();
                arrayList3.add(path.substring(path.indexOf(partitionPath)));
            }
        }
        ArrayList<String> arrayList5 = new ArrayList();
        for (String str : arrayList) {
            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
                arrayList5.add(str.substring(0, str.lastIndexOf(46)));
            } else {
                arrayList5.add(str);
            }
        }
        for (String str2 : arrayList3) {
            if (arrayList5.contains(str2)) {
                arrayList5.remove(str2);
            }
        }
        if (arrayList4.size() <= 0) {
            Assertions.assertTrue(arrayList5.size() == 0);
            return;
        }
        ArrayList arrayList6 = new ArrayList();
        for (String str3 : arrayList5) {
            arrayList6.add(Pair.of(str3.substring(0, str3.indexOf(95)), str3.substring(str3.lastIndexOf(95) + 1)));
        }
        Assertions.assertEquals(arrayList4.size(), arrayList6.size());
        Iterator it2 = arrayList4.iterator();
        while (it2.hasNext()) {
            Assertions.assertTrue(arrayList6.contains((Pair) it2.next()));
        }
    }

    private List<HoodieRecord> triggerCommit(String str, HoodieTableType hoodieTableType, boolean z) {
        HashMap hashMap = new HashMap();
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            hashMap.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
        }
        SparkRDDWriteClient hoodieWriteClient = getHoodieWriteClient(getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(z).withProps(hashMap).build());
        hoodieWriteClient.startCommitWithTime(str);
        List<HoodieRecord> generateInsertsContainsAllPartitions = this.dataGen.generateInsertsContainsAllPartitions(str, 2);
        JavaRDD upsert = hoodieWriteClient.upsert(this.jsc.parallelize(generateInsertsContainsAllPartitions, 1), str);
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(upsert.collect());
        hoodieWriteClient.commit(str, upsert);
        return generateInsertsContainsAllPartitions;
    }

    private void assertRows(List<HoodieRecord> list, List<HoodieRecord> list2) {
        String[] strArr = new String[this.dataGen.getPartitionPaths().length];
        for (int i = 0; i < strArr.length; i++) {
            strArr[i] = String.format("%s/%s/*", this.basePath, this.dataGen.getPartitionPaths()[i]);
        }
        Dataset<Row> read = HoodieClientTestUtils.read(this.jsc, this.metaClient.getBasePath(), this.sqlContext, this.metaClient.getFs(), strArr);
        ArrayList arrayList = new ArrayList();
        Iterator<HoodieRecord> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getRecordKey());
        }
        Iterator<HoodieRecord> it2 = list2.iterator();
        while (it2.hasNext()) {
            arrayList.add(it2.next().getRecordKey());
        }
        List collectAsList = read.collectAsList();
        Assertions.assertEquals(arrayList.size(), read.count());
        ArrayList arrayList2 = new ArrayList();
        Iterator it3 = collectAsList.iterator();
        while (it3.hasNext()) {
            arrayList2.add(((Row) it3.next()).getAs("_row_key"));
        }
        Iterator it4 = arrayList.iterator();
        while (it4.hasNext()) {
            Assertions.assertTrue(arrayList2.contains((String) it4.next()));
        }
    }

    private Pair<List<HoodieRecord>, List<HoodieRecord>> twoUpsertCommitDataWithTwoPartitions(List<FileSlice> list, List<FileSlice> list2, HoodieWriteConfig hoodieWriteConfig, SparkRDDWriteClient sparkRDDWriteClient, boolean z) throws IOException {
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/03/15", "2015/03/16"});
        HoodieTestDataGenerator.writePartitionMetadata(this.dfs, new String[]{"2016/03/15", "2015/03/16"}, this.dfsBasePath);
        sparkRDDWriteClient.startCommitWithTime("001");
        List generateInsertsContainsAllPartitions = this.dataGen.generateInsertsContainsAllPartitions("001", 2);
        JavaRDD upsert = sparkRDDWriteClient.upsert(this.jsc.parallelize(generateInsertsContainsAllPartitions, 1), "001");
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(upsert.collect());
        sparkRDDWriteClient.commit("001", upsert);
        sparkRDDWriteClient.startCommitWithTime("002");
        List generateUpdates = this.dataGen.generateUpdates("002", generateInsertsContainsAllPartitions);
        JavaRDD upsert2 = sparkRDDWriteClient.upsert(this.jsc.parallelize(generateUpdates, 1), "002");
        org.apache.hudi.testutils.Assertions.assertNoWriteErrors(upsert2.collect());
        if (z) {
            sparkRDDWriteClient.commit("002", upsert2);
        }
        SyncableFileSystemView fileSystemViewWithUnCommittedSlices = getFileSystemViewWithUnCommittedSlices(getHoodieTable(this.metaClient, hoodieWriteConfig).getMetaClient());
        List list3 = (List) fileSystemViewWithUnCommittedSlices.getAllFileGroups("2016/03/15").collect(Collectors.toList());
        Assertions.assertEquals(1, list3.size());
        list.addAll((Collection) ((HoodieFileGroup) list3.get(0)).getAllFileSlices().collect(Collectors.toList()));
        List list4 = (List) fileSystemViewWithUnCommittedSlices.getAllFileGroups("2015/03/16").collect(Collectors.toList());
        Assertions.assertEquals(1, list4.size());
        list2.addAll((Collection) ((HoodieFileGroup) list4.get(0)).getAllFileSlices().collect(Collectors.toList()));
        if (this.metaClient.getTableType().equals(HoodieTableType.COPY_ON_WRITE)) {
            Assertions.assertEquals(2, list.size());
            Assertions.assertEquals(2, list2.size());
        } else {
            Assertions.assertEquals(1, list.size());
            Assertions.assertEquals(1, list2.size());
        }
        return Pair.of(generateInsertsContainsAllPartitions, generateUpdates);
    }

    private void prepForDowngrade() throws IOException {
        this.metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE);
        OutputStream create = this.metaClient.getFs().create(new Path(this.metaClient.getMetaPath() + "/hoodie.properties"));
        Throwable th = null;
        try {
            try {
                this.metaClient.getTableConfig().getProperties().store(create, "");
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    create.close();
                }
            }
            throw th4;
        }
    }

    private void createResidualFile() throws IOException {
        FileUtil.copy(this.metaClient.getFs(), new Path(this.metaClient.getMetaPath() + "/hoodie.properties"), this.metaClient.getFs(), new Path(this.metaClient.getMetaPath() + "/hoodie.properties.updated"), false, this.metaClient.getHadoopConf());
    }

    private void assertTableVersionFromPropertyFile(HoodieTableVersion hoodieTableVersion) throws IOException {
        InputStream open = this.metaClient.getFs().open(new Path(this.metaClient.getMetaPath() + "/hoodie.properties"));
        Properties properties = new Properties();
        properties.load(open);
        open.close();
        Assertions.assertEquals(Integer.toString(hoodieTableVersion.versionCode()), properties.getProperty("hoodie.table.version"));
    }
}
