/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.index.bucket;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.bucket.BucketIndexLocationMapper;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.HoodieBucketIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Bucket index whose partition-to-bucket mapping is driven by per-partition
 * {@link HoodieConsistentHashingMetadata} files stored under the table's hashing
 * metadata path.
 *
 * <p>Record locations are resolved through {@link ConsistentBucketIndexLocationMapper},
 * which loads (or lazily creates) the hashing metadata of every partition it is
 * asked to serve.
 */
public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
    private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);

    /**
     * Creates the index from the given write configuration.
     *
     * @param config write configuration, forwarded to {@link HoodieBucketIndex}
     */
    public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
        super(config);
    }

    /**
     * No-op for this index: the incoming write statuses are returned unchanged.
     *
     * @param writeStatuses write statuses produced by the write
     * @param context       engine context (unused)
     * @param hoodieTable   table being written (unused)
     * @return {@code writeStatuses}, untouched
     */
    @Override
    public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException {
        return writeStatuses;
    }

    /**
     * This index performs no work on rollback.
     *
     * @param instantTime instant being rolled back (unused)
     * @return always {@code true}
     */
    @Override
    public boolean rollbackCommit(String instantTime) {
        return true;
    }

    /**
     * Builds a location mapper backed by the hashing metadata of the given partitions.
     *
     * @param table         table whose hashing metadata is consulted
     * @param partitionPath partitions the mapper must be able to resolve
     * @return a new {@link ConsistentBucketIndexLocationMapper}
     */
    @Override
    protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List<String> partitionPath) {
        return new ConsistentBucketIndexLocationMapper(table, partitionPath);
    }

    /**
     * Returns the hashing metadata of a partition, creating and persisting a fresh
     * one (sized by {@code numBuckets}) when none can be loaded.
     *
     * <p>The new metadata is written without overwrite. When that write fails the
     * metadata is loaded once more (presumably another writer created it in the
     * meantime; confirm against the file system's create semantics) and the result
     * is validated with {@link ValidationUtils#checkState}.
     *
     * @param table     table owning the partition
     * @param partition partition path
     * @return the loaded or newly created metadata
     */
    public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition) {
        HoodieConsistentHashingMetadata existing = HoodieSparkConsistentBucketIndex.loadMetadata(table, partition);
        if (existing != null) {
            return existing;
        }

        HoodieConsistentHashingMetadata created = new HoodieConsistentHashingMetadata(partition, this.numBuckets);
        if (HoodieSparkConsistentBucketIndex.saveMetadata(table, created, false)) {
            return created;
        }

        // Saving failed: fall back to whatever is on storage now.
        HoodieConsistentHashingMetadata reloaded = HoodieSparkConsistentBucketIndex.loadMetadata(table, partition);
        ValidationUtils.checkState(reloaded != null, "Failed to load or create metadata, partition: " + partition);
        return reloaded;
    }

    /**
     * Loads the most recent usable hashing metadata file of a partition.
     *
     * <p>A file is usable when its name contains {@code .hashing_meta} and its
     * timestamp either belongs to a completed instant on the active commit
     * timeline or equals {@code 00000000000000}. Among usable files the one with
     * the greatest file name is read.
     *
     * @param table     table owning the partition
     * @param partition partition path
     * @return the parsed metadata, or {@code null} when the metadata directory does
     *         not exist or contains no usable file
     * @throws HoodieIndexException when listing or reading the metadata fails with
     *                              an {@link IOException}
     */
    public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable table, String partition) {
        Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition);
        try {
            HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
            if (!fs.exists(metadataPath)) {
                return null;
            }
            FileStatus[] metaFiles = fs.listStatus(metadataPath);
            HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
            Predicate<FileStatus> metaFilePredicate = fileStatus -> {
                String filename = fileStatus.getPath().getName();
                if (!filename.contains(".hashing_meta")) {
                    return false;
                }
                String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(filename);
                return completedCommits.containsInstant(timestamp) || timestamp.equals("00000000000000");
            };
            FileStatus metaFile = Arrays.stream(metaFiles)
                    .filter(metaFilePredicate)
                    .max(Comparator.comparing(status -> status.getPath().getName()))
                    .orElse(null);
            if (metaFile == null) {
                return null;
            }
            // Close the input stream deterministically; it was previously left open.
            try (InputStream in = fs.open(metaFile.getPath())) {
                byte[] content = FileIOUtils.readAsByteArray(in);
                return HoodieConsistentHashingMetadata.fromBytes(content);
            }
        } catch (IOException e) {
            LOG.error("Error when loading hashing metadata, partition: " + partition, e);
            throw new HoodieIndexException("Error while loading hashing metadata", e);
        }
    }

    /**
     * Writes the metadata to its file under the partition's hashing metadata directory.
     *
     * @param table     table owning the partition
     * @param metadata  metadata to persist; supplies both partition path and file name
     * @param overwrite passed to the file system's {@code create} call
     * @return {@code true} when the bytes were written and the stream closed without
     *         an {@link IOException}; {@code false} otherwise (the failure is logged)
     */
    private static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, boolean overwrite) {
        HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
        Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath());
        Path fullPath = new Path(dir, metadata.getFilename());
        // try-with-resources closes the stream; an IOException thrown by that close
        // is handled by the catch clause below as well, so it still yields false.
        try (FSDataOutputStream fsOut = fs.create(fullPath, overwrite)) {
            fsOut.write(metadata.toBytes());
            return true;
        } catch (IOException e) {
            LOG.warn("Failed to update bucket metadata: " + metadata, e);
            return false;
        }
    }

    /**
     * Resolves record keys to file groups using one {@link ConsistentBucketIdentifier}
     * per partition.
     */
    public class ConsistentBucketIndexLocationMapper
    implements BucketIndexLocationMapper {
        /** Identifier per partition path, built once in the constructor. */
        private final Map<String, ConsistentBucketIdentifier> partitionToIdentifier;

        /**
         * Loads or creates the hashing metadata of every given partition and wraps
         * each in a {@link ConsistentBucketIdentifier}.
         *
         * @param table      table owning the partitions
         * @param partitions partition paths to prepare
         */
        public ConsistentBucketIndexLocationMapper(HoodieTable table, List<String> partitions) {
            this.partitionToIdentifier = partitions.stream().collect(Collectors.toMap(
                    p -> p,
                    p -> new ConsistentBucketIdentifier(HoodieSparkConsistentBucketIndex.this.loadOrCreateMetadata(table, p))));
        }

        /**
         * Finds the file group a key hashes to within a partition.
         *
         * @param key           record key to locate
         * @param partitionPath partition the record belongs to
         * @return a location whose file id is derived from the hashing node's file id
         *         prefix; the instant time of the location is {@code null}
         * @throws HoodieIndexException when the partition was not prepared by this
         *                              mapper, or the hashing node has no file id prefix
         */
        @Override
        public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key, String partitionPath) {
            ConsistentBucketIdentifier identifier = this.partitionToIdentifier.get(partitionPath);
            if (identifier == null) {
                // Fail with a descriptive error instead of a bare NullPointerException.
                throw new HoodieIndexException("No consistent hashing metadata loaded for partition: " + partitionPath);
            }
            ConsistentHashingNode node = identifier.getBucket(key, HoodieSparkConsistentBucketIndex.this.indexKeyFields);
            if (!StringUtils.isNullOrEmpty(node.getFileIdPrefix())) {
                return Option.of(new HoodieRecordLocation(null, FSUtils.createNewFileId(node.getFileIdPrefix(), 0)));
            }
            LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: " + identifier.getMetadata().getFilename() + ", record_key: " + key);
            throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
        }
    }
}

