package org.apache.flink.kubernetes.highavailability;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.ResourceVersion;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.StringResourceVersion;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.util.StateHandleStoreUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.class */
public class KubernetesStateHandleStore<T extends Serializable> implements StateHandleStore<T, StringResourceVersion> {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesStateHandleStore.class);
    /** Client used to read the ConfigMap and to perform check-and-update operations on it. */
    private final FlinkKubeClient kubeClient;
    /** Name of the ConfigMap whose data entries hold the serialized state handles. */
    private final String configMapName;
    /** Helper that persists the actual state and returns a retrievable handle to it. */
    private final RetrievableStateStorageHelper<T> storage;
    /** Selects the ConfigMap keys that this store treats as its own entries. */
    private final Predicate<String> configMapKeyFilter;

    /**
     * Identity checked against the ConfigMap's leadership before any update; when {@code null} the
     * leadership check is skipped and every update is considered valid.
     */
    @Nullable
    private final String lockIdentity;

    /**
     * Pairs a {@link RetrievableStateHandle} with a flag telling whether the entry has been marked
     * for deletion. Instances are immutable; {@link #toDeleting()} yields a marked copy.
     *
     * <p>The set of members is intentionally left untouched: the type is serialized into the
     * ConfigMap and declares no explicit {@code serialVersionUID}.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier was widened from
     * package-private.
     */
    @VisibleForTesting
    public static class StateHandleWithDeleteMarker<T extends Serializable> implements StateObject {
        private final RetrievableStateHandle<T> inner;
        private final boolean markedForDeletion;

        /** Wraps the given handle as a live (not marked for deletion) entry. */
        StateHandleWithDeleteMarker(RetrievableStateHandle<T> inner) {
            this(inner, false);
        }

        private StateHandleWithDeleteMarker(RetrievableStateHandle<T> inner, boolean markedForDeletion) {
            this.inner = inner;
            this.markedForDeletion = markedForDeletion;
        }

        /** Returns the wrapped state handle. */
        RetrievableStateHandle<T> getInner() {
            return inner;
        }

        /** Returns {@code true} if this entry carries the deletion marker. */
        boolean isMarkedForDeletion() {
            return markedForDeletion;
        }

        /** Returns a copy of this wrapper around the same handle with the deletion marker set. */
        StateHandleWithDeleteMarker<T> toDeleting() {
            return new StateHandleWithDeleteMarker<>(inner, true);
        }

        /** Delegates to the wrapped handle. */
        public void discardState() throws Exception {
            inner.discardState();
        }

        /** Delegates to the wrapped handle. */
        public long getStateSize() {
            return inner.getStateSize();
        }
    }

    /**
     * Decodes a Base64 ConfigMap value and deserializes it into a state handle wrapper.
     *
     * @param str Base64 encoded content read from the ConfigMap; must not be {@code null}
     * @return the deserialized wrapper
     * @throws IOException if deserialization fails (including a missing class)
     */
    private static <T extends Serializable> StateHandleWithDeleteMarker<T> deserializeStateHandle(String str) throws IOException {
        Preconditions.checkNotNull(str, "Content should not be null.");
        // Decoding failures are unchecked and were never translated, so decoding may happen up front.
        final byte[] rawBytes = Base64.getDecoder().decode(str);
        try {
            final Object handle = StateHandleStoreUtils.deserialize(rawBytes);
            return (StateHandleWithDeleteMarker) handle;
        } catch (IOException | ClassNotFoundException cause) {
            final String message = String.format("Failed to deserialize state handle from ConfigMap data %s.", str);
            throw new IOException(message, cause);
        }
    }

    /** Encodes the given bytes as a Base64 string using the basic encoder. */
    private static String toBase64(byte[] bArr) {
        final Base64.Encoder encoder = Base64.getEncoder();
        return encoder.encodeToString(bArr);
    }

    /**
     * Serializes the wrapper and returns it in the Base64 form stored in the ConfigMap.
     *
     * @throws IOException if the object cannot be serialized
     */
    @VisibleForTesting
    static String serializeStateHandle(StateHandleWithDeleteMarker<?> stateHandleWithDeleteMarker) throws IOException {
        final byte[] serialized = InstantiationUtil.serializeObject(stateHandleWithDeleteMarker);
        return toBase64(serialized);
    }

    /**
     * Creates a state handle store backed by a single ConfigMap.
     *
     * @param flinkKubeClient client used to access the ConfigMap; must not be {@code null}
     * @param str name of the ConfigMap; must not be {@code null}
     * @param retrievableStateStorageHelper helper that persists the state; must not be {@code null}
     * @param predicate filter selecting the keys handled by this store; must not be {@code null}
     * @param str2 lock identity used for the leadership check, or {@code null} to skip the check
     */
    public KubernetesStateHandleStore(FlinkKubeClient flinkKubeClient, String str, RetrievableStateStorageHelper<T> retrievableStateStorageHelper, Predicate<String> predicate, @Nullable String str2) {
        // The validation order is kept so the first failing argument is reported as before.
        Preconditions.checkNotNull(flinkKubeClient, "Kubernetes client");
        Preconditions.checkNotNull(retrievableStateStorageHelper, "State storage");
        Preconditions.checkNotNull(str, "ConfigMap name");
        Preconditions.checkNotNull(predicate);

        this.kubeClient = flinkKubeClient;
        this.storage = retrievableStateStorageHelper;
        this.configMapName = str;
        this.configMapKeyFilter = predicate;
        this.lockIdentity = str2;
    }

    /**
     * Persists the given state through the storage helper and registers the resulting handle
     * under {@code str} in the ConfigMap.
     *
     * <p>The stored state is discarded again when the ConfigMap was not updated or when the
     * operation failed for a reason that guarantees the handle was not written. It is NOT
     * discarded when a {@link PossibleInconsistentStateException} is found in the failure's cause
     * chain: in that case the ConfigMap may already reference the data.
     *
     * @param str key in the ConfigMap; must not be {@code null}
     * @param t state to store; must not be {@code null}
     * @return the handle to the stored state
     * @throws PossibleInconsistentStateException if it is unknown whether the write was applied
     * @throws StateHandleStore.AlreadyExistException if a different live entry exists for the key
     * @throws Exception for any other failure
     */
    public RetrievableStateHandle<T> addAndLock(String str, T t) throws PossibleInconsistentStateException, Exception {
        Preconditions.checkNotNull(str, "Key in ConfigMap.");
        Preconditions.checkNotNull(t, "State.");
        final RetrievableStateHandle<T> storeHandle = this.storage.store(t);
        final byte[] serializedStoreHandle = StateHandleStoreUtils.serializeOrDiscard(new StateHandleWithDeleteMarker<>(storeHandle));

        // Pessimistic default: unless proven otherwise the stored state is orphaned.
        boolean discardState = true;
        try {
            // A successful update means the ConfigMap references the state, so it must be kept.
            discardState = !updateConfigMap(configMap -> {
                try {
                    return addEntry(configMap, str, serializedStoreHandle);
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }).get();
            return storeHandle;
        } catch (Exception ex) {
            final Optional<PossibleInconsistentStateException> inconsistentState =
                    ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class);
            if (inconsistentState.isPresent()) {
                // Unclear whether the handle made it into the ConfigMap; keep the data.
                discardState = false;
                throw inconsistentState.get();
            }
            throw ExceptionUtils.findThrowable(ex, StateHandleStore.AlreadyExistException.class).orElseThrow(() -> ex);
        } finally {
            if (discardState) {
                storeHandle.discardState();
            }
        }
    }

    /**
     * Stores the given state and swaps the ConfigMap entry under {@code str} to point at it.
     *
     * <p>Cleanup rules:
     * <ul>
     *   <li>update applied: the previous state is discarded, the new state is kept;</li>
     *   <li>update not applied or failed definitively: the new state is discarded;</li>
     *   <li>{@link PossibleInconsistentStateException} in the cause chain: nothing is discarded,
     *       because the ConfigMap may reference either handle.</li>
     * </ul>
     *
     * @param str key in the ConfigMap; must not be {@code null}
     * @param stringResourceVersion not read by this implementation
     * @param t new state; must not be {@code null}
     * @throws PossibleInconsistentStateException if it is unknown whether the write was applied
     * @throws StateHandleStore.NotExistException if there is no live entry for the key
     * @throws Exception for any other failure, including a failure to discard the old state
     */
    public void replace(String str, StringResourceVersion stringResourceVersion, T t) throws Exception {
        Preconditions.checkNotNull(str, "Key in ConfigMap.");
        Preconditions.checkNotNull(t, "State.");
        final RetrievableStateHandle<T> newStateHandle = this.storage.store(t);
        final byte[] serializedStateHandle = StateHandleStoreUtils.serializeOrDiscard(new StateHandleWithDeleteMarker<>(newStateHandle));

        // Flags start in the "failure" configuration and are swapped on success.
        boolean discardOldState = false;
        boolean discardNewState = true;
        // Filled in by replaceEntry, which has to deserialize the old handle anyway.
        final AtomicReference<RetrievableStateHandle<T>> oldStateHandleRef = new AtomicReference<>();
        try {
            final boolean success = updateConfigMap(configMap -> {
                try {
                    return replaceEntry(configMap, str, serializedStateHandle, oldStateHandleRef);
                } catch (StateHandleStore.NotExistException e) {
                    throw new CompletionException(e);
                }
            }).get();
            discardOldState = success;
            discardNewState = !success;
        } catch (Exception ex) {
            final Optional<PossibleInconsistentStateException> inconsistentState =
                    ExceptionUtils.findThrowable(ex, PossibleInconsistentStateException.class);
            if (inconsistentState.isPresent()) {
                // Unclear which handle the ConfigMap references now; keep both.
                discardNewState = false;
                throw inconsistentState.get();
            }
            throw ExceptionUtils.findThrowable(ex, StateHandleStore.NotExistException.class).orElseThrow(() -> ex);
        } finally {
            if (discardNewState) {
                newStateHandle.discardState();
            }
            if (discardOldState) {
                // A failure here propagates without touching the new, now referenced, state.
                Objects.requireNonNull(oldStateHandleRef.get(), "state handle should have been set on success").discardState();
            }
        }
    }

    /**
     * Reports whether a live entry exists for the given key.
     *
     * <p>The decompiler renamed this method from {@code exists} (merged with its bridge method).
     *
     * @param str key in the ConfigMap; must not be {@code null}
     * @return the ConfigMap's resource version if the key holds a readable entry that is not
     *     marked for deletion; the "not existing" version if the key is absent, unreadable or
     *     marked for deletion
     * @throws Exception if the ConfigMap itself does not exist
     */
    public StringResourceVersion m26exists(String str) throws Exception {
        Preconditions.checkNotNull(str, "Key in ConfigMap.");
        final KubernetesConfigMap configMap =
                this.kubeClient.getConfigMap(this.configMapName).orElseThrow(this::getConfigMapNotExistException);

        final String content = configMap.getData().get(str);
        if (content == null) {
            return StringResourceVersion.notExisting();
        }
        try {
            if (deserializeStateHandle(content).isMarkedForDeletion()) {
                return StringResourceVersion.notExisting();
            }
        } catch (IOException unreadable) {
            // An unreadable entry is reported as absent.
            return StringResourceVersion.notExisting();
        }
        return StringResourceVersion.valueOf(configMap.getResourceVersion());
    }

    /**
     * Returns the state handle stored under the given key.
     *
     * @param str key in the ConfigMap; must not be {@code null}
     * @return the handle stored for the key
     * @throws Exception if the ConfigMap does not exist, the key is missing, the entry is marked
     *     for deletion, or the entry cannot be deserialized
     */
    public RetrievableStateHandle<T> getAndLock(String str) throws Exception {
        Preconditions.checkNotNull(str, "Key in ConfigMap.");
        final KubernetesConfigMap configMap =
                this.kubeClient.getConfigMap(this.configMapName).orElseThrow(this::getConfigMapNotExistException);

        if (configMap.getData().containsKey(str)) {
            final StateHandleWithDeleteMarker<T> stateHandle = deserializeStateHandle(configMap.getData().get(str));
            if (!stateHandle.isMarkedForDeletion()) {
                return stateHandle.getInner();
            }
            throw getKeyMarkedAsDeletedException(str);
        }
        throw getKeyNotExistException(str);
    }

    /**
     * Collects all live state handles whose keys pass the key filter.
     *
     * <p>Entries that cannot be deserialized are skipped with a warning; entries marked for
     * deletion are skipped silently.
     *
     * @return handle/key pairs, or an empty list if the ConfigMap does not exist
     */
    public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() {
        final Optional<KubernetesConfigMap> maybeConfigMap = this.kubeClient.getConfigMap(this.configMapName);
        if (!maybeConfigMap.isPresent()) {
            return Collections.emptyList();
        }

        final KubernetesConfigMap configMap = maybeConfigMap.get();
        final List<Tuple2<RetrievableStateHandle<T>, String>> handles = new ArrayList<>();
        for (String key : configMap.getData().keySet()) {
            if (!this.configMapKeyFilter.test(key)) {
                continue;
            }
            try {
                final StateHandleWithDeleteMarker<T> stateHandle = deserializeStateHandle(configMap.getData().get(key));
                if (!stateHandle.isMarkedForDeletion()) {
                    handles.add(new Tuple2<>(stateHandle.getInner(), key));
                }
            } catch (IOException corrupted) {
                LOG.warn("ConfigMap {} contained corrupted data. Ignoring the key {}.", this.configMapName, key);
            }
        }
        return handles;
    }

    /**
     * Lists the keys of all live entries that pass the key filter.
     *
     * <p>Keys whose value cannot be deserialized or is marked for deletion are left out.
     *
     * @return the matching keys
     * @throws Exception if the ConfigMap does not exist
     */
    public Collection<String> getAllHandles() throws Exception {
        final KubernetesConfigMap configMap =
                this.kubeClient.getConfigMap(this.configMapName).orElseThrow(this::getConfigMapNotExistException);

        final List<String> liveKeys = new ArrayList<>();
        for (String key : configMap.getData().keySet()) {
            if (!this.configMapKeyFilter.test(key)) {
                continue;
            }
            try {
                final String content = Objects.requireNonNull(configMap.getData().get(key));
                if (!deserializeStateHandle(content).isMarkedForDeletion()) {
                    liveKeys.add(key);
                }
            } catch (IOException unreadable) {
                // Unreadable entries are not reported as handles.
            }
        }
        return liveKeys;
    }

    /**
     * Removes the entry for the given key in two ConfigMap updates: the first marks the entry for
     * deletion (or drops it outright if it is unreadable), the second removes the key after the
     * referenced state has been discarded.
     *
     * @param str key in the ConfigMap; must not be {@code null}
     * @return if the first update was applied and a handle was read, the result of the second
     *     update; otherwise {@code true} when the key was absent or the first update was applied,
     *     and {@code false} in the remaining case
     * @throws Exception if an update fails or discarding the state fails
     */
    public boolean releaseAndTryRemove(String str) throws Exception {
        Preconditions.checkNotNull(str, "Key in ConfigMap.");
        // Carries the handle read during the first update over to the second stage.
        AtomicReference atomicReference = new AtomicReference();
        // Set when the key was not present at all, so the caller still gets "true".
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        return ((Boolean) updateConfigMap(kubernetesConfigMap -> {
            String str2 = kubernetesConfigMap.getData().get(str);
            if (str2 == null) {
                atomicBoolean.set(true);
                // Nothing to change; an empty Optional requests no update.
                return Optional.empty();
            }
            try {
                StateHandleWithDeleteMarker deserializeStateHandle = deserializeStateHandle(str2);
                if (!deserializeStateHandle.isMarkedForDeletion()) {
                    // Persist the deletion marker before any state is discarded.
                    kubernetesConfigMap.getData().put(str, serializeStateHandle(deserializeStateHandle.toDeleting()));
                }
                atomicReference.set(deserializeStateHandle.getInner());
            } catch (IOException e) {
                logInvalidEntry(str, this.configMapName, e);
                // The entry cannot be read, so it is removed directly; no handle is recorded.
                Objects.requireNonNull(kubernetesConfigMap.getData().remove(str));
            }
            return Optional.of(kubernetesConfigMap);
        }).thenCompose(bool -> {
            // Without an applied update or without a handle there is nothing left to discard.
            if (!bool.booleanValue() || atomicReference.get() == null) {
                return CompletableFuture.completedFuture(Boolean.valueOf(atomicBoolean.get() || bool.booleanValue()));
            }
            try {
                ((RetrievableStateHandle) atomicReference.get()).discardState();
                // State is gone; now drop the key itself.
                return updateConfigMap(kubernetesConfigMap2 -> {
                    kubernetesConfigMap2.getData().remove(str);
                    return Optional.of(kubernetesConfigMap2);
                });
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }).get()).booleanValue();
    }

    /**
     * Removes all entries whose keys pass the key filter: a first ConfigMap update marks every
     * readable entry for deletion, then the referenced states are discarded, and a second update
     * removes the keys.
     *
     * @throws Exception if an update fails or at least one state could not be discarded (in which
     *     case the keys are not removed)
     */
    public void releaseAndTryRemoveAll() throws Exception {
        // key -> handle for every entry that could be deserialized during the first update.
        HashMap hashMap = new HashMap();
        updateConfigMap(kubernetesConfigMap -> {
            // Work on a copy so the data map is not modified while its key set is iterated.
            HashMap hashMap2 = new HashMap(kubernetesConfigMap.getData());
            for (String str : kubernetesConfigMap.getData().keySet()) {
                if (this.configMapKeyFilter.test(str)) {
                    try {
                        StateHandleWithDeleteMarker deserializeStateHandle = deserializeStateHandle((String) Objects.requireNonNull(kubernetesConfigMap.getData().get(str)));
                        hashMap.put(str, deserializeStateHandle.getInner());
                        hashMap2.put(str, serializeStateHandle(deserializeStateHandle.toDeleting()));
                    } catch (IOException e) {
                        // NOTE(review): the logged message announces removal, but the unreadable
                        // entry stays in hashMap2 and is written back unchanged - confirm intent.
                        logInvalidEntry(str, this.configMapName, e);
                    }
                }
            }
            kubernetesConfigMap.getData().clear();
            kubernetesConfigMap.getData().putAll(hashMap2);
            return Optional.of(kubernetesConfigMap);
        }).thenCompose(bool -> {
            // Nothing to clean up if the update was not applied or no handle was collected.
            if (!bool.booleanValue() || hashMap.isEmpty()) {
                return CompletableFuture.completedFuture(bool);
            }
            // Try to discard every state; remember the first failure and suppress the rest.
            Exception exc = null;
            Iterator it = hashMap.values().iterator();
            while (it.hasNext()) {
                try {
                    ((RetrievableStateHandle) it.next()).discardState();
                } catch (Exception e) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
                }
            }
            if (exc != null) {
                throw new CompletionException((Throwable) new KubernetesException("Could not properly remove all state handles.", exc));
            }
            // All states discarded; remove the collected keys from the ConfigMap.
            return updateConfigMap(kubernetesConfigMap2 -> {
                Iterator it2 = hashMap.keySet().iterator();
                while (it2.hasNext()) {
                    kubernetesConfigMap2.getData().remove((String) it2.next());
                }
                return Optional.of(kubernetesConfigMap2);
            });
        }).get();
    }

    /**
     * Removes every key that passes the key filter from the ConfigMap. The referenced states are
     * not discarded by this method.
     *
     * @throws Exception if the ConfigMap update fails
     */
    public void clearEntries() throws Exception {
        final CompletableFuture<Boolean> cleared = updateConfigMap(configMap -> {
            configMap.getData().keySet().removeIf(this.configMapKeyFilter);
            return Optional.of(configMap);
        });
        // Block until the update has finished; the result itself is not used.
        cleared.get();
    }

    /** Does nothing in this implementation; the given key is ignored. */
    public void release(String str) {
    }

    /** Does nothing in this implementation. */
    public void releaseAll() {
    }

    /** Returns the simple class name followed by the ConfigMap name. */
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append(getClass().getSimpleName());
        text.append("{configMapName='").append(this.configMapName).append("'}");
        return text.toString();
    }

    /**
     * Decides whether this store may modify the given ConfigMap: always when no lock identity is
     * configured, otherwise only if the lock identity holds the leadership on the ConfigMap.
     */
    private boolean isValidOperation(KubernetesConfigMap kubernetesConfigMap) {
        if (this.lockIdentity == null) {
            return true;
        }
        return KubernetesLeaderElector.hasLeadership(kubernetesConfigMap, this.lockIdentity);
    }

    /**
     * Runs a check-and-update on the ConfigMap, applying {@code function} only if
     * {@link #isValidOperation} accepts the current ConfigMap; otherwise no update is requested.
     *
     * @param function computes the updated ConfigMap, or an empty Optional for "no change"
     * @return future carrying the result reported by the Kubernetes client
     */
    @VisibleForTesting
    CompletableFuture<Boolean> updateConfigMap(Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> function) {
        return this.kubeClient.checkAndUpdateConfigMap(this.configMapName, configMap -> {
            if (!isValidOperation(configMap)) {
                return Optional.empty();
            }
            return (Optional) function.apply(configMap);
        });
    }

    /**
     * Computes the ConfigMap update for adding a new entry.
     *
     * <ul>
     *   <li>No entry for the key, or an unreadable one: the new content is written.</li>
     *   <li>Entry marked for deletion: its removal is completed first, then the new content is
     *       written.</li>
     *   <li>Live entry with identical content: the ConfigMap is returned unchanged.</li>
     *   <li>Live entry with different content: an already-exists exception is thrown.</li>
     * </ul>
     *
     * @throws Exception if the key already exists or the pending removal cannot be completed
     */
    private Optional<KubernetesConfigMap> addEntry(KubernetesConfigMap kubernetesConfigMap, String str, byte[] bArr) throws Exception {
        final String existingContent = kubernetesConfigMap.getData().get(str);
        final String newContent = toBase64(bArr);

        if (existingContent != null) {
            try {
                final boolean pendingRemoval = deserializeStateHandle(existingContent).isMarkedForDeletion();
                if (pendingRemoval) {
                    if (!releaseAndTryRemove(str)) {
                        throw new IllegalStateException("Unable to remove the marked as deleting entry.");
                    }
                } else if (existingContent.equals(newContent)) {
                    // Same content already present; nothing to change.
                    return Optional.of(kubernetesConfigMap);
                } else {
                    throw getKeyAlreadyExistException(str);
                }
            } catch (IOException e) {
                // Only logged; the write below overrides the entry.
                logInvalidEntry(str, this.configMapName, e);
            }
        }

        kubernetesConfigMap.getData().put(str, newContent);
        return Optional.of(kubernetesConfigMap);
    }

    /**
     * Computes the ConfigMap update for replacing an existing entry.
     *
     * <ul>
     *   <li>Key absent: a not-exist exception is thrown.</li>
     *   <li>Entry marked for deletion: the pending removal is attempted (a failure is attached as
     *       suppressed) and a not-exist exception is thrown.</li>
     *   <li>Entry unreadable: the problem is logged and the entry is overwritten; in this case
     *       {@code atomicReference} is left unset.</li>
     *   <li>Live entry: the old handle is published through {@code atomicReference} and the entry
     *       is overwritten.</li>
     * </ul>
     *
     * @param kubernetesConfigMap ConfigMap to modify
     * @param str key of the entry
     * @param bArr serialized new state handle
     * @param atomicReference receives the previously stored handle when it could be read
     * @return the modified ConfigMap
     * @throws StateHandleStore.NotExistException if there is no live entry for the key
     */
    private Optional<KubernetesConfigMap> replaceEntry(KubernetesConfigMap kubernetesConfigMap, String str, byte[] bArr, AtomicReference<RetrievableStateHandle<T>> atomicReference) throws StateHandleStore.NotExistException {
        final String content = kubernetesConfigMap.getData().get(str);
        if (content == null) {
            throw getKeyNotExistException(str);
        }
        try {
            final StateHandleWithDeleteMarker<T> stateHandle = deserializeStateHandle(content);
            atomicReference.set(stateHandle.getInner());
            if (stateHandle.isMarkedForDeletion()) {
                final StateHandleStore.NotExistException keyNotExistException = getKeyNotExistException(str);
                try {
                    // Best effort: from the caller's point of view the entry does not exist
                    // regardless of whether finishing the removal succeeds.
                    releaseAndTryRemove(str);
                } catch (Exception e) {
                    keyNotExistException.addSuppressed(e);
                }
                throw keyNotExistException;
            }
        } catch (IOException e) {
            // Only logged; the unreadable entry is overwritten below.
            logInvalidEntry(str, this.configMapName, e);
        }
        kubernetesConfigMap.getData().put(str, toBase64(bArr));
        return Optional.of(kubernetesConfigMap);
    }

    /** Builds the exception reported when the backing ConfigMap cannot be found. */
    private KubernetesException getConfigMapNotExistException() {
        final String message = "ConfigMap " + this.configMapName + " does not exists. It may be deleted externally.";
        return new KubernetesException(message);
    }

    /** Builds the exception reported when the given key has no entry in the ConfigMap. */
    private StateHandleStore.NotExistException getKeyNotExistException(String str) {
        final String message = "Could not find " + str + " in ConfigMap " + this.configMapName;
        return new StateHandleStore.NotExistException(message);
    }

    /** Builds the exception reported when the entry for the given key is marked for deletion. */
    private StateHandleStore.NotExistException getKeyMarkedAsDeletedException(String str) {
        final String message = "Already marked for deletion " + str + " in ConfigMap " + this.configMapName;
        return new StateHandleStore.NotExistException(message);
    }

    /** Builds the exception reported when a live entry already exists for the given key. */
    private StateHandleStore.AlreadyExistException getKeyAlreadyExistException(String str) {
        final String message = str + " already exists in ConfigMap " + this.configMapName;
        return new StateHandleStore.AlreadyExistException(message);
    }

    /**
     * Logs a warning about an entry whose state handle could not be read.
     *
     * @param str key of the entry
     * @param str2 name of the ConfigMap
     * @param th the failure, passed as the trailing logging argument
     */
    private static void logInvalidEntry(String str, String str2, Throwable th) {
        final Object[] arguments = {str, str2, th};
        LOG.warn("Could not retrieve the state handle of '{}' from ConfigMap '{}'. Removing the entry as we don't have any way to recover.", arguments);
    }

    /**
     * Erased-signature bridge that forwards to the typed
     * {@code replace(String, StringResourceVersion, T)} overload.
     *
     * <p>The state argument is cast to {@code T}, not to {@link StringResourceVersion}: the
     * decompiler emitted the wrong cast target, which does not type-check and would fail with a
     * {@link ClassCastException} for any real state object.
     *
     * @param str key in the ConfigMap
     * @param resourceVersion resource version; must be a {@link StringResourceVersion}
     * @param serializable new state; must be an instance of this store's state type
     */
    @SuppressWarnings("unchecked") // the erased parameter always carries a T for this store
    public /* bridge */ /* synthetic */ void replace(String str, ResourceVersion resourceVersion, Serializable serializable) throws PossibleInconsistentStateException, Exception {
        replace(str, (StringResourceVersion) resourceVersion, (T) serializable);
    }
}
