/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.AbstractDeleteHelper;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;

public class FlinkDeleteHelper<R>
extends AbstractDeleteHelper<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> {
    private FlinkDeleteHelper() {
    }

    public static FlinkDeleteHelper newInstance() {
        return DeleteHelperHolder.FLINK_DELETE_HELPER;
    }

    @Override
    public List<HoodieKey> deduplicateKeys(List<HoodieKey> keys2, HoodieTable<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>> table, int parallelism) {
        boolean isIndexingGlobal = table.getIndex().isGlobal();
        if (isIndexingGlobal) {
            HashSet recordKeys = keys2.stream().map(HoodieKey::getRecordKey).collect(Collectors.toCollection(HashSet::new));
            LinkedList<HoodieKey> deduplicatedKeys = new LinkedList<HoodieKey>();
            keys2.forEach(x -> {
                if (recordKeys.contains(x.getRecordKey())) {
                    deduplicatedKeys.add((HoodieKey)x);
                }
            });
            return deduplicatedKeys;
        }
        HashSet<HoodieKey> set = new HashSet<HoodieKey>(keys2);
        keys2.clear();
        keys2.addAll(set);
        return keys2;
    }

    @Override
    public HoodieWriteMetadata<List<WriteStatus>> execute(String instantTime, List<HoodieKey> keys2, HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>> table, BaseCommitActionExecutor<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> deleteExecutor) {
        try {
            HoodieWriteMetadata<List<WriteStatus>> result = null;
            List<HoodieKey> dedupedKeys = keys2;
            int parallelism = config.getDeleteShuffleParallelism();
            if (config.shouldCombineBeforeDelete()) {
                dedupedKeys = this.deduplicateKeys(keys2, table, parallelism);
            }
            List dedupedRecords = dedupedKeys.stream().map(key -> new HoodieRecord<EmptyHoodieRecordPayload>((HoodieKey)key, new EmptyHoodieRecordPayload())).collect(Collectors.toList());
            Instant beginTag = Instant.now();
            List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = table.getIndex().tagLocation(dedupedRecords, context, table);
            Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
            List taggedValidRecords = taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).collect(Collectors.toList());
            if (!taggedValidRecords.isEmpty()) {
                result = deleteExecutor.execute(taggedValidRecords);
                result.setIndexLookupDuration(tagLocationDuration);
            } else {
                deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap(), new WorkloadStat())), instantTime);
                result = new HoodieWriteMetadata();
                result.setWriteStatuses(Collections.EMPTY_LIST);
                deleteExecutor.commitOnAutoCommit(result);
            }
            return result;
        }
        catch (Throwable e) {
            if (e instanceof HoodieUpsertException) {
                throw (HoodieUpsertException)e;
            }
            throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
        }
    }

    private static class DeleteHelperHolder {
        private static final FlinkDeleteHelper FLINK_DELETE_HELPER = new FlinkDeleteHelper();

        private DeleteHelperHolder() {
        }
    }
}

