/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azure;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.azure.AzureFileSystemThreadTask;
import org.apache.hadoop.fs.azure.FileMetadata;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Applies one {@link AzureFileSystemThreadTask} to every entry of a
 * {@link FileMetadata} array.
 *
 * <p>When more than one worker is useful, the work is spread over a short-lived
 * {@link ThreadPoolExecutor}; otherwise (or when no worker could be scheduled)
 * the entries are processed one by one in the calling thread.
 */
class AzureFileSystemThreadPoolExecutor {
    public static final Logger LOG = LoggerFactory.getLogger(AzureFileSystemThreadPoolExecutor.class);

    /** Requested number of worker threads; the effective count is also capped by the number of files. */
    private final int threadCount;
    /** Prefix handed to {@link AzureFileSystemThreadFactory} for naming worker threads. */
    private final String threadNamePrefix;
    /** Operation name; used in log and exception messages. */
    private final String operation;
    /** Blob key the operation is running on; used in log messages only. */
    private final String key;
    /** Name of the configuration setting reported in log messages as the way to reduce threads. */
    private final String config;

    /**
     * Stores the settings used by {@link #executeParallel}.
     *
     * @param threadCount      requested number of worker threads
     * @param threadNamePrefix prefix for worker thread names
     * @param operation        operation name used in log and exception messages
     * @param key              blob key reported in log messages
     * @param config           configuration name reported in log messages
     */
    public AzureFileSystemThreadPoolExecutor(int threadCount, String threadNamePrefix, String operation, String key, String config) {
        this.threadCount = threadCount;
        this.threadNamePrefix = threadNamePrefix;
        this.operation = operation;
        this.key = key;
        this.config = config;
    }

    /**
     * Builds a pool with {@code threadCount} core and maximum threads, a
     * 2 second keep-alive, an unbounded {@link LinkedBlockingQueue} and an
     * {@link AzureFileSystemThreadFactory} using the configured name prefix.
     *
     * @param threadCount number of core (and maximum) threads of the pool
     * @return the newly created pool
     * @throws Exception declared so that callers must cope with a creation
     *     failure; {@link #executeParallel} falls back to serial execution then
     */
    @VisibleForTesting
    ThreadPoolExecutor getThreadPool(int threadCount) throws Exception {
        return new ThreadPoolExecutor(
            threadCount,
            threadCount,
            2L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new AzureFileSystemThreadFactory(this.threadNamePrefix));
    }

    /**
     * Runs {@code threadOperation} on every element of {@code contents}.
     *
     * <p>A thread pool is used when the smaller of the configured thread count
     * and {@code contents.length} is greater than 1 and the pool can be created.
     * If no worker could be scheduled at all, the files are processed serially
     * in the calling thread instead.
     *
     * @param contents        files to process
     * @param threadOperation task to execute for each file
     * @return {@code true} when the task reported success for every file;
     *     {@code false} when a task invocation returned {@code false}
     * @throws IOException in the threaded path, when a task threw an exception
     *     (it is wrapped as the cause) or when not every file was processed
     *     although no failure was reported; in the serial path, whatever the
     *     task itself throws
     */
    boolean executeParallel(FileMetadata[] contents, AzureFileSystemThreadTask threadOperation) throws IOException {
        boolean operationStatus = false;
        boolean threadsEnabled = false;
        ThreadPoolExecutor ioThreadPool = null;
        long start = Time.monotonicNow();

        // Never use more workers than there are files to hand out.
        int threadCount = Math.min(contents.length, this.threadCount);
        if (threadCount > 1) {
            try {
                ioThreadPool = this.getThreadPool(threadCount);
                threadsEnabled = true;
            } catch (Exception e) {
                // Pass the exception as the trailing argument so the cause of the
                // fallback to serial execution is not lost.
                LOG.warn("Failed to create thread pool with threads {} for operation {} on blob {}. Use config {} to set less number of threads. Setting config value to <= 1 will disable threads.", threadCount, this.operation, this.key, this.config, e);
            }
        } else {
            LOG.warn("Disabling threads for {} operation as thread count {} is <= 1", this.operation, threadCount);
        }

        if (threadsEnabled) {
            LOG.debug("Using thread pool for {} operation with threads {}", this.operation, threadCount);
            boolean started = false;
            // A single runnable is shared by all workers; each run() pulls the next
            // unprocessed index from a shared atomic counter.
            AzureFileSystemThreadRunnable runnable = new AzureFileSystemThreadRunnable(contents, threadOperation, this.operation);
            try {
                // Stop scheduling further workers as soon as a failure has been recorded.
                for (int i = 0; i < threadCount && runnable.lastException == null && runnable.operationStatus; ++i) {
                    try {
                        ioThreadPool.execute(runnable);
                        started = true;
                    } catch (RejectedExecutionException ex) {
                        LOG.error("Rejected execution of thread for {} operation on blob {}. Continuing with existing threads. Use config {} to set less number of threads to avoid this error", this.operation, this.key, this.config);
                    }
                }
            } finally {
                // Core threads never time out, so the pool must be shut down even if
                // scheduling failed unexpectedly; otherwise idle workers would leak.
                ioThreadPool.shutdown();
            }

            try {
                ioThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
            } catch (InterruptedException intrEx) {
                ioThreadPool.shutdownNow();
                // Restore the interrupt flag for the caller.
                Thread.currentThread().interrupt();
                LOG.error("Threads got interrupted {} blob operation for {} ", this.operation, this.key);
            }

            int threadsNotUsed = threadCount - runnable.threadsUsed.get();
            if (threadsNotUsed > 0) {
                LOG.warn("{} threads not used for {} operation on blob {}", threadsNotUsed, this.operation, this.key);
            }

            if (!started) {
                // Nothing was scheduled: let the serial path below do the work.
                threadsEnabled = false;
                LOG.info("Not able to schedule threads to {} blob {}. Fall back to {} blob serially.", this.operation, this.key, this.operation);
            } else {
                IOException lastException = runnable.lastException;
                // No failure was reported, yet some files were never completed
                // (e.g. the wait above was interrupted): report that as an error.
                if (lastException == null && runnable.operationStatus && runnable.filesProcessed.get() < contents.length) {
                    LOG.error("{} failed as operation on subfolders and files failed.", this.operation);
                    lastException = new IOException(this.operation + " failed as operation on subfolders and files failed.");
                }
                if (lastException != null) {
                    throw lastException;
                }
                operationStatus = runnable.operationStatus;
            }
        }

        if (!threadsEnabled) {
            LOG.debug("Serializing the {} operation", this.operation);
            for (FileMetadata file : contents) {
                if (!threadOperation.execute(file)) {
                    LOG.warn("Failed to {} file {}", this.operation, file);
                    return false;
                }
            }
            operationStatus = true;
        }

        long end = Time.monotonicNow();
        LOG.info("Time taken for {} operation is: {} ms with threads: {}", this.operation, end - start, threadCount);
        return operationStatus;
    }

    /**
     * Work item shared by all pool threads. Every {@link #run()} invocation
     * repeatedly claims the next file index from a shared counter and executes
     * the task on that file until the files are exhausted or a failure has been
     * recorded.
     */
    static class AzureFileSystemThreadRunnable
    implements Runnable {
        /** Operation name used when {@code null} is supplied. */
        private static final String UNKNOWN_OPERATION = "Unknown";

        /** Most recently recorded task exception (wrapped), or {@code null} if none occurred. */
        private volatile IOException lastException = null;
        /** Becomes {@code false} once any task invocation returns {@code false}. */
        private volatile boolean operationStatus = true;
        /** Next index of {@link #files} to be claimed by a worker. */
        private final AtomicInteger fileIndex = new AtomicInteger(0);
        /** Number of files for which the task returned {@code true}. */
        private final AtomicInteger filesProcessed = new AtomicInteger(0);
        /** Number of {@link #run()} invocations that claimed at least one file. */
        private final AtomicInteger threadsUsed = new AtomicInteger(0);
        private final String operation;
        private final FileMetadata[] files;
        private final AzureFileSystemThreadTask task;

        /**
         * @param files     files to process
         * @param task      task executed for each file
         * @param operation operation name for log and exception messages;
         *     {@code "Unknown"} is used when {@code null}
         */
        public AzureFileSystemThreadRunnable(FileMetadata[] files, AzureFileSystemThreadTask task, String operation) {
            this.operation = operation != null ? operation : UNKNOWN_OPERATION;
            this.files = files;
            this.task = task;
        }

        /**
         * Claims and processes files until none are left, or until this or
         * another invocation has recorded a failure.
         */
        @Override
        public void run() {
            long start = Time.monotonicNow();
            int processedFilesCount = 0;
            int currentIndex;
            while ((currentIndex = this.fileIndex.getAndIncrement()) < this.files.length) {
                ++processedFilesCount;
                FileMetadata file = this.files[currentIndex];
                try {
                    if (this.task.execute(file)) {
                        this.filesProcessed.getAndIncrement();
                    } else {
                        LOG.error("{} operation failed for file {}", this.operation, file.getKey());
                        this.operationStatus = false;
                    }
                } catch (Exception e) {
                    LOG.error("Encountered Exception for {} operation for file {}", this.operation, file.getKey());
                    // Keep the cause; the scheduling thread rethrows this exception.
                    this.lastException = new IOException("Encountered Exception for " + this.operation + " operation for file " + file.getKey(), e);
                }
                // The failure flags are shared, so this also stops the loop when a
                // different worker has failed.
                if (this.lastException != null || !this.operationStatus) {
                    LOG.warn("Terminating execution of {} operation now as some other thread already got exception or operation failed. Last file attempted by this thread: {}", this.operation, file.getKey());
                    break;
                }
            }
            long end = Time.monotonicNow();
            LOG.debug("Time taken to process {} files count for {} operation: {} ms", processedFilesCount, this.operation, end - start);
            if (processedFilesCount > 0) {
                this.threadsUsed.getAndIncrement();
            }
        }
    }

    /**
     * Thread factory that names each thread
     * {@code <prefix>-<name of the thread calling newThread>-<sequence number>}.
     */
    static class AzureFileSystemThreadFactory
    implements ThreadFactory {
        /** Prefix used when {@code null} is supplied. */
        private static final String DEFAULT_THREAD_ID_PREFIX = "AzureFileSystemThread";

        private final String threadIdPrefix;
        /** Zero-based counter appended to thread names. */
        private final AtomicInteger threadSequenceNumber = new AtomicInteger(0);

        /**
         * @param prefix first component of every thread name;
         *     {@code "AzureFileSystemThread"} is used when {@code null}
         */
        public AzureFileSystemThreadFactory(String prefix) {
            this.threadIdPrefix = prefix != null ? prefix : DEFAULT_THREAD_ID_PREFIX;
        }

        /**
         * Creates a thread for {@code r} and assigns it the next name in sequence.
         *
         * @param r runnable the new thread will execute
         * @return the new, not yet started thread
         */
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(String.format("%s-%s-%d", this.threadIdPrefix, Thread.currentThread().getName(), this.threadSequenceNumber.getAndIncrement()));
            return t;
        }
    }
}

