package org.apache.flink.table.runtime.hashtable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.class */
/**
 * Base class for hybrid hash tables that spill to disk.
 *
 * <p>The table owns a fixed budget of memory segments, served from an internal
 * {@link LazyMemorySegmentPool}. Besides the pool it tracks segments that are still "in flight"
 * from build-side spilling: {@link #buildSpillRetBufferNumbers} counts how many segments are
 * expected to come back through {@link #buildSpillReturnBuffers}. When the pool is empty,
 * {@link #getNextBuffer()} waits for those segments, and {@link #nextSegment()} additionally
 * triggers {@link #spillPartition()} to free memory.
 *
 * <p>NOTE(review): the code that enqueues into {@link #buildSpillReturnBuffers} and increments
 * {@link #buildSpillRetBufferNumbers} is not in this class; presumably subclasses / partitions do
 * that while spilling. The counter is a plain {@code int}, so it looks like this class expects to
 * be driven by a single thread - confirm against the subclasses.
 */
public abstract class BaseHybridHashTable implements MemorySegmentPool {
    protected static final Logger LOG = LoggerFactory.getLogger(BaseHybridHashTable.class);

    /** Recursion depth limit; not referenced inside this class. */
    protected static final int MAX_RECURSION_DEPTH = 3;

    /** Upper bound applied to the estimated partition fan-out. */
    protected static final int MAX_NUM_PARTITIONS = 127;

    /** Minimum number of memory segments the table must be given (checked in the constructor). */
    private static final int MIN_NUM_MEMORY_SEGMENTS = 33;

    /** Lower bound applied to the estimated partition fan-out. */
    private static final int MIN_PARTITION_FAN_OUT = 11;

    /** Initial partition fan-out, fixed at construction time. */
    protected final int initPartitionFanOut;

    /** Average record length used to estimate the build side size. */
    private final int avgRecordLen;

    /** Number of build rows used to estimate the build side size. */
    protected final long buildRowCount;

    /** Total number of memory segments this table may use. */
    protected final int totalNumBuffers;

    /** Pool that lazily provides the table's memory segments. */
    protected final LazyMemorySegmentPool internalPool;

    protected final IOManager ioManager;

    /** Size of one memory segment in bytes; always a power of two. */
    protected final int segmentSize;

    /** Queue through which segments used for build-side spilling come back to the table. */
    protected final LinkedBlockingQueue<MemorySegment> buildSpillReturnBuffers;

    /** {@code log2(segmentSize)}. */
    public final int segmentSizeBits;

    /** {@code segmentSize - 1}, usable as a bit mask because the size is a power of two. */
    public final int segmentSizeMask;

    /** Set to {@code true} by the first call to {@link #close()}. */
    protected AtomicBoolean closed = new AtomicBoolean();

    public final boolean tryDistinctBuildRow;

    protected int currentRecursionDepth;

    /** Number of segments still expected to arrive in {@link #buildSpillReturnBuffers}. */
    protected int buildSpillRetBufferNumbers;

    protected HeaderlessChannelReaderInputView currentSpilledBuildSide;
    protected AbstractChannelReaderInputView currentSpilledProbeSide;
    protected FileIOChannel.Enumerator currentEnumerator;

    /** Whether spill files are compressed. */
    protected final boolean compressionEnable;

    /** Compression factory for spill files; {@code null} when compression is disabled. */
    protected final BlockCompressionFactory compressionCodecFactory;

    /** Block size (bytes) used for spill compression. */
    protected final int compressionBlockSize;

    protected transient long numSpillFiles;
    protected transient long spillInBytes;

    /**
     * Creates the table and sizes its memory budget.
     *
     * @param conf configuration providing the spill compression settings
     * @param owner object handed to the internal {@link LazyMemorySegmentPool} together with the
     *     memory manager (presumably the owner of the allocated memory - confirm)
     * @param memManager memory manager providing the page size and backing the internal pool
     * @param reservedMemorySize memory budget in bytes; divided by the page size to obtain the
     *     number of segments
     * @param ioManager I/O manager used to read spilled data
     * @param avgRecordLen average build record length, used for the fan-out estimate
     * @param buildRowCount number of build rows, used for the fan-out estimate
     * @param tryDistinctBuildRow stored in {@link #tryDistinctBuildRow}; not interpreted here
     * @throws IllegalArgumentException if the budget yields fewer than
     *     {@value #MIN_NUM_MEMORY_SEGMENTS} segments or the page size is not a power of two
     */
    public BaseHybridHashTable(
            Configuration conf,
            Object owner,
            MemoryManager memManager,
            long reservedMemorySize,
            IOManager ioManager,
            int avgRecordLen,
            long buildRowCount,
            boolean tryDistinctBuildRow) {
        this.compressionEnable =
                conf.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED);
        this.compressionCodecFactory =
                this.compressionEnable
                        ? BlockCompressionFactory.createBlockCompressionFactory(
                                BlockCompressionFactory.CompressionFactoryName.LZ4.toString())
                        : null;
        this.compressionBlockSize =
                (int)
                        ((MemorySize)
                                        conf.get(
                                                ExecutionConfigOptions
                                                        .TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE))
                                .getBytes();

        this.avgRecordLen = avgRecordLen;
        this.buildRowCount = buildRowCount;
        this.tryDistinctBuildRow = tryDistinctBuildRow;

        this.totalNumBuffers = (int) (reservedMemorySize / memManager.getPageSize());
        Preconditions.checkArgument(
                this.totalNumBuffers >= MIN_NUM_MEMORY_SEGMENTS,
                "Too few memory segments for the hash table: "
                        + this.totalNumBuffers
                        + " (minimum is "
                        + MIN_NUM_MEMORY_SEGMENTS
                        + ").");
        this.internalPool = new LazyMemorySegmentPool(owner, memManager, this.totalNumBuffers);
        this.ioManager = ioManager;

        this.segmentSize = memManager.getPageSize();
        Preconditions.checkArgument(
                MathUtils.isPowerOf2(this.segmentSize),
                "Segment size must be a power of two, but is " + this.segmentSize + ".");
        this.buildSpillReturnBuffers = new LinkedBlockingQueue<>();
        this.segmentSizeBits = MathUtils.log2strict(this.segmentSize);
        this.segmentSizeMask = this.segmentSize - 1;

        this.currentRecursionDepth = 0;
        // NOTE(review): maxNumPartition() is overridable and is invoked from the constructor;
        // an override must not rely on subclass state that is initialised later.
        this.initPartitionFanOut =
                Math.min(getPartitioningFanOutNoEstimates(), maxNumPartition());
        this.closed.set(false);

        // Multiply in long arithmetic: segments * segmentSize exceeds the int range for budgets
        // of 2 GiB and more.
        final long totalMemoryMb = ((long) this.totalNumBuffers * this.segmentSize) / 1024 / 1024;
        LOG.info(
                "Initialize hash table with {} memory segments, each size [{}], the memory {} MB.",
                this.totalNumBuffers,
                this.segmentSize,
                totalMemoryMb);
    }

    /**
     * Returns the largest partition count currently allowed: half of the segments that are free
     * in the pool or still expected back from spilling.
     */
    public int maxNumPartition() {
        return (this.internalPool.freePages() + this.buildSpillRetBufferNumbers) / 2;
    }

    /**
     * Estimates the partition fan-out from the build row count and average record length.
     *
     * <p>The estimate is {@code buildRowCount * avgRecordLen / (10 * segmentSize)}, capped at
     * {@link #MAX_NUM_PARTITIONS}, lowered to the nearest prime not above it, and finally raised
     * to at least {@value #MIN_PARTITION_FAN_OUT}.
     */
    private int getPartitioningFanOutNoEstimates() {
        final long estimatedBytes = this.buildRowCount * this.avgRecordLen;
        final long estimate = estimatedBytes / (10L * this.segmentSize);
        final int capped = (int) Math.min(estimate, MAX_NUM_PARTITIONS);
        return Math.max(MIN_PARTITION_FAN_OUT, findSmallerPrime(capped));
    }

    /**
     * Returns the largest prime that is less than or equal to {@code num}. Values of 1 or less
     * are returned unchanged.
     */
    private static int findSmallerPrime(int num) {
        int candidate = num;
        while (candidate > 1 && !isPrimeNumber(candidate)) {
            candidate--;
        }
        return candidate;
    }

    /** Trial-division primality test. */
    private static boolean isPrimeNumber(int num) {
        if (num == 2) {
            return true;
        }
        if (num < 2 || num % 2 == 0) {
            return false;
        }
        // Only odd divisors up to sqrt(num) need checking; long arithmetic keeps the square from
        // overflowing for large inputs.
        for (long divisor = 3; divisor * divisor <= num; divisor += 2) {
            if (num % divisor == 0) {
                return false;
            }
        }
        return true;
    }

    /**
     * Gets the next free segment without triggering a spill.
     *
     * <p>The internal pool is asked first. If it has nothing and segments are still expected back
     * from build-side spilling, this call blocks until one arrives and, while at it, moves any
     * further segments that are already queued back into the pool.
     *
     * @return a segment, or {@code null} if the pool is empty and no spill buffers are outstanding
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag is
     *     restored before throwing
     */
    public MemorySegment getNextBuffer() {
        final MemorySegment pooled = this.internalPool.nextSegment();
        if (pooled != null) {
            return pooled;
        }
        if (this.buildSpillRetBufferNumbers <= 0) {
            return null;
        }

        try {
            final MemorySegment returned = this.buildSpillReturnBuffers.take();
            this.buildSpillRetBufferNumbers--;

            // Opportunistically hand back everything else that is already available.
            MemorySegment extra;
            while (this.buildSpillRetBufferNumbers > 0
                    && (extra = this.buildSpillReturnBuffers.poll()) != null) {
                returnPage(extra);
                this.buildSpillRetBufferNumbers--;
            }
            return returned;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Hybrid Hash Join was interrupted while taking a buffer.", e);
        }
    }

    /**
     * Gets {@code bufferSize} segments via {@link #getNextBuffer()}.
     *
     * @param bufferSize number of segments to fetch
     * @return an array holding exactly {@code bufferSize} non-null segments
     * @throws RuntimeException if fewer than {@code bufferSize} segments are available
     */
    public MemorySegment[] getNextBuffers(int bufferSize) {
        final MemorySegment[] segments = new MemorySegment[bufferSize];
        for (int idx = 0; idx < bufferSize; idx++) {
            final MemorySegment segment = getNextBuffer();
            if (segment == null) {
                throw new RuntimeException("No enough buffers!");
            }
            segments[idx] = segment;
        }
        return segments;
    }

    /**
     * Like {@link #getNextBuffer()}, but treats an empty result as an error.
     *
     * @throws RuntimeException if no segment is available
     */
    public MemorySegment getNotNullNextBuffer() {
        final MemorySegment segment = getNextBuffer();
        if (segment == null) {
            throw new RuntimeException("Bug in HybridHashJoin: No memory became available.");
        }
        return segment;
    }

    /**
     * Gets the next free segment, spilling a partition if no memory is available.
     *
     * @return a non-null segment
     * @throws RuntimeException if spilling fails with an {@link IOException} or does not free any
     *     segment
     */
    public MemorySegment nextSegment() {
        final MemorySegment available = getNextBuffer();
        if (available != null) {
            return available;
        }

        try {
            spillPartition();
            final MemorySegment afterSpill = getNextBuffer();
            if (afterSpill == null) {
                throw new RuntimeException(
                        "BUG in Hybrid Hash Join: Spilling did not free a buffer.");
            }
            return afterSpill;
        } catch (IOException e) {
            throw new RuntimeException(
                    "Error spilling Hash Join Partition"
                            + (e.getMessage() == null ? "." : ": " + e.getMessage()),
                    e);
        }
    }

    /**
     * Not supported by this pool.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.flink.table.runtime.util.MemorySegmentPool
    public int freePages() {
        throw new UnsupportedOperationException(
                "Contains spill memories, it is hard to estimate free pages.");
    }

    /** Returns the size of one memory segment in bytes. */
    @Override // org.apache.flink.table.runtime.util.MemorySegmentPool
    public int pageSize() {
        return this.segmentSize;
    }

    /** Returns every non-null segment of {@code memory} to the internal pool. */
    @Override // org.apache.flink.table.runtime.util.MemorySegmentPool
    public void returnAll(List<MemorySegment> memory) {
        for (MemorySegment segment : memory) {
            if (segment != null) {
                returnPage(segment);
            }
        }
    }

    /**
     * Spills a partition to free memory; called by {@link #nextSegment()} when no segment is
     * available. The returned value is not used by this class.
     *
     * @throws IOException if writing the spilled data fails
     */
    protected abstract int spillPartition() throws IOException;

    /**
     * Blocks until the internal pool holds at least {@code minRequiredAvailable} free pages,
     * moving segments from the spill return queue into the pool as they arrive.
     *
     * @param minRequiredAvailable number of free pages required in the internal pool
     * @throws IllegalArgumentException if the request exceeds the free pages plus the outstanding
     *     spill buffers
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag is
     *     restored before throwing
     */
    public void ensureNumBuffersReturned(int minRequiredAvailable) {
        if (minRequiredAvailable
                > this.internalPool.freePages() + this.buildSpillRetBufferNumbers) {
            throw new IllegalArgumentException(
                    "More buffers requested available than totally available.");
        }

        while (this.internalPool.freePages() < minRequiredAvailable) {
            try {
                returnPage(this.buildSpillReturnBuffers.take());
                this.buildSpillRetBufferNumbers--;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Hash Join was interrupted.", e);
            }
        }
    }

    /**
     * Closes the table. Only the first call has an effect.
     *
     * <p>Deletes the temp file of the current spilled probe side (best effort, failures are only
     * logged), clears the partitions and waits for all outstanding spill buffers to come back so
     * they can be returned to the pool.
     *
     * @throws RuntimeException if the thread is interrupted while waiting for spill buffers; the
     *     interrupt flag is restored before throwing
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }

        if (this.currentSpilledProbeSide != null) {
            try {
                this.currentSpilledProbeSide.getChannel().closeAndDelete();
            } catch (Throwable t) {
                LOG.warn(
                        "Could not close and delete the temp file for the current spilled partition probe side.",
                        t);
            }
        }

        clearPartitions();

        // Collect every segment that is still in flight from build-side spilling.
        for (int idx = 0; idx < this.buildSpillRetBufferNumbers; idx++) {
            try {
                returnPage(this.buildSpillReturnBuffers.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Hashtable closing was interrupted", e);
            }
        }
        this.buildSpillRetBufferNumbers = 0;
    }

    /** Releases the partitions held by the concrete table; called from {@link #close()}. */
    protected abstract void clearPartitions();

    /**
     * Releases the memory cached by the internal pool. Must be called after {@link #close()}.
     *
     * @throws IllegalStateException if the table has not been closed
     */
    public void free() {
        if (!this.closed.get()) {
            throw new IllegalStateException(
                    "Cannot release memory until BinaryHashTable is closed!");
        }
        freeCurrent();
    }

    /** Clears the internal pool's cache without checking whether the table is closed. */
    public void freeCurrent() {
        this.internalPool.cleanCache();
    }

    /** Returns the internal segment pool. */
    public LazyMemorySegmentPool getInternalPool() {
        return this.internalPool;
    }

    /** Returns a single segment to the internal pool. */
    public void returnPage(MemorySegment segment) {
        this.internalPool.returnPage(segment);
    }

    /** Returns the free pages of the internal pool plus the outstanding spill buffers. */
    public int remainBuffers() {
        return this.internalPool.freePages() + this.buildSpillRetBufferNumbers;
    }

    /**
     * Returns the memory currently taken from the internal pool, in bytes.
     *
     * <p>The product is computed in {@code long} arithmetic so that it stays correct when more
     * than 2 GiB are in use.
     */
    public long getUsedMemoryInBytes() {
        final long usedPages = (long) this.totalNumBuffers - this.internalPool.freePages();
        return usedPages * this.internalPool.pageSize();
    }

    public long getNumSpillFiles() {
        return this.numSpillFiles;
    }

    public long getSpillInBytes() {
        return this.spillInBytes;
    }

    /**
     * Returns {@code max(1, (totalNumBuffers - 2) / 6 / partitions)}.
     *
     * <p>NOTE(review): presumably one sixth of the budget (minus two segments) is set aside for
     * the bucket area and split across the partitions - confirm against the callers.
     *
     * @param partitions divisor, must not be zero
     */
    public int maxInitBufferOfBucketArea(int partitions) {
        return Math.max(1, ((this.totalNumBuffers - 2) / 6) / partitions);
    }

    /**
     * Reads {@code blockCount} blocks of a spilled channel into segments from the internal pool
     * and deletes the channel afterwards.
     *
     * @param id channel to read
     * @param blockCount number of blocks to read; that many pool pages are made available first
     * @return the segments drained from the reader's return queue once it has been closed
     * @throws IOException if reading or closing the channel fails
     */
    public List<MemorySegment> readAllBuffers(FileIOChannel.ID id, int blockCount)
            throws IOException {
        // The reader takes its target segments straight from the pool, so they must be there.
        ensureNumBuffersReturned(blockCount);

        final LinkedBlockingQueue<MemorySegment> retSegments = new LinkedBlockingQueue<>();
        final BlockChannelReader<MemorySegment> reader =
                FileChannelUtil.createBlockChannelReader(
                        this.ioManager,
                        id,
                        retSegments,
                        this.compressionEnable,
                        this.compressionCodecFactory,
                        this.compressionBlockSize,
                        this.segmentSize);
        for (int idx = 0; idx < blockCount; idx++) {
            reader.readBlock(this.internalPool.nextSegment());
        }
        reader.closeAndDelete();

        final List<MemorySegment> buffers = new ArrayList<>(blockCount);
        retSegments.drainTo(buffers);
        return buffers;
    }

    /**
     * Creates an input view over a spilled channel, backed by two freshly allocated unpooled
     * segments of {@link #segmentSize} bytes.
     *
     * @param id channel to read
     * @param blockCount forwarded unchanged to the view (presumably the number of blocks in the
     *     channel - confirm)
     * @param lastSegmentLimit forwarded unchanged to the view (presumably the number of bytes
     *     used in the last block - confirm)
     * @throws IOException if the reader or the view cannot be created
     */
    public HeaderlessChannelReaderInputView createInputView(
            FileIOChannel.ID id, int blockCount, int lastSegmentLimit) throws IOException {
        final BlockChannelReader<MemorySegment> reader =
                FileChannelUtil.createBlockChannelReader(
                        this.ioManager,
                        id,
                        new LinkedBlockingQueue<MemorySegment>(),
                        this.compressionEnable,
                        this.compressionCodecFactory,
                        this.compressionBlockSize,
                        this.segmentSize);
        final List<MemorySegment> readBuffers =
                Arrays.asList(
                        MemorySegmentFactory.allocateUnpooledSegment(this.segmentSize),
                        MemorySegmentFactory.allocateUnpooledSegment(this.segmentSize));
        return new HeaderlessChannelReaderInputView(
                reader, readBuffers, blockCount, lastSegmentLimit, false);
    }

    /**
     * Derives a non-negative hash for the given level by rotating {@code hashCode} left by
     * {@code level * 11} bits.
     *
     * @return a value in {@code [0, Integer.MAX_VALUE]}
     */
    public static int hash(int hashCode, int level) {
        final int rotated = Integer.rotateLeft(hashCode, level * 11);
        // -(x + 1) maps negative values onto non-negative ones, including Integer.MIN_VALUE.
        return rotated >= 0 ? rotated : -(rotated + 1);
    }

    /** Folds the upper 16 bits of {@code hash} into the lower 16 bits. */
    public static int partitionLevelHash(int hash) {
        return hash ^ (hash >>> 16);
    }
}
