package com.facebook.presto.hive;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.hive.InternalHiveSplit;
import com.facebook.presto.hive.util.AsyncQueue;
import com.facebook.presto.hive.util.SizeBasedSplitWeightProvider;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.concurrent.GuardedBy;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/facebook/presto/hive/HiveSplitSource.class */
public class HiveSplitSource implements ConnectorSplitSource {
    // Named after this class (not HiveSplit) so that the buffering warning emitted by addToQueue
    // is attributed to, and configurable under, the HiveSplitSource log category.
    private static final Logger log = Logger.get(HiveSplitSource.class);

    // Query id read from the session; only used in the high-memory warning message.
    private final String queryId;
    // Database and table names; copied into every HiveSplit and into warning/error messages.
    private final String databaseName;
    private final String tableName;
    // Passed unchanged to every HiveSplit that getNextBatch creates.
    private final CacheQuotaRequirement cacheQuotaRequirement;
    // Buffering strategy chosen by the static factories (allAtOnce / bucketed / bucketedRewindable).
    private final PerBucket queues;
    // Upper bound for estimatedSplitSizeInBytes; exceeding it makes addToQueue throw.
    private final long maxOutstandingSplitsBytes;
    // Target size of a generated split, and the (session-configured) size used while
    // remainingInitialSplits is still positive.
    private final DataSize maxSplitSize;
    private final DataSize maxInitialSplitSize;
    // True only for the bucketedRewindable factory; disables memory-estimate release in getNextBatch.
    private final boolean useRewindableSplitSource;
    // Number of splits that may still be cut using maxInitialSplitSize.
    private final AtomicInteger remainingInitialSplits;
    // Stopped whenever this source leaves the INITIAL state (or is closed).
    private final HiveSplitLoader splitLoader;
    // Bumped once, the first time the buffering limit is exceeded.
    private final CounterStat highMemorySplitSourceCounter;
    // Supplies the weight attached to each generated HiveSplit.
    private final HiveSplitWeightProvider splitWeightProvider;
    // Count of internal splits currently buffered: incremented in addToQueue, adjusted in
    // getNextBatch and rewind, and compared against zero in isFinished.
    private final AtomicInteger bufferedInternalSplitCount = new AtomicInteger();
    // Current lifecycle state; only ever replaced through setIf.
    private final AtomicReference<State> stateReference = new AtomicReference<>(State.initial());
    // Running estimate of the bytes held by buffered splits and their partition infos.
    private final AtomicLong estimatedSplitSizeInBytes = new AtomicLong();
    // Ensures the high-memory warning is logged (and counted) at most once.
    private final AtomicBoolean loggedHighMemoryWarning = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/facebook/presto/hive/HiveSplitSource$PerBucket.class */
    /**
     * Buffering strategy used by {@code HiveSplitSource} to hold internal splits, optionally keyed
     * by bucket number. The enclosing class creates three implementations through its static
     * factories: a single shared queue, one queue per bucket, and a rewindable per-bucket list.
     */
    public interface PerBucket {
        /**
         * Hands a newly discovered split to the buffer for the given bucket.
         *
         * @param optionalInt the bucket the split is read for; implementations decide whether it must be present
         * @param internalHiveSplit the split to buffer
         * @return a future chosen by the implementation; {@code addToQueue} returns it to its caller unchanged
         */
        ListenableFuture<?> offer(OptionalInt optionalInt, InternalHiveSplit internalHiveSplit);

        /**
         * Asynchronously borrows up to {@code i} buffered splits for the given bucket and applies
         * {@code function} to them. The function's {@code BorrowResult} carries both the list handed
         * back to the caller and the splits to keep buffered.
         *
         * @param optionalInt the bucket to borrow from
         * @param i the maximum number of internal splits to borrow
         * @param function converts borrowed internal splits into connector splits
         * @return a future holding the connector splits produced by {@code function}
         */
        ListenableFuture<List<ConnectorSplit>> borrowBatchAsync(OptionalInt optionalInt, int i, Function<List<InternalHiveSplit>, AsyncQueue.BorrowResult<InternalHiveSplit, List<ConnectorSplit>>> function);

        /** Signals that no further splits will be offered. */
        void noMoreSplits();

        /**
         * Reports whether the buffer for the given bucket has nothing more to hand out.
         *
         * @param optionalInt the bucket to check
         */
        boolean isFinished(OptionalInt optionalInt);

        /**
         * Resets the given bucket so its splits can be handed out again.
         *
         * @param optionalInt the bucket to rewind
         * @return a count that the enclosing class adds back to its buffered-split counter
         * @throws UnsupportedOperationException in implementations that are not rewindable
         */
        int rewind(OptionalInt optionalInt);

        /**
         * Decrements the reference count kept on the split's partition info and returns the new
         * value. The rewindable implementation overrides this to reject the call.
         */
        default int decrementAndGetPartitionReferences(InternalHiveSplit internalHiveSplit) {
            return internalHiveSplit.getPartitionInfo().decrementAndGetReferences();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/facebook/presto/hive/HiveSplitSource$State.class */
    /**
     * Immutable lifecycle state of a {@code HiveSplitSource}: a {@link StateKind} plus, for the
     * failed state only, the throwable that caused the failure. Instances are created exclusively
     * through the static factory methods.
     */
    public static class State {
        private final StateKind kind;
        private final Throwable throwable;

        private State(StateKind stateKind, Throwable cause) {
            kind = stateKind;
            throwable = cause;
        }

        /** State of a freshly created source. */
        public static State initial() {
            return new State(StateKind.INITIAL, null);
        }

        /** State after split enumeration completed normally. */
        public static State noMoreSplits() {
            return new State(StateKind.NO_MORE_SPLITS, null);
        }

        /** State after split enumeration failed with {@code th}. */
        public static State failed(Throwable th) {
            return new State(StateKind.FAILED, th);
        }

        /** State after the source was closed. */
        public static State closed() {
            return new State(StateKind.CLOSED, null);
        }

        public StateKind getKind() {
            return kind;
        }

        /**
         * Returns the recorded failure.
         *
         * @throws IllegalStateException if this state carries no throwable
         */
        public Throwable getThrowable() {
            boolean hasThrowable = throwable != null;
            Preconditions.checkState(hasThrowable);
            return throwable;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/facebook/presto/hive/HiveSplitSource$StateKind.class */
    /** Lifecycle phases of a {@code HiveSplitSource}, as stored in {@link State}. */
    public enum StateKind {
        // Starting state; the only state in which addToQueue buffers splits.
        INITIAL,
        // Entered from INITIAL by noMoreSplits().
        NO_MORE_SPLITS,
        // Entered from INITIAL by fail(Throwable); the State carries the throwable.
        FAILED,
        // Entered from INITIAL or NO_MORE_SPLITS by close().
        CLOSED
    }

    private HiveSplitSource(ConnectorSession connectorSession, String str, String str2, CacheQuotaRequirement cacheQuotaRequirement, PerBucket perBucket, int i, DataSize dataSize, HiveSplitLoader hiveSplitLoader, CounterStat counterStat, boolean z) {
        Objects.requireNonNull(connectorSession, "session is null");
        this.queryId = connectorSession.getQueryId();
        this.databaseName = (String) Objects.requireNonNull(str, "databaseName is null");
        this.cacheQuotaRequirement = (CacheQuotaRequirement) Objects.requireNonNull(cacheQuotaRequirement, "cacheQuotaRequirement is null");
        this.tableName = (String) Objects.requireNonNull(str2, "tableName is null");
        this.queues = (PerBucket) Objects.requireNonNull(perBucket, "queues is null");
        this.maxOutstandingSplitsBytes = ((DataSize) Objects.requireNonNull(dataSize, "maxOutstandingSplitsSize is null")).toBytes();
        this.splitLoader = (HiveSplitLoader) Objects.requireNonNull(hiveSplitLoader, "splitLoader is null");
        this.highMemorySplitSourceCounter = (CounterStat) Objects.requireNonNull(counterStat, "highMemorySplitSourceCounter is null");
        this.maxSplitSize = HiveSessionProperties.getMaxSplitSize(connectorSession);
        this.maxInitialSplitSize = HiveSessionProperties.getMaxInitialSplitSize(connectorSession);
        this.useRewindableSplitSource = z;
        this.remainingInitialSplits = new AtomicInteger(i);
        this.splitWeightProvider = HiveSessionProperties.isSizeBasedSplitWeightsEnabled(connectorSession) ? new SizeBasedSplitWeightProvider(HiveSessionProperties.getMinimumAssignedSplitWeight(connectorSession), this.maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider();
    }

    /**
     * Creates a split source that buffers every split in one shared {@code AsyncQueue},
     * regardless of bucket. Batches may only be requested without a bucket number, and
     * rewinding is not supported.
     *
     * @param i number of splits that are cut using the initial split size
     * @param i2 first constructor argument of the backing {@code AsyncQueue} (presumably its target size; confirm against AsyncQueue)
     * @param dataSize limit for the estimated size of buffered splits
     * @param executor executor handed to the backing {@code AsyncQueue}
     */
    public static HiveSplitSource allAtOnce(ConnectorSession connectorSession, String str, String str2, CacheQuotaRequirement cacheQuotaRequirement, int i, final int i2, DataSize dataSize, HiveSplitLoader hiveSplitLoader, final Executor executor, CounterStat counterStat) {
        PerBucket singleQueue = new PerBucket() {
            private final AsyncQueue<InternalHiveSplit> queue = new AsyncQueue<>(i2, executor);

            @Override
            public ListenableFuture<?> offer(OptionalInt bucketNumber, InternalHiveSplit split) {
                // The bucket number is deliberately ignored here: everything goes to the one queue.
                return queue.offer(split);
            }

            @Override
            public ListenableFuture<List<ConnectorSplit>> borrowBatchAsync(OptionalInt bucketNumber, int maxSize, Function<List<InternalHiveSplit>, AsyncQueue.BorrowResult<InternalHiveSplit, List<ConnectorSplit>>> function) {
                boolean unbucketed = !bucketNumber.isPresent();
                Preconditions.checkArgument(unbucketed);
                return queue.borrowBatchAsync(maxSize, function);
            }

            @Override
            public void noMoreSplits() {
                queue.finish();
            }

            @Override
            public boolean isFinished(OptionalInt bucketNumber) {
                boolean unbucketed = !bucketNumber.isPresent();
                Preconditions.checkArgument(unbucketed);
                return queue.isFinished();
            }

            @Override
            public int rewind(OptionalInt bucketNumber) {
                throw new UnsupportedOperationException("rewind is not supported for non bucketed split source");
            }
        };
        return new HiveSplitSource(connectorSession, str, str2, cacheQuotaRequirement, singleQueue, i, dataSize, hiveSplitLoader, counterStat, false);
    }

    /**
     * Creates a split source that keeps one lazily created {@code AsyncQueue} per bucket.
     * Every operation that takes a bucket requires it to be present; rewinding is not supported.
     *
     * @param i number of splits that are cut using the initial split size
     * @param i2 first constructor argument of each per-bucket {@code AsyncQueue} (presumably its target size; confirm against AsyncQueue)
     * @param dataSize limit for the estimated size of buffered splits
     * @param executor executor handed to each per-bucket {@code AsyncQueue}
     */
    public static HiveSplitSource bucketed(ConnectorSession connectorSession, String str, String str2, CacheQuotaRequirement cacheQuotaRequirement, int i, final int i2, DataSize dataSize, HiveSplitLoader hiveSplitLoader, final Executor executor, CounterStat counterStat) {
        PerBucket perBucketQueues = new PerBucket() {
            private final Map<Integer, AsyncQueue<InternalHiveSplit>> queues = new ConcurrentHashMap<>();
            private final AtomicBoolean finished = new AtomicBoolean();

            @Override
            public ListenableFuture<?> offer(OptionalInt bucketNumber, InternalHiveSplit split) {
                AsyncQueue<InternalHiveSplit> bucketQueue = queueFor(bucketNumber);
                // The queue's own future is dropped on purpose: callers always get a completed future.
                bucketQueue.offer(split);
                return Futures.immediateFuture(null);
            }

            @Override
            public ListenableFuture<List<ConnectorSplit>> borrowBatchAsync(OptionalInt bucketNumber, int maxSize, Function<List<InternalHiveSplit>, AsyncQueue.BorrowResult<InternalHiveSplit, List<ConnectorSplit>>> function) {
                AsyncQueue<InternalHiveSplit> bucketQueue = queueFor(bucketNumber);
                return bucketQueue.borrowBatchAsync(maxSize, function);
            }

            @Override
            public void noMoreSplits() {
                // Only the first caller finishes the queues that exist at that moment.
                if (!finished.compareAndSet(false, true)) {
                    return;
                }
                queues.values().forEach(AsyncQueue::finish);
            }

            @Override
            public boolean isFinished(OptionalInt bucketNumber) {
                AsyncQueue<InternalHiveSplit> bucketQueue = queueFor(bucketNumber);
                return bucketQueue.isFinished();
            }

            @Override
            public int rewind(OptionalInt bucketNumber) {
                throw new UnsupportedOperationException("rewind is not supported for unrewindable split source");
            }

            // Returns the queue for the bucket, creating it on first use.
            private AsyncQueue<InternalHiveSplit> queueFor(OptionalInt bucketNumber) {
                Preconditions.checkArgument(bucketNumber.isPresent());
                AtomicBoolean createdByThisCall = new AtomicBoolean();
                AsyncQueue<InternalHiveSplit> bucketQueue = queues.computeIfAbsent(bucketNumber.getAsInt(), ignored -> {
                    createdByThisCall.set(true);
                    return new AsyncQueue<>(i2, executor);
                });
                // A queue created after noMoreSplits() was missed by its forEach, so finish it here.
                if (createdByThisCall.get() && finished.get()) {
                    bucketQueue.finish();
                }
                return bucketQueue;
            }
        };
        return new HiveSplitSource(connectorSession, str, str2, cacheQuotaRequirement, perBucketQueues, i, dataSize, hiveSplitLoader, counterStat, false);
    }

    /**
     * Creates a split source that retains every split per bucket so that a bucket can be rewound
     * and handed out again. No splits are returned for any bucket until enumeration has finished
     * (i.e. until {@code noMoreSplits()} was called on the buffer).
     *
     * @param i number of splits that are cut using the initial split size
     * @param dataSize limit for the estimated size of buffered splits
     * @param executor executor on which the "enumeration finished" continuation runs
     */
    public static HiveSplitSource bucketedRewindable(ConnectorSession connectorSession, String str, String str2, CacheQuotaRequirement cacheQuotaRequirement, int i, DataSize dataSize, HiveSplitLoader hiveSplitLoader, final Executor executor, CounterStat counterStat) {
        return new HiveSplitSource(connectorSession, str, str2, cacheQuotaRequirement, new PerBucket() {
            @GuardedBy("this")
            private final Map<Integer, List<InternalHiveSplit>> splits = new HashMap<>();
            // Completed by noMoreSplits(); gates both borrowing and rewinding.
            private final SettableFuture<?> allSplitLoaded = SettableFuture.create();

            @Override
            public synchronized ListenableFuture<?> offer(OptionalInt bucketNumber, InternalHiveSplit split) {
                Preconditions.checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                splits.computeIfAbsent(bucketNumber.getAsInt(), ignored -> new ArrayList<>()).add(split);
                return Futures.immediateFuture(null);
            }

            @Override
            public synchronized ListenableFuture<List<ConnectorSplit>> borrowBatchAsync(OptionalInt bucketNumber, int maxSize, Function<List<InternalHiveSplit>, AsyncQueue.BorrowResult<InternalHiveSplit, List<ConnectorSplit>>> function) {
                Preconditions.checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                if (!allSplitLoaded.isDone()) {
                    // Enumeration still running: answer with an empty batch once it completes.
                    return allSplitLoaded.transform(ignored -> ImmutableList.of(), executor);
                }
                // Splits stay in the map; only the converted result of the borrow is returned.
                return Futures.immediateFuture(function.apply(getSplits(bucketNumber.getAsInt(), maxSize)).getResult());
            }

            // Returns up to batchSize not-yet-done splits of the bucket; caller must hold the monitor.
            private List<InternalHiveSplit> getSplits(int bucket, int batchSize) {
                return splits.getOrDefault(bucket, ImmutableList.of()).stream()
                        .filter(split -> !split.isDone())
                        .limit(batchSize)
                        .collect(ImmutableList.toImmutableList());
            }

            @Override
            public void noMoreSplits() {
                allSplitLoaded.set(null);
            }

            // Synchronized because it reads the @GuardedBy("this") map and the per-split done state,
            // both of which offer/borrowBatchAsync/rewind mutate while holding the monitor.
            @Override
            public synchronized boolean isFinished(OptionalInt bucketNumber) {
                Preconditions.checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                if (!allSplitLoaded.isDone()) {
                    return false;
                }
                // A bucket that never received a split counts as finished.
                List<InternalHiveSplit> bucketSplits = splits.get(bucketNumber.getAsInt());
                return bucketSplits == null || bucketSplits.stream().allMatch(InternalHiveSplit::isDone);
            }

            @Override
            public synchronized int rewind(OptionalInt bucketNumber) {
                Preconditions.checkArgument(bucketNumber.isPresent(), "bucketNumber must be present");
                Preconditions.checkState(allSplitLoaded.isDone(), "splits cannot be rewound before splits enumeration is finished");
                int previouslyDone = 0;
                for (InternalHiveSplit split : splits.getOrDefault(bucketNumber.getAsInt(), ImmutableList.of())) {
                    // Count before resetting: the count is of splits that had been fully consumed.
                    if (split.isDone()) {
                        previouslyDone++;
                    }
                    split.reset();
                }
                return previouslyDone;
            }

            @Override
            public int decrementAndGetPartitionReferences(InternalHiveSplit split) {
                throw new UnsupportedOperationException("decrementPartitionReferences is not supported for rewindable split sources");
            }
        }, i, dataSize, hiveSplitLoader, counterStat, true);
    }

    /**
     * Returns the current value of the buffered-internal-split counter. Besides tests, it is
     * used in this class to fill in the "splits are buffered" warning and error messages.
     */
    @VisibleForTesting
    int getBufferedInternalSplitCount() {
        return this.bufferedInternalSplitCount.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds each split of the list, in order, through the single-split overload.
     *
     * @return the future produced for the last split, or an already completed future when the list is empty
     */
    public ListenableFuture<?> addToQueue(List<? extends InternalHiveSplit> list) {
        ListenableFuture<?> lastResult = Futures.immediateFuture(null);
        for (InternalHiveSplit split : list) {
            lastResult = addToQueue(split);
        }
        return lastResult;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Buffers a single split, accounting for its estimated memory footprint.
     *
     * <p>Splits offered after the source has left the INITIAL state are silently dropped.
     *
     * @return the future returned by the underlying buffer, or a completed future if the split was dropped
     * @throws PrestoException with {@code HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT} when the estimated
     *         size of buffered splits exceeds the configured limit
     */
    public ListenableFuture<?> addToQueue(InternalHiveSplit internalHiveSplit) {
        StateKind currentKind = stateReference.get().getKind();
        if (currentKind != StateKind.INITIAL) {
            return Futures.immediateFuture(null);
        }

        // The partition info is shared between splits, so its size is charged only on first reference.
        if (internalHiveSplit.getPartitionInfo().incrementAndGetReferences() == 1) {
            estimatedSplitSizeInBytes.addAndGet(internalHiveSplit.getPartitionInfo().getEstimatedSizeInBytes());
        }

        long estimatedTotalBytes = estimatedSplitSizeInBytes.addAndGet(internalHiveSplit.getEstimatedSizeInBytes());
        if (estimatedTotalBytes > maxOutstandingSplitsBytes) {
            // Warn and count only once per source, but fail every offending call.
            if (loggedHighMemoryWarning.compareAndSet(false, true)) {
                highMemorySplitSourceCounter.update(1L);
                log.warn("Split buffering for %s.%s in query %s exceeded memory limit (%s). %s splits are buffered.", databaseName, tableName, queryId, DataSize.succinctBytes(maxOutstandingSplitsBytes), getBufferedInternalSplitCount());
            }
            String message = String.format("Split buffering for %s.%s exceeded memory limit (%s). %s splits are buffered.", databaseName, tableName, DataSize.succinctBytes(maxOutstandingSplitsBytes), getBufferedInternalSplitCount());
            throw new PrestoException(HiveErrorCode.HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT, message);
        }

        bufferedInternalSplitCount.incrementAndGet();
        return queues.offer(internalHiveSplit.getReadBucketNumber(), internalHiveSplit);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks split enumeration as complete. Has an effect only while the source is still in the
     * INITIAL state; in that case the loader is stopped and the buffer is told no more splits follow.
     */
    public void noMoreSplits() {
        boolean transitioned = setIf(stateReference, State.noMoreSplits(), current -> current.getKind() == StateKind.INITIAL);
        if (!transitioned) {
            return;
        }
        splitLoader.stop();
        queues.noMoreSplits();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records a failure of split enumeration. Has an effect only while the source is still in the
     * INITIAL state; in that case the loader is stopped and the buffer is told no more splits follow.
     *
     * @param th the failure later surfaced by {@code getNextBatch} and {@code isFinished}
     */
    public void fail(Throwable th) {
        State failedState = State.failed(th);
        boolean transitioned = setIf(stateReference, failedState, current -> current.getKind() == StateKind.INITIAL);
        if (!transitioned) {
            return;
        }
        splitLoader.stop();
        queues.noMoreSplits();
    }

    /**
     * Asynchronously produces the next batch of connector splits for a partition handle.
     *
     * <p>Each borrowed internal split yields exactly one {@code HiveSplit} covering the next slice
     * of the file. Internal splits that still have data left are handed back to the buffer; fully
     * consumed ones release their share of the memory estimate (non-rewindable sources only).
     *
     * @param connectorPartitionHandle the partition (bucket) to read, or the not-partitioned handle
     * @param i maximum number of internal splits to borrow for this batch
     * @return a future holding the batch; the batch is flagged as last only when enumeration had
     *         already finished at call time, the batch is empty, and the bucket's buffer is finished.
     *         If the source has failed, the returned future is failed with the recorded throwable.
     * @throws IllegalStateException if the source is already closed
     */
    @Override
    public CompletableFuture<ConnectorSplitSource.ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle connectorPartitionHandle, int i) {
        State state = stateReference.get();
        // Snapshot taken before borrowing: the "last batch" flag is only ever reported for
        // requests that started after enumeration had finished.
        final boolean noMoreSplits;
        switch (state.getKind()) {
            case INITIAL:
                noMoreSplits = false;
                break;
            case NO_MORE_SPLITS:
                noMoreSplits = true;
                break;
            case FAILED:
                return MoreFutures.failedFuture(state.getThrowable());
            case CLOSED:
                throw new IllegalStateException("HiveSplitSource is already closed");
            default:
                throw new UnsupportedOperationException();
        }

        OptionalInt bucketNumber = toBucketNumber(connectorPartitionHandle);
        ListenableFuture<List<ConnectorSplit>> borrowed = queues.borrowBatchAsync(bucketNumber, i, internalSplits -> {
            ImmutableList.Builder<InternalHiveSplit> splitsToReinsert = ImmutableList.builder();
            ImmutableList.Builder<ConnectorSplit> resultBuilder = ImmutableList.builder();
            int releasedEstimatedSizeInBytes = 0;
            for (InternalHiveSplit internalSplit : internalSplits) {
                long maxSplitBytes = maxSplitSize.toBytes();
                // The cheap get() avoids decrementing the counter indefinitely once it reached zero.
                if (remainingInitialSplits.get() > 0 && remainingInitialSplits.getAndDecrement() > 0) {
                    maxSplitBytes = maxInitialSplitSize.toBytes();
                }

                InternalHiveSplit.InternalHiveBlock block = internalSplit.currentBlock();
                long splitBytes = computeSplitBytes(internalSplit, block, maxSplitBytes);

                resultBuilder.add(new HiveSplit(
                        databaseName,
                        tableName,
                        internalSplit.getPartitionName(),
                        internalSplit.getPath(),
                        internalSplit.getStart(),
                        splitBytes,
                        internalSplit.getFileSize(),
                        internalSplit.getFileModifiedTime(),
                        internalSplit.getPartitionInfo().getStorage(),
                        internalSplit.getPartitionKeys(),
                        block.getAddresses(),
                        internalSplit.getReadBucketNumber(),
                        internalSplit.getTableBucketNumber(),
                        internalSplit.getNodeSelectionStrategy(),
                        internalSplit.getPartitionInfo().getPartitionDataColumnCount(),
                        internalSplit.getTableToPartitionMapping(),
                        internalSplit.getBucketConversion(),
                        internalSplit.isS3SelectPushdownEnabled(),
                        internalSplit.getExtraFileInfo(),
                        cacheQuotaRequirement,
                        internalSplit.getEncryptionInformation(),
                        internalSplit.getCustomSplitInfo(),
                        internalSplit.getPartitionInfo().getRedundantColumnDomains(),
                        splitWeightProvider.weightForSplitSizeInBytes(splitBytes)));

                internalSplit.increaseStart(splitBytes);

                if (internalSplit.isDone()) {
                    releasedEstimatedSizeInBytes += internalSplit.getEstimatedSizeInBytes();
                    // The shared partition info is released together with its last referencing split.
                    // Rewindable buffers reject the reference decrement, so it must not be attempted.
                    if (!useRewindableSplitSource && queues.decrementAndGetPartitionReferences(internalSplit) == 0) {
                        releasedEstimatedSizeInBytes += internalSplit.getPartitionInfo().getEstimatedSizeInBytes();
                    }
                }
                else {
                    splitsToReinsert.add(internalSplit);
                }
            }

            // Rewindable sources keep all splits, so their memory estimate is never reduced.
            if (!useRewindableSplitSource) {
                estimatedSplitSizeInBytes.addAndGet(-releasedEstimatedSizeInBytes);
            }

            List<InternalHiveSplit> reinserted = splitsToReinsert.build();
            List<ConnectorSplit> result = resultBuilder.build();
            // One connector split was produced per borrowed internal split, so this subtracts the
            // internal splits that were fully consumed in this batch.
            bufferedInternalSplitCount.addAndGet(reinserted.size() - result.size());
            return new AsyncQueue.BorrowResult<>(reinserted, result);
        });

        return MoreFutures.toCompletableFuture(Futures.transform(borrowed, splits -> {
            Objects.requireNonNull(splits, "splits is null");
            boolean lastBatch = noMoreSplits && splits.isEmpty() && queues.isFinished(bucketNumber);
            return new ConnectorSplitSource.ConnectorSplitBatch(splits, lastBatch);
        }, MoreExecutors.directExecutor()));
    }

    // Picks the length of the next split to cut from the internal split's current position.
    private static long computeSplitBytes(InternalHiveSplit internalSplit, InternalHiveSplit.InternalHiveBlock block, long maxSplitBytes) {
        if (!internalSplit.isSplittable()) {
            // Unsplittable files are emitted as a single split covering everything that is left.
            return internalSplit.getEnd() - internalSplit.getStart();
        }
        long remainingBlockBytes = block.getEnd() - internalSplit.getStart();
        if (remainingBlockBytes <= maxSplitBytes) {
            return remainingBlockBytes;
        }
        if (maxSplitBytes * 2 >= remainingBlockBytes) {
            // Second-to-last split of the block: halve the remainder to get two evenly sized splits.
            return remainingBlockBytes / 2;
        }
        return maxSplitBytes;
    }

    /**
     * Rewinds the buffer for the given partition handle and adds the count it reports back to
     * the buffered-split counter.
     */
    public void rewind(ConnectorPartitionHandle connectorPartitionHandle) {
        OptionalInt bucketNumber = toBucketNumber(connectorPartitionHandle);
        int rewoundSplits = queues.rewind(bucketNumber);
        bufferedInternalSplitCount.addAndGet(rewoundSplits);
    }

    /**
     * Reports whether this source is exhausted: enumeration has finished and no internal splits
     * remain buffered.
     *
     * @throws PrestoException if enumeration failed (the recorded failure, wrapped if necessary)
     * @throws IllegalStateException if the source is already closed
     */
    public boolean isFinished() {
        State current = stateReference.get();
        switch (current.getKind()) {
            case INITIAL:
                // Enumeration is still running, so more splits may arrive.
                return false;
            case NO_MORE_SPLITS: {
                int stillBuffered = bufferedInternalSplitCount.get();
                return stillBuffered == 0;
            }
            case FAILED: {
                Throwable failure = current.getThrowable();
                throw propagatePrestoException(failure);
            }
            case CLOSED:
                throw new IllegalStateException("HiveSplitSource is already closed");
            default:
                throw new UnsupportedOperationException();
        }
    }

    /**
     * Closes the source. Has an effect only from the INITIAL or NO_MORE_SPLITS state; in that
     * case the loader is stopped and the buffer is told no more splits follow.
     */
    public void close() {
        boolean transitioned = setIf(stateReference, State.closed(), current -> {
            StateKind kind = current.getKind();
            return kind == StateKind.INITIAL || kind == StateKind.NO_MORE_SPLITS;
        });
        if (!transitioned) {
            return;
        }
        splitLoader.stop();
        queues.noMoreSplits();
    }

    /**
     * Maps a partition handle to a bucket number: empty for the not-partitioned handle,
     * otherwise the bucket of the (assumed) {@code HivePartitionHandle}.
     */
    private static OptionalInt toBucketNumber(ConnectorPartitionHandle connectorPartitionHandle) {
        if (connectorPartitionHandle == NotPartitionedPartitionHandle.NOT_PARTITIONED) {
            return OptionalInt.empty();
        }
        HivePartitionHandle hiveHandle = (HivePartitionHandle) connectorPartitionHandle;
        return OptionalInt.of(hiveHandle.getBucket());
    }

    /**
     * Atomically replaces the referenced value with {@code t} provided the current value
     * satisfies {@code predicate}, retrying when another thread changes the value in between.
     *
     * @return {@code true} if the value was replaced, {@code false} if the predicate rejected the current value
     */
    private static <T> boolean setIf(AtomicReference<T> atomicReference, T t, Predicate<T> predicate) {
        while (true) {
            T current = atomicReference.get();
            if (!predicate.test(current)) {
                return false;
            }
            if (atomicReference.compareAndSet(current, t)) {
                return true;
            }
            // Lost a race with a concurrent update: re-read and re-evaluate the predicate.
        }
    }

    /**
     * Rethrows the given failure as a {@code PrestoException}. A {@code PrestoException} is
     * rethrown as is; a {@code FileNotFoundException} is wrapped with {@code HIVE_FILE_NOT_FOUND};
     * anything else is wrapped with {@code HIVE_UNKNOWN_ERROR}.
     *
     * <p>This method never returns normally; the declared return type only lets callers write
     * {@code throw propagatePrestoException(...)}.
     */
    private static RuntimeException propagatePrestoException(Throwable th) {
        if (th instanceof PrestoException) {
            throw (PrestoException) th;
        }
        HiveErrorCode errorCode = (th instanceof FileNotFoundException)
                ? HiveErrorCode.HIVE_FILE_NOT_FOUND
                : HiveErrorCode.HIVE_UNKNOWN_ERROR;
        throw new PrestoException(errorCode, th);
    }
}
