/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.transaction.lock;

import java.net.URI;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.hudi.client.transaction.lock.StorageLockClient;
import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
import org.apache.hudi.client.transaction.lock.models.LockGetResult;
import org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager;
import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
import org.apache.hudi.client.transaction.lock.models.StorageLockData;
import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.lock.LockState;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.StorageBasedLockConfig;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StorageSchemes;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class StorageBasedLockProvider
implements LockProvider<StorageLockFile> {
    public static final String DEFAULT_TABLE_LOCK_FILE_NAME = "table_lock.json";
    private static final long CLOCK_DRIFT_BUFFER_MS = 500L;
    private static final Logger LOGGER = LoggerFactory.getLogger(StorageBasedLockProvider.class);
    private final Logger logger;
    private final StorageLockClient storageLockClient;
    private final long lockValiditySecs;
    private final String ownerId;
    private final String lockFilePath;
    private final HeartbeatManager heartbeatManager;
    private final transient Thread shutdownThread;
    @GuardedBy(value="this")
    private StorageLockFile currentLockObj = null;
    @GuardedBy(value="this")
    private boolean isClosed = false;
    private static final String LOCK_STATE_LOGGER_MSG = "Owner {}: Lock file path {}, Thread {}, Storage based lock state {}";
    private static final String LOCK_STATE_LOGGER_MSG_WITH_INFO = "Owner {}: Lock file path {}, Thread {}, Storage based lock state {}, {}";

    private synchronized void setLock(StorageLockFile lockObj) {
        if (lockObj != null && !Objects.equals(lockObj.getOwner(), this.ownerId)) {
            throw new HoodieLockException("Owners do not match. Current lock owner: " + this.ownerId + " lock path: " + this.lockFilePath + " owner: " + lockObj.getOwner());
        }
        this.currentLockObj = lockObj;
    }

    public StorageBasedLockProvider(LockConfiguration lockConfiguration, StorageConfiguration<?> conf) {
        this(UUID.randomUUID().toString(), lockConfiguration.getConfig(), LockProviderHeartbeatManager::new, StorageBasedLockProvider.getStorageLockClientClassName(), LOGGER);
    }

    private static Functions.Function3<String, String, TypedProperties, StorageLockClient> getStorageLockClientClassName() {
        return (ownerId, lockFilePath, lockConfig) -> {
            try {
                return (StorageLockClient)ReflectionUtils.loadClass(StorageBasedLockProvider.getLockServiceClassName(new URI((String)lockFilePath).getScheme()), new Class[]{String.class, String.class, Properties.class}, ownerId, lockFilePath, lockConfig);
            }
            catch (Throwable e) {
                throw new HoodieLockException("Failed to load and initialize StorageLock", e);
            }
        };
    }

    @NotNull
    private static String getLockServiceClassName(String scheme2) {
        Option<StorageSchemes> schemeOptional = StorageSchemes.getStorageLockImplementationIfExists(scheme2);
        if (schemeOptional.isPresent()) {
            return schemeOptional.get().getStorageLockClass();
        }
        throw new HoodieNotSupportedException("No implementation of StorageLock supports this scheme: " + scheme2);
    }

    @VisibleForTesting
    StorageBasedLockProvider(String ownerId, TypedProperties properties2, Functions.Function3<String, Long, Supplier<Boolean>, HeartbeatManager> heartbeatManagerLoader, Functions.Function3<String, String, TypedProperties, StorageLockClient> storageLockClientLoader, Logger logger) {
        StorageBasedLockConfig config = new StorageBasedLockConfig.Builder().fromProperties(properties2).build();
        long heartbeatPollSeconds = config.getHeartbeatPollSeconds();
        this.lockValiditySecs = config.getValiditySeconds();
        this.lockFilePath = String.format("%s%s%s%s%s", config.getHudiTableBasePath(), "/", ".hoodie/.locks", "/", DEFAULT_TABLE_LOCK_FILE_NAME);
        this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, heartbeatPollSeconds * 1000L, this::renewLock);
        this.storageLockClient = storageLockClientLoader.apply(ownerId, this.lockFilePath, properties2);
        this.ownerId = ownerId;
        this.logger = logger;
        this.shutdownThread = new Thread(() -> this.shutdown(true));
        Runtime.getRuntime().addShutdownHook(this.shutdownThread);
        logger.info("Instantiated new storage-based lock provider, owner: {}, lockfilePath: {}", (Object)ownerId, (Object)this.lockFilePath);
    }

    @Override
    public synchronized StorageLockFile getLock() {
        return this.currentLockObj;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        long deadlineNanos = System.nanoTime() + unit.toNanos(time);
        while (System.nanoTime() < deadlineNanos) {
            try {
                this.logDebugLockState(LockState.ACQUIRING);
                if (this.tryLock()) {
                    return true;
                }
                Thread.sleep(Long.parseLong(LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new HoodieLockException(this.generateLockStateMessage(LockState.FAILED_TO_ACQUIRE), e);
            }
        }
        return false;
    }

    @Override
    public synchronized void close() {
        this.shutdown(false);
    }

    private synchronized void shutdown(boolean fromShutdownHook) {
        if (fromShutdownHook) {
            if (!this.isClosed && this.actuallyHoldsLock()) {
                this.tryExpireCurrentLock(true);
            }
            return;
        }
        Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
        try {
            this.unlock();
        }
        catch (Exception e) {
            this.logger.error("Owner {}: Failed to unlock current lock.", (Object)this.ownerId, (Object)e);
        }
        try {
            this.storageLockClient.close();
        }
        catch (Exception e) {
            this.logger.error("Owner {}: Lock service failed to close.", (Object)this.ownerId, (Object)e);
        }
        try {
            this.heartbeatManager.close();
        }
        catch (Exception e) {
            this.logger.error("Owner {}: Heartbeat manager failed to close.", (Object)this.ownerId, (Object)e);
        }
        this.isClosed = true;
    }

    private synchronized boolean isLockStillValid(StorageLockFile lock) {
        return !lock.isExpired() && !this.isCurrentTimeCertainlyOlderThanDistributedTime(lock.getValidUntilMs());
    }

    @Override
    public synchronized boolean tryLock() {
        this.assertHeartbeatManagerExists();
        this.assertUnclosed();
        this.logDebugLockState(LockState.ACQUIRING);
        if (this.actuallyHoldsLock()) {
            return true;
        }
        if (this.heartbeatManager.hasActiveHeartbeat()) {
            this.logger.error("Detected broken invariant: there is an active heartbeat without a lock being held.");
            throw new HoodieLockException(this.generateLockStateMessage(LockState.FAILED_TO_ACQUIRE));
        }
        Pair<LockGetResult, Option<StorageLockFile>> latestLock = this.storageLockClient.readCurrentLockFile();
        if (latestLock.getLeft() == LockGetResult.UNKNOWN_ERROR) {
            this.logInfoLockState(LockState.FAILED_TO_ACQUIRE, "Failed to get the latest lock status");
            return false;
        }
        if (latestLock.getLeft() == LockGetResult.SUCCESS && this.isLockStillValid(latestLock.getRight().get())) {
            String msg = String.format("Lock already held by %s", latestLock.getRight().get().getOwner());
            this.logInfoLockState(LockState.FAILED_TO_ACQUIRE, msg);
            return false;
        }
        StorageLockData newLockData = new StorageLockData(false, System.currentTimeMillis() + this.lockValiditySecs, this.ownerId);
        Pair<LockUpsertResult, Option<StorageLockFile>> lockUpdateStatus = this.storageLockClient.tryUpsertLockFile(newLockData, latestLock.getRight());
        if (lockUpdateStatus.getLeft() != LockUpsertResult.SUCCESS) {
            this.logInfoLockState(LockState.FAILED_TO_ACQUIRE);
            return false;
        }
        this.setLock(lockUpdateStatus.getRight().get());
        if (!this.heartbeatManager.startHeartbeatForThread(Thread.currentThread())) {
            this.logErrorLockState(LockState.RELEASING, "We were unable to start the heartbeat!");
            this.tryExpireCurrentLock(false);
            return false;
        }
        this.logInfoLockState(LockState.ACQUIRED);
        return true;
    }

    private boolean actuallyHoldsLock() {
        return this.believesLockMightBeHeld() && this.isLockStillValid(this.getLock());
    }

    private boolean believesLockMightBeHeld() {
        return this.getLock() != null;
    }

    @Override
    public synchronized void unlock() {
        this.assertHeartbeatManagerExists();
        if (!this.believesLockMightBeHeld()) {
            return;
        }
        boolean believesNoLongerHoldsLock = true;
        if (this.heartbeatManager.hasActiveHeartbeat()) {
            this.logger.debug("Owner {}: Gracefully shutting down heartbeat.", (Object)this.ownerId);
            believesNoLongerHoldsLock &= this.heartbeatManager.stopHeartbeat(true);
        }
        if (!(believesNoLongerHoldsLock &= this.tryExpireCurrentLock(false))) {
            throw new HoodieLockException(this.generateLockStateMessage(LockState.FAILED_TO_RELEASE));
        }
    }

    private void assertHeartbeatManagerExists() {
        if (this.heartbeatManager == null) {
            throw new HoodieLockException("Unexpected null heartbeatManager");
        }
    }

    private void assertUnclosed() {
        if (this.isClosed) {
            throw new HoodieLockException("Lock provider already closed");
        }
    }

    private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) {
        if (!fromShutdownHook && this.heartbeatManager.hasActiveHeartbeat()) {
            throw new HoodieLockException("Must stop heartbeat before expire lock file");
        }
        this.logDebugLockState(LockState.RELEASING);
        StorageLockData expiredLockData = new StorageLockData(true, this.getLock().getValidUntilMs(), this.ownerId);
        Pair<LockUpsertResult, Option<StorageLockFile>> result2 = this.storageLockClient.tryUpsertLockFile(expiredLockData, Option.of(this.getLock()));
        switch (result2.getLeft()) {
            case UNKNOWN_ERROR: {
                this.logErrorLockState(LockState.FAILED_TO_RELEASE, "Lock state is unknown.");
                return false;
            }
            case SUCCESS: {
                this.logInfoLockState(LockState.RELEASED);
                this.setLock(null);
                return true;
            }
            case ACQUIRED_BY_OTHERS: {
                this.logWarnLockState(LockState.RELEASED, "lock should not have been acquired by others.");
                this.setLock(null);
                return true;
            }
        }
        throw new HoodieLockException("Unexpected lock update result: " + (Object)((Object)result2.getLeft()));
    }

    @VisibleForTesting
    protected synchronized boolean renewLock() {
        try {
            if (!this.believesLockMightBeHeld()) {
                this.logger.warn("Owner {}: Cannot renew, no lock held by this process", (Object)this.ownerId);
                return false;
            }
            long oldExpirationMs = this.getLock().getValidUntilMs();
            Pair<LockUpsertResult, Option<StorageLockFile>> currentLock = this.storageLockClient.tryUpsertLockFile(new StorageLockData(false, System.currentTimeMillis() + this.lockValiditySecs, this.ownerId), Option.of(this.getLock()));
            switch (currentLock.getLeft()) {
                case ACQUIRED_BY_OTHERS: {
                    this.logger.error("Owner {}: Unable to renew lock as it is acquired by others.", (Object)this.ownerId);
                    return false;
                }
                case UNKNOWN_ERROR: {
                    this.logger.warn("Owner {}: Unable to renew lock due to unknown error, could be transient.", (Object)this.ownerId);
                    return true;
                }
                case SUCCESS: {
                    this.setLock(currentLock.getRight().get());
                    this.logger.info("Owner {}: Lock renewal successful. The renewal completes {} ms before expiration for lock {}.", new Object[]{this.ownerId, oldExpirationMs - System.currentTimeMillis(), this.lockFilePath});
                    return true;
                }
            }
            throw new HoodieLockException("Unexpected lock update result: " + (Object)((Object)currentLock.getLeft()));
        }
        catch (Exception e) {
            this.logger.error("Owner {}: Exception occurred while renewing lock", (Object)this.ownerId, (Object)e);
            return false;
        }
    }

    protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long epoch) {
        return System.currentTimeMillis() > epoch + 500L;
    }

    private String generateLockStateMessage(LockState state) {
        String threadName = Thread.currentThread().getName();
        return String.format("Owner %s: Lock file path %s, Thread %s, Storage based lock state %s", this.ownerId, this.lockFilePath, threadName, state.toString());
    }

    private void logDebugLockState(LockState state) {
        this.logger.debug(LOCK_STATE_LOGGER_MSG, new Object[]{this.ownerId, this.lockFilePath, Thread.currentThread(), state});
    }

    private void logInfoLockState(LockState state) {
        this.logger.info(LOCK_STATE_LOGGER_MSG, new Object[]{this.ownerId, this.lockFilePath, Thread.currentThread(), state});
    }

    private void logInfoLockState(LockState state, String msg) {
        this.logger.info(LOCK_STATE_LOGGER_MSG_WITH_INFO, new Object[]{this.ownerId, this.lockFilePath, Thread.currentThread(), state, msg});
    }

    private void logWarnLockState(LockState state, String msg) {
        this.logger.warn(LOCK_STATE_LOGGER_MSG_WITH_INFO, new Object[]{this.ownerId, this.lockFilePath, Thread.currentThread(), state, msg});
    }

    private void logErrorLockState(LockState state, String msg) {
        this.logger.error(LOCK_STATE_LOGGER_MSG_WITH_INFO, new Object[]{this.ownerId, this.lockFilePath, Thread.currentThread(), state, msg});
    }
}

