/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.format.cow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.GlobFilePathFilter;
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
import org.apache.hudi.util.DataTypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink {@link FileInputFormat} that reads table files as {@link RowData} through a
 * {@link ParquetColumnarRowSplitReader}.
 *
 * <p>Split enumeration follows the same algorithm as Flink's {@code FileInputFormat#createInputSplits},
 * but every file system access goes through {@link FSUtils#getFs} with the Hadoop configuration held
 * by this format. Partition values parsed from a split's path are handed to the reader (see
 * {@link #open(FileInputSplit)}).
 */
public class CopyOnWriteInputFormat
extends FileInputFormat<RowData> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteInputFormat.class);
    /** Names of all columns of the table schema, aligned with {@link #fullFieldTypes}. */
    private final String[] fullFieldNames;
    /** Types of all columns of the table schema, aligned with {@link #fullFieldNames}. */
    private final DataType[] fullFieldTypes;
    /** Column positions forwarded to the reader; presumably indices into the full schema — TODO confirm. */
    private final int[] selectedFields;
    /** Partition path value that is translated to {@code null} before being resolved. */
    private final String partDefaultName;
    private final boolean utcTimestamp;
    private final SerializableConfiguration conf;
    /** Maximum number of records emitted between two {@link #open} calls (i.e. per split). */
    private final long limit;
    private transient ParquetColumnarRowSplitReader reader;
    /** Records emitted since the last {@link #open}; compared against {@link #limit}. */
    private transient long currentReadCount;
    /** Local copy of the file filter, consulted by {@link #acceptFile(FileStatus)}. */
    private FilePathFilter localFilesFilter = new GlobFilePathFilter();

    /**
     * Creates the input format.
     *
     * @param paths          files and/or directories to read; directories are listed when splits are created
     * @param fullFieldNames names of all table columns
     * @param fullFieldTypes types of all table columns, aligned with {@code fullFieldNames}
     * @param selectedFields column positions forwarded to the reader
     * @param partDefaultName partition value that stands for {@code null}
     * @param limit          maximum records returned per opened split; a value {@code <= 0} yields no records
     * @param conf           Hadoop configuration used for file system access and by the reader
     * @param utcTimestamp   forwarded to the reader factory
     */
    public CopyOnWriteInputFormat(Path[] paths, String[] fullFieldNames, DataType[] fullFieldTypes, int[] selectedFields, String partDefaultName, long limit, Configuration conf, boolean utcTimestamp) {
        super.setFilePaths(paths);
        this.limit = limit;
        this.partDefaultName = partDefaultName;
        this.fullFieldNames = fullFieldNames;
        this.fullFieldTypes = fullFieldTypes;
        this.selectedFields = selectedFields;
        this.conf = new SerializableConfiguration(conf);
        this.utcTimestamp = utcTimestamp;
    }

    /**
     * Opens a reader for the given split and resets the per-split record counter.
     *
     * <p>Partition key/value pairs are extracted from the split's path. For every key that names a
     * column of the table schema whose type is not a datetime type, the value (or {@code null} when
     * it equals {@link #partDefaultName}) is converted with {@link DataTypeUtils#resolvePartition}
     * and passed to the reader.
     */
    @Override
    public void open(FileInputSplit fileSplit) throws IOException {
        List<String> fieldNameList = Arrays.asList(this.fullFieldNames);
        LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(fileSplit.getPath());
        LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
        partSpec.forEach((key, value) -> {
            int idx = fieldNameList.indexOf(key);
            if (idx == -1) {
                // the path segment does not correspond to any column of the schema
                return;
            }
            DataType fieldType = this.fullFieldTypes[idx];
            // Datetime partition columns are not injected from the path. NOTE(review): presumably
            // because their path formatting may not round-trip; those values are left to the reader.
            if (!DataTypeUtils.isDatetimeType(fieldType)) {
                String raw = this.partDefaultName.equals(value) ? null : value;
                partObjects.put(key, DataTypeUtils.resolvePartition(raw, fieldType));
            }
        });
        this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
                this.utcTimestamp,
                true, // NOTE(review): presumably the case-sensitivity flag — confirm against ParquetSplitReaderUtil
                this.conf.conf(),
                this.fullFieldNames,
                this.fullFieldTypes,
                partObjects,
                this.selectedFields,
                2048, // NOTE(review): presumably the reader batch size — confirm
                fileSplit.getPath(),
                fileSplit.getStart(),
                fileSplit.getLength());
        this.currentReadCount = 0L;
    }

    /**
     * Enumerates all input files and cuts them into splits.
     *
     * <p>If any file is unsplittable (a decompression codec is registered for its extension), every
     * file becomes exactly one split. Otherwise files are cut into splits of roughly equal size
     * bounded by the block size, with host hints taken from the covering block.
     *
     * @param minNumSplits minimal number of splits requested by the runtime, must be at least 1
     * @throws IllegalArgumentException if {@code minNumSplits < 1}
     */
    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (minNumSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }
        // honor an explicitly configured number of splits if it is larger
        minNumSplits = Math.max(minNumSplits, this.numSplits);

        List<FileStatus> files = new ArrayList<>();
        long totalLength = 0L;
        for (Path path : this.getFilePaths()) {
            org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
            FileSystem fs = FSUtils.getFs(hadoopPath.toString(), this.conf.conf());
            FileStatus pathFile = fs.getFileStatus(hadoopPath);
            if (pathFile.isDirectory()) {
                totalLength += this.addFilesInDir(hadoopPath, files, true);
            } else {
                this.testForUnsplittable(pathFile);
                files.add(pathFile);
                totalLength += pathFile.getLen();
            }
        }

        if (this.unsplittable) {
            return this.createWholeFileSplits(files);
        }

        // ceil(totalLength / minNumSplits)
        long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0L ? 0L : 1L);
        List<FileInputSplit> inputSplits = new ArrayList<>(minNumSplits);
        int splitNum = 0;
        for (FileStatus file : files) {
            FileSystem fs = FSUtils.getFs(file.getPath().toString(), this.conf.conf());
            Path splitPath = new Path(file.getPath().toUri());
            long len = file.getLen();
            long splitSize = this.computeSplitSize(file.getBlockSize(), maxSplitSize);
            long halfSplit = splitSize >>> 1;
            // the last split may be up to 10% larger instead of producing a tiny trailing split
            long maxBytesForLastSplit = (long) ((float) splitSize * 1.1f);

            if (len <= 0L) {
                // empty file: still emit one zero-length split for it
                BlockLocation[] blocks = fs.getFileBlockLocations(file, 0L, 0L);
                String[] hosts = blocks.length > 0 ? blocks[0].getHosts() : new String[0];
                inputSplits.add(new FileInputSplit(splitNum++, splitPath, 0L, 0L, hosts));
                continue;
            }

            BlockLocation[] blocks = fs.getFileBlockLocations(file, 0L, len);
            Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset));
            long position = 0L;
            int blockIndex = 0;
            long bytesUnassigned = len;
            while (bytesUnassigned > maxBytesForLastSplit) {
                blockIndex = this.getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
                inputSplits.add(new FileInputSplit(splitNum++, splitPath, position, splitSize, blocks[blockIndex].getHosts()));
                position += splitSize;
                bytesUnassigned -= splitSize;
            }
            if (bytesUnassigned > 0L) {
                blockIndex = this.getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
                inputSplits.add(new FileInputSplit(splitNum++, splitPath, position, bytesUnassigned, blocks[blockIndex].getHosts()));
            }
        }
        return inputSplits.toArray(new FileInputSplit[0]);
    }

    /**
     * Creates exactly one split per file; hosts are the union of the hosts of all blocks of the file.
     * Files that are themselves unsplittable get length {@code -1}, which mirrors Flink's
     * {@code READ_WHOLE_SPLIT_FLAG}; other files get their actual length.
     */
    private FileInputSplit[] createWholeFileSplits(List<FileStatus> files) throws IOException {
        List<FileInputSplit> inputSplits = new ArrayList<>(files.size());
        int splitNum = 0;
        for (FileStatus file : files) {
            FileSystem fs = FSUtils.getFs(file.getPath().toString(), this.conf.conf());
            BlockLocation[] blocks = fs.getFileBlockLocations(file, 0L, file.getLen());
            HashSet<String> hosts = new HashSet<>();
            for (BlockLocation block : blocks) {
                hosts.addAll(Arrays.asList(block.getHosts()));
            }
            long len = this.testForUnsplittable(file) ? -1L : file.getLen();
            inputSplits.add(new FileInputSplit(splitNum++, new Path(file.getPath().toUri()), 0L, len, hosts.toArray(new String[0])));
        }
        return inputSplits.toArray(new FileInputSplit[0]);
    }

    /**
     * Returns the split size for a file: {@code min(maxSplitSize, blockSize)}, but not below the
     * configured minimal split size, which itself is capped at the block size (with a warning).
     */
    private long computeSplitSize(long blockSize, long maxSplitSize) {
        long effectiveMinSplitSize;
        if (this.minSplitSize <= blockSize) {
            effectiveMinSplitSize = this.minSplitSize;
        } else {
            LOG.warn("Minimal split size of {} is larger than the block size of {}. Decreasing minimal split size to block size.",
                    this.minSplitSize, blockSize);
            effectiveMinSplitSize = blockSize;
        }
        return Math.max(effectiveMinSplitSize, Math.min(maxSplitSize, blockSize));
    }

    @Override
    public boolean supportsMultiPaths() {
        return true;
    }

    /** Returns {@code true} once the per-split limit is reached or the reader is exhausted. */
    @Override
    public boolean reachedEnd() throws IOException {
        if (this.currentReadCount >= this.limit) {
            return true;
        }
        return this.reader.reachedEnd();
    }

    /** Returns the next row from the reader; {@code reuse} is ignored. */
    @Override
    public RowData nextRecord(RowData reuse) {
        ++this.currentReadCount;
        return this.reader.nextRecord();
    }

    /** Closes the current reader, if any, and drops the reference to it. */
    @Override
    public void close() throws IOException {
        if (this.reader != null) {
            this.reader.close();
        }
        this.reader = null;
    }

    /**
     * Adds all accepted files below {@code path} to {@code files}. Sub-directories are descended into
     * only when they are accepted and nested enumeration is enabled.
     *
     * @return the summed length of all files added by this call (including nested calls)
     */
    private long addFilesInDir(org.apache.hadoop.fs.Path path, List<FileStatus> files, boolean logExcludedFiles) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
        FileSystem fs = FSUtils.getFs(hadoopPath.toString(), this.conf.conf());
        long length = 0L;
        for (FileStatus status : fs.listStatus(hadoopPath)) {
            if (status.isDirectory()) {
                if (this.acceptFile(status) && this.enumerateNestedFiles) {
                    length += this.addFilesInDir(status.getPath(), files, logExcludedFiles);
                } else if (logExcludedFiles && LOG.isDebugEnabled()) {
                    LOG.debug("Directory {} did not pass the file-filter and is excluded.", status.getPath());
                }
            } else if (this.acceptFile(status)) {
                files.add(status);
                length += status.getLen();
                this.testForUnsplittable(status);
            } else if (logExcludedFiles && LOG.isDebugEnabled()) {
                LOG.debug("File {} did not pass the file-filter and is excluded.", status.getPath());
            }
        }
        return length;
    }

    /**
     * Sets the file filter on the parent and keeps a local reference, which
     * {@link #acceptFile(FileStatus)} consults.
     */
    @Override
    public void setFilesFilter(FilePathFilter filesFilter) {
        this.localFilesFilter = filesFilter;
        super.setFilesFilter(filesFilter);
    }

    /**
     * Rejects names starting with {@code "_"} or {@code "."} and any path for which the file filter's
     * {@code filterPath} returns {@code true}; accepts everything else.
     */
    public boolean acceptFile(FileStatus fileStatus) {
        String name = fileStatus.getPath().getName();
        return !name.startsWith("_") && !name.startsWith(".") && !this.localFilesFilter.filterPath(new Path(fileStatus.getPath().toUri()));
    }

    /**
     * Returns the index of the block whose host hints should be used for a split starting at
     * {@code offset}. Scanning starts at {@code startIndex}. If fewer than {@code halfSplitSize} bytes
     * of the containing block remain, the next block is chosen (unless it is the last one).
     *
     * @throws IllegalArgumentException if no block at or after {@code startIndex} contains {@code offset}
     */
    private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, long halfSplitSize, int startIndex) {
        for (int i = startIndex; i < blocks.length; ++i) {
            long blockStart = blocks[i].getOffset();
            long blockEnd = blockStart + blocks[i].getLength();
            if (offset < blockStart || offset >= blockEnd) {
                continue;
            }
            if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
                return i + 1;
            }
            return i;
        }
        throw new IllegalArgumentException("The given offset " + offset + " is not contained in any block.");
    }

    /**
     * Marks the whole input as unsplittable when a decompression codec is registered for the file's
     * extension.
     *
     * @return {@code true} if this particular file is unsplittable
     */
    private boolean testForUnsplittable(FileStatus pathFile) {
        if (this.getInflaterInputStreamFactory(pathFile.getPath()) != null) {
            this.unsplittable = true;
            return true;
        }
        return false;
    }

    /** Looks up the inflater factory registered for the extension of {@code path}, or {@code null}. */
    private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(org.apache.hadoop.fs.Path path) {
        String fileExtension = CopyOnWriteInputFormat.extractFileExtension(path.getName());
        if (fileExtension != null) {
            return CopyOnWriteInputFormat.getInflaterInputStreamFactory(fileExtension);
        }
        return null;
    }
}

