package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobCache.class */
public final class BlobCache implements BlobService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BlobCache.class);
    private final InetSocketAddress serverAddress;
    private final File storageDir;
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    private final Thread shutdownHook;
    private final int numFetchRetries;

    public BlobCache(InetSocketAddress inetSocketAddress, Configuration configuration) {
        if (inetSocketAddress == null || configuration == null) {
            throw new NullPointerException();
        }
        this.serverAddress = inetSocketAddress;
        this.storageDir = BlobUtils.initStorageDirectory(configuration.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null));
        LOG.info("Created BLOB cache storage directory " + this.storageDir);
        int integer = configuration.getInteger(ConfigConstants.BLOB_FETCH_RETRIES_KEY, 5);
        if (integer >= 0) {
            this.numFetchRetries = integer;
        } else {
            LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.", ConfigConstants.BLOB_FETCH_RETRIES_KEY);
            this.numFetchRetries = 0;
        }
        this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    }

    @Override // org.apache.flink.runtime.blob.BlobService
    public URL getURL(BlobKey blobKey) throws IOException {
        if (blobKey == null) {
            throw new IllegalArgumentException("BLOB key cannot be null.");
        }
        File storageLocation = BlobUtils.getStorageLocation(this.storageDir, blobKey);
        if (!storageLocation.exists()) {
            byte[] bArr = new byte[65536];
            int i = 0;
            while (true) {
                if (i == 0) {
                    LOG.info("Downloading {} from {}", blobKey, this.serverAddress);
                } else {
                    LOG.info("Downloading {} from {} (retry {})", blobKey, this.serverAddress, Integer.valueOf(i));
                }
                Closeable closeable = null;
                Closeable closeable2 = null;
                Closeable closeable3 = null;
                try {
                    try {
                        BlobClient blobClient = new BlobClient(this.serverAddress);
                        InputStream inputStream = blobClient.get(blobKey);
                        FileOutputStream fileOutputStream = new FileOutputStream(storageLocation);
                        while (true) {
                            int read = inputStream.read(bArr);
                            if (read < 0) {
                                break;
                            }
                            fileOutputStream.write(bArr, 0, read);
                        }
                        fileOutputStream.close();
                        closeable3 = null;
                        inputStream.close();
                        closeable2 = null;
                        blobClient.close();
                        closeable = null;
                        break;
                    } catch (Throwable th) {
                        closeSilently(closeable3);
                        closeSilently(closeable2);
                        closeSilently(closeable);
                        if (th instanceof IOException) {
                            throw ((IOException) th);
                        }
                        throw new IOException(th.getMessage(), th);
                    }
                } catch (IOException e) {
                    String str = "Failed to fetch BLOB " + blobKey + " from " + this.serverAddress + " and store it under " + storageLocation.getAbsolutePath();
                    if (i >= this.numFetchRetries) {
                        LOG.error(str + " No retries left.", (Throwable) e);
                        throw new IOException(str, e);
                    }
                    i++;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(str + " Retrying...", (Throwable) e);
                    } else {
                        LOG.error(str + " Retrying...");
                    }
                }
            }
        }
        return storageLocation.toURI().toURL();
    }

    @Override // org.apache.flink.runtime.blob.BlobService
    public void delete(BlobKey blobKey) throws IOException {
        File storageLocation = BlobUtils.getStorageLocation(this.storageDir, blobKey);
        if (!storageLocation.exists() || storageLocation.delete()) {
            return;
        }
        LOG.warn("Failed to delete locally cached BLOB " + blobKey + " at " + storageLocation.getAbsolutePath());
    }

    public void deleteGlobal(BlobKey blobKey) throws IOException {
        BlobClient createClient = createClient();
        try {
            delete(blobKey);
            createClient.delete(blobKey);
        } finally {
            createClient.close();
        }
    }

    @Override // org.apache.flink.runtime.blob.BlobService
    public int getPort() {
        return this.serverAddress.getPort();
    }

    @Override // org.apache.flink.runtime.blob.BlobService
    public void shutdown() {
        if (this.shutdownRequested.compareAndSet(false, true)) {
            LOG.info("Shutting down BlobCache");
            try {
                FileUtils.deleteDirectory(this.storageDir);
            } catch (IOException e) {
                LOG.error("BLOB cache failed to properly clean up its storage directory.");
            }
            if (this.shutdownHook == null || this.shutdownHook == Thread.currentThread()) {
                return;
            }
            try {
                Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
            } catch (IllegalStateException e2) {
            } catch (Throwable th) {
                LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
            }
        }
    }

    @Override // org.apache.flink.runtime.blob.BlobService
    public BlobClient createClient() throws IOException {
        return new BlobClient(this.serverAddress);
    }

    public File getStorageDir() {
        return this.storageDir;
    }

    private void closeSilently(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Throwable th) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Error while closing resource after BLOB transfer.", th);
                }
            }
        }
    }
}
