/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.checkpointing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;

public class KafkaConnectHdfsProvider
extends InitialCheckPointProvider {
    private static final String FILENAME_SEPARATOR = "[\\+\\.]";

    public KafkaConnectHdfsProvider(TypedProperties props) {
        super(props);
    }

    @Override
    public void init(Configuration config) throws HoodieException {
        try {
            this.fs = FileSystem.get((Configuration)config);
        }
        catch (IOException e) {
            throw new HoodieException("KafkaConnectHdfsProvider initialization failed");
        }
    }

    private static String buildCheckpointStr(String topic, HashMap<Integer, Integer> checkpoint) {
        StringBuilder checkpointStr = new StringBuilder();
        checkpointStr.append(topic);
        for (int i = 0; i < checkpoint.size(); ++i) {
            checkpointStr.append(",").append(i).append(":").append(checkpoint.get(i));
        }
        return checkpointStr.toString();
    }

    private ArrayList<FileStatus> listAllFileStatus(Path curPath, KafkaConnectPathFilter filter) throws IOException {
        FileStatus[] fileStatus;
        ArrayList<FileStatus> allFileStatus = new ArrayList<FileStatus>();
        for (FileStatus status : fileStatus = this.fs.listStatus(curPath)) {
            if (status.isDirectory() && filter.acceptDir(status.getPath())) {
                allFileStatus.addAll(this.listAllFileStatus(status.getPath(), filter));
                continue;
            }
            if (!filter.accept(status.getPath())) continue;
            allFileStatus.add(status);
        }
        return allFileStatus;
    }

    @Override
    public String getCheckpoint() throws HoodieException {
        ArrayList<FileStatus> fileStatus;
        KafkaConnectPathFilter filter = new KafkaConnectPathFilter();
        try {
            fileStatus = this.listAllFileStatus(this.path, filter);
        }
        catch (IOException e) {
            throw new HoodieException(e.toString());
        }
        if (fileStatus.size() == 0) {
            throw new HoodieException("No valid Kafka Connect Hdfs file found under:" + this.path.getName());
        }
        String topic = fileStatus.get(0).getPath().getName().split(FILENAME_SEPARATOR)[0];
        int maxPartition = -1;
        HashMap<Integer, Integer> checkpointMap = new HashMap<Integer, Integer>();
        for (FileStatus status : fileStatus) {
            String filename = status.getPath().getName();
            String[] groups2 = filename.split(FILENAME_SEPARATOR);
            int partition = Integer.parseInt(groups2[1]);
            int offsetUpper = Integer.parseInt(groups2[3]);
            maxPartition = Math.max(maxPartition, partition);
            if (checkpointMap.containsKey(partition)) {
                checkpointMap.put(partition, Math.max(checkpointMap.get(partition), offsetUpper));
                continue;
            }
            checkpointMap.put(partition, offsetUpper);
        }
        if (checkpointMap.size() != maxPartition + 1) {
            throw new HoodieException("Missing partition from the file scan, max partition found(start from 0): " + maxPartition + " total partitions number appear in " + this.path.getName() + " is: " + checkpointMap.size() + " total partitions number expected: " + (maxPartition + 1));
        }
        return KafkaConnectHdfsProvider.buildCheckpointStr(topic, checkpointMap);
    }

    public static class KafkaConnectPathFilter
    implements PathFilter {
        private static final Pattern DIRECTORY_PATTERN = Pattern.compile(".*=.*");
        private static final Pattern PATTERN = Pattern.compile("[a-zA-Z0-9\\._\\-]+\\+\\d+\\+\\d+\\+\\d+(.\\w+)?");

        public boolean accept(Path path) {
            String filename = path.getName();
            Matcher matcher = PATTERN.matcher(filename);
            return matcher.matches();
        }

        public boolean acceptDir(Path path) {
            String dirName = path.getName();
            Matcher matcher = DIRECTORY_PATTERN.matcher(dirName);
            return matcher.matches();
        }
    }
}

