package org.apache.hudi.sink.meta;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.meta.CkpMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/meta/CkpMetadata.class */
public class CkpMetadata implements Serializable, AutoCloseable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);
    private static final int MAX_RETAIN_CKP_NUM = 3;
    private static final String CKP_META = "ckp_meta";
    private final FileSystem fs;
    protected final Path path;
    private List<CkpMessage> messages;
    private List<String> instantCache;

    private CkpMetadata(Configuration configuration) {
        this(FSUtils.getFs(configuration.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(configuration)), configuration.getString(FlinkOptions.PATH), configuration.getString(FlinkOptions.WRITE_CLIENT_ID));
    }

    private CkpMetadata(FileSystem fileSystem, String str, String str2) {
        this.fs = fileSystem;
        this.path = new Path(ckpMetaPath(str, str2));
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.instantCache = null;
    }

    public void bootstrap() throws IOException {
        this.fs.delete(this.path, true);
        this.fs.mkdirs(this.path);
    }

    public void startInstant(String str) {
        try {
            this.fs.createNewFile(fullPath(CkpMessage.getFileName(str, CkpMessage.State.INFLIGHT)));
            cache(str);
            clean();
        } catch (IOException e) {
            throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + str, e);
        }
    }

    private void cache(String str) {
        if (this.instantCache == null) {
            this.instantCache = new ArrayList();
        }
        this.instantCache.add(str);
    }

    private void clean() {
        if (this.instantCache.size() > MAX_RETAIN_CKP_NUM) {
            boolean[] zArr = new boolean[1];
            CkpMessage.getAllFileNames(this.instantCache.get(0)).stream().map(this::fullPath).forEach(path -> {
                try {
                    this.fs.delete(path, false);
                } catch (IOException e) {
                    zArr[0] = true;
                    LOG.warn("Exception while cleaning the checkpoint meta file: " + path);
                }
            });
            if (zArr[0]) {
                return;
            }
            this.instantCache.remove(0);
        }
    }

    public void commitInstant(String str) {
        try {
            this.fs.createNewFile(fullPath(CkpMessage.getFileName(str, CkpMessage.State.COMPLETED)));
        } catch (IOException e) {
            throw new HoodieException("Exception while adding checkpoint commit metadata for instant: " + str, e);
        }
    }

    public void abortInstant(String str) {
        try {
            this.fs.createNewFile(fullPath(CkpMessage.getFileName(str, CkpMessage.State.ABORTED)));
        } catch (IOException e) {
            throw new HoodieException("Exception while adding checkpoint abort metadata for instant: " + str);
        }
    }

    private void load() {
        try {
            this.messages = scanCkpMetadata(this.path);
        } catch (IOException e) {
            throw new HoodieException("Exception while scanning the checkpoint meta files under path: " + this.path, e);
        }
    }

    @Nullable
    public String lastPendingInstant() {
        load();
        if (this.messages.size() <= 0) {
            return null;
        }
        CkpMessage ckpMessage = this.messages.get(this.messages.size() - 1);
        if (ckpMessage.isComplete()) {
            return null;
        }
        return ckpMessage.getInstant();
    }

    public List<CkpMessage> getMessages() {
        load();
        return this.messages;
    }

    public boolean isAborted(String str) {
        ValidationUtils.checkState(this.messages != null, "The checkpoint metadata should #load first");
        return this.messages.stream().anyMatch(ckpMessage -> {
            return str.equals(ckpMessage.getInstant()) && ckpMessage.isAborted();
        });
    }

    @VisibleForTesting
    public List<String> getInstantCache() {
        return this.instantCache;
    }

    public static CkpMetadata getInstance(Configuration configuration) {
        return new CkpMetadata(configuration);
    }

    public static CkpMetadata getInstance(HoodieTableMetaClient hoodieTableMetaClient, String str) {
        return new CkpMetadata(hoodieTableMetaClient.getFs(), hoodieTableMetaClient.getBasePath(), str);
    }

    public static CkpMetadata getInstance(FileSystem fileSystem, String str, String str2) {
        return new CkpMetadata(fileSystem, str, str2);
    }

    protected static String ckpMetaPath(String str, String str2) {
        String str3 = str + "/.hoodie/.aux/" + CKP_META;
        return StringUtils.isNullOrEmpty(str2) ? str3 : str3 + "_" + str2;
    }

    private Path fullPath(String str) {
        return new Path(this.path, str);
    }

    private List<CkpMessage> scanCkpMetadata(Path path) throws IOException {
        return !this.fs.exists(path) ? new ArrayList() : (List) ((Map) Arrays.stream(this.fs.listStatus(path)).map(CkpMessage::new).collect(Collectors.groupingBy((v0) -> {
            return v0.getInstant();
        }))).values().stream().map(list -> {
            return (CkpMessage) list.stream().reduce((ckpMessage, ckpMessage2) -> {
                return ckpMessage.getState().compareTo(ckpMessage2.getState()) >= 0 ? ckpMessage : ckpMessage2;
            }).get();
        }).sorted().collect(Collectors.toList());
    }
}
