package org.apache.hudi.utils;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/hudi/utils/TestCompactionUtil.class */
public class TestCompactionUtil {
    private HoodieFlinkTable<?> table;
    private HoodieTableMetaClient metaClient;
    private Configuration conf;

    @TempDir
    File tempFile;

    void beforeEach() throws IOException {
        beforeEach(Collections.emptyMap());
    }

    void beforeEach(Map<String, String> map) throws IOException {
        this.conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        this.conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        map.forEach((str, str2) -> {
            this.conf.setString(str, str2);
        });
        StreamerUtil.initTableIfNotExists(this.conf);
        this.table = FlinkTables.createTable(this.conf);
        this.metaClient = this.table.getMetaClient();
        if (this.conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
            FlinkHoodieBackedTableMetadataWriter.create(this.table.getHadoopConf(), this.table.getConfig(), this.table.getContext(), Option.empty(), Option.empty());
        }
    }

    @Test
    void rollbackCompaction() throws Exception {
        beforeEach();
        List list = (List) IntStream.range(0, 3).mapToObj(i -> {
            return generateCompactionPlan();
        }).collect(Collectors.toList());
        MatcherAssert.assertThat("all the instants should be in pending state", Integer.valueOf(((List) this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().filter(hoodieInstant -> {
            return hoodieInstant.getState() == HoodieInstant.State.INFLIGHT;
        }).getInstants().collect(Collectors.toList())).size()), CoreMatchers.is(3));
        CompactionUtil.rollbackCompaction(this.table);
        Assertions.assertTrue(this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().allMatch(hoodieInstant2 -> {
            return hoodieInstant2.getState() == HoodieInstant.State.REQUESTED;
        }), "all the instants should be rolled back");
        MatcherAssert.assertThat((List) this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().map((v0) -> {
            return v0.getTimestamp();
        }).collect(Collectors.toList()), CoreMatchers.is(list));
    }

    @Test
    void rollbackEarliestCompaction() throws Exception {
        beforeEach();
        this.conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0);
        List list = (List) IntStream.range(0, 3).mapToObj(i -> {
            return generateCompactionPlan();
        }).collect(Collectors.toList());
        MatcherAssert.assertThat("all the instants should be in pending state", Integer.valueOf(((List) this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().filter(hoodieInstant -> {
            return hoodieInstant.getState() == HoodieInstant.State.INFLIGHT;
        }).getInstants().collect(Collectors.toList())).size()), CoreMatchers.is(3));
        CompactionUtil.rollbackEarliestCompaction(this.table, this.conf);
        MatcherAssert.assertThat("Only the first instant expects to be rolled back", Long.valueOf(this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().filter(hoodieInstant2 -> {
            return hoodieInstant2.getState() == HoodieInstant.State.REQUESTED;
        }).count()), CoreMatchers.is(1L));
        MatcherAssert.assertThat(((HoodieInstant) this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().filter(hoodieInstant3 -> {
            return hoodieInstant3.getState() == HoodieInstant.State.REQUESTED;
        }).firstInstant().get()).getTimestamp(), CoreMatchers.is(list.get(0)));
    }

    @Test
    void testScheduleCompaction() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
        hashMap.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), "time_elapsed");
        hashMap.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "0");
        beforeEach(hashMap);
        TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, this.conf);
        HoodieFlinkWriteClient createWriteClient = FlinkWriteClients.createWriteClient(this.conf);
        CompactionUtil.scheduleCompaction(this.metaClient, createWriteClient, true, true);
        Assertions.assertTrue(this.metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant().isPresent(), "A compaction plan expects to be scheduled");
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, this.conf);
        TimeUnit.SECONDS.sleep(3L);
        createWriteClient.startCommit();
        CompactionUtil.scheduleCompaction(this.metaClient, createWriteClient, true, false);
        MatcherAssert.assertThat("Two compaction plan expects to be scheduled", Integer.valueOf(this.metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants()), CoreMatchers.is(2));
    }

    private String generateCompactionPlan() {
        HoodieCompactionPlan hoodieCompactionPlan = new HoodieCompactionPlan(Collections.singletonList(new HoodieCompactionOperation()), Collections.emptyMap(), 1);
        String createNewInstantTime = HoodieActiveTimeline.createNewInstantTime();
        HoodieInstant hoodieInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "compaction", createNewInstantTime);
        try {
            this.metaClient.getActiveTimeline().saveToCompactionRequested(hoodieInstant, TimelineMetadataUtils.serializeCompactionPlan(hoodieCompactionPlan));
            this.table.getActiveTimeline().transitionCompactionRequestedToInflight(hoodieInstant);
            this.metaClient.reloadActiveTimeline();
            return createNewInstantTime;
        } catch (IOException e) {
            throw new HoodieIOException("Exception scheduling compaction", e);
        }
    }
}
