package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/AbstractMergeIterator.class */
/**
 * Base class for {@link JoinTaskIterator} implementations that operate on two inputs, each
 * wrapped in a {@link KeyGroupedIterator}.
 *
 * <p>This class supplies {@link #crossMatchingGroup}, which hands every pairing of the values of
 * two groups to a {@link FlatJoinFunction}. Subclasses decide how groups are matched
 * ({@link #callWithNextKey}), how the key-grouped iterators are built
 * ({@link #createKeyGroupedIterator}) and how records are copied ({@link #createCopy}).
 *
 * @param <T1> record type of the first input
 * @param <T2> record type of the second input
 * @param <O> record type emitted to the collector
 */
public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
    // Instance logger on purpose: it is named after the concrete subclass via getClass().
    private final Logger LOG = LoggerFactory.getLogger(getClass());

    protected TypePairComparator<T1, T2> pairComparator;
    protected KeyGroupedIterator<T1> iterator1;
    protected KeyGroupedIterator<T2> iterator2;
    protected final TypeSerializer<T1> serializer1;
    protected final TypeSerializer<T2> serializer2;

    // Block-wise resettable view over the values of the second input (the "block side").
    private final NonReusingBlockResettableIterator<T2> blockIt;
    private final IOManager ioManager;
    private final MemoryManager memoryManager;
    // Pages handed to the SpillingResettableIterator created in crossMwithNValues.
    private final List<MemorySegment> memoryForSpillingIterator;

    // The four fields below are only ever passed as the third argument of createCopy(...);
    // this class never assigns them. NOTE(review): presumably reuse targets managed by
    // subclasses - confirm against the concrete implementations.
    protected T1 copy1;
    protected T1 spillHeadCopy;
    protected T2 copy2;
    protected T2 blockHeadCopy;

    /**
     * Wraps both inputs in key-grouped iterators and splits the memory budget between the
     * block iterator and the spilling iterator.
     *
     * @param mutableObjectIterator first input
     * @param mutableObjectIterator2 second input
     * @param typeSerializer serializer for records of the first input
     * @param typeComparator comparator for the first input; a duplicate is handed to the
     *     key-grouped iterator
     * @param typeSerializer2 serializer for records of the second input
     * @param typeComparator2 comparator for the second input; a duplicate is handed to the
     *     key-grouped iterator
     * @param typePairComparator comparator across both inputs, stored in {@link #pairComparator}
     * @param memoryManager memory manager used to allocate and release pages
     * @param iOManager I/O manager passed on to the spilling iterator
     * @param i number of memory pages available; must be at least 2
     * @param abstractInvokable owner passed to the memory allocations
     * @throws MemoryAllocationException declared for the page allocation
     * @throws IllegalArgumentException if {@code i} is smaller than 2
     */
    public AbstractMergeIterator(MutableObjectIterator<T1> mutableObjectIterator, MutableObjectIterator<T2> mutableObjectIterator2, TypeSerializer<T1> typeSerializer, TypeComparator<T1> typeComparator, TypeSerializer<T2> typeSerializer2, TypeComparator<T2> typeComparator2, TypePairComparator<T1, T2> typePairComparator, MemoryManager memoryManager, IOManager iOManager, int i, AbstractInvokable abstractInvokable) throws MemoryAllocationException {
        if (i < 2) {
            throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
        }
        this.pairComparator = typePairComparator;
        this.serializer1 = typeSerializer;
        this.serializer2 = typeSerializer2;
        this.memoryManager = memoryManager;
        this.ioManager = iOManager;

        // createKeyGroupedIterator is generic in its record type, so no cast is needed.
        this.iterator1 = createKeyGroupedIterator(mutableObjectIterator, typeSerializer, typeComparator.duplicate());
        this.iterator2 = createKeyGroupedIterator(mutableObjectIterator2, typeSerializer2, typeComparator2.duplicate());

        // The spilling iterator gets two pages when more than 20 are available, otherwise one;
        // everything else goes to the block iterator.
        final int pagesForSpiller = i > 20 ? 2 : 1;
        this.blockIt = new NonReusingBlockResettableIterator<>(this.memoryManager, this.serializer2, i - pagesForSpiller, abstractInvokable);
        this.memoryForSpillingIterator = memoryManager.allocatePages(abstractInvokable, pagesForSpiller);
    }

    /** Does nothing; this class has no work to do on open. */
    @Override
    public void open() throws IOException {
    }

    /**
     * Closes the block iterator and releases the pages reserved for the spilling iterator.
     * A failure while closing the block iterator is logged and does not prevent the release.
     */
    @Override
    public void close() {
        if (this.blockIt != null) {
            try {
                this.blockIt.close();
            } catch (Throwable th) {
                this.LOG.error("Error closing block memory iterator: " + th.getMessage(), th);
            }
        }
        this.memoryManager.release(this.memoryForSpillingIterator);
    }

    /** Aborts by delegating to {@link #close()}. */
    @Override
    public void abort() {
        close();
    }

    /**
     * Implemented by subclasses.
     *
     * @param flatJoinFunction join function to invoke
     * @param collector collector handed to the join function
     * @return see {@link JoinTaskIterator#callWithNextKey}
     * @throws Exception forwarded from the join function or the inputs
     */
    @Override
    public abstract boolean callWithNextKey(FlatJoinFunction<T1, T2, O> flatJoinFunction, Collector<O> collector) throws Exception;

    /**
     * Passes every pairing of the values in {@code it} and {@code it2} to the join function.
     *
     * <p>Both iterators must have at least one element: {@code next()} is called on each before
     * {@code hasNext()} is consulted. Depending on whether either side has more than one value,
     * the work is dispatched to the 1x1, 1xN, Nx1 or MxN strategy.
     *
     * @param it values of the first input's group
     * @param it2 values of the second input's group
     * @param flatJoinFunction join function to invoke for every pair
     * @param collector collector handed to the join function
     * @throws Exception forwarded from the join function, the iterators or the spilling logic
     */
    protected void crossMatchingGroup(Iterator<T1> it, Iterator<T2> it2, FlatJoinFunction<T1, T2, O> flatJoinFunction, Collector<O> collector) throws Exception {
        // Keep this exact call order: both heads are fetched before either hasNext() is asked.
        final T1 head1 = it.next();
        final T2 head2 = it2.next();
        final boolean firstHasMore = it.hasNext();
        final boolean secondHasMore = it2.hasNext();

        if (firstHasMore && secondHasMore) {
            crossMwithNValues(head1, it, head2, it2, flatJoinFunction, collector);
        } else if (firstHasMore) {
            crossSecond1withNValues(head2, head1, it, flatJoinFunction, collector);
        } else if (secondHasMore) {
            crossFirst1withNValues(head1, head2, it2, flatJoinFunction, collector);
        } else {
            flatJoinFunction.join(head1, head2, collector);
        }
    }

    /**
     * Crosses a single value of the first input with several values of the second input.
     * A copy of {@code val1} is passed for every pair except the last, which receives
     * {@code val1} itself.
     *
     * @param val1 the only value on the first side
     * @param firstValN first value on the second side
     * @param valsN remaining values on the second side; must have at least one element
     */
    private void crossFirst1withNValues(T1 val1, T2 firstValN, Iterator<T2> valsN, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
        joinFunction.join(createCopy(this.serializer1, val1, this.copy1), firstValN, collector);
        while (true) {
            final T2 nextVal = valsN.next();
            if (!valsN.hasNext()) {
                // Last pair: no further use of val1, so it is passed without copying.
                joinFunction.join(val1, nextVal, collector);
                return;
            }
            joinFunction.join(createCopy(this.serializer1, val1, this.copy1), nextVal, collector);
        }
    }

    /**
     * Crosses a single value of the second input with several values of the first input.
     * A copy of {@code val2} is passed for every pair except the last, which receives
     * {@code val2} itself.
     *
     * @param val2 the only value on the second side
     * @param firstValN first value on the first side
     * @param valsN remaining values on the first side; must have at least one element
     */
    private void crossSecond1withNValues(T2 val2, T1 firstValN, Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
        joinFunction.join(firstValN, createCopy(this.serializer2, val2, this.copy2), collector);
        while (true) {
            final T1 nextVal = valsN.next();
            if (!valsN.hasNext()) {
                // Last pair: no further use of val2, so it is passed without copying.
                joinFunction.join(nextVal, val2, collector);
                return;
            }
            joinFunction.join(nextVal, createCopy(this.serializer2, val2, this.copy2), collector);
        }
    }

    /**
     * Crosses several values of the first input (the "spilling side") with several values of
     * the second input (the "block side").
     *
     * <p>Steps:
     * <ol>
     *   <li>cross the two heads;
     *   <li>cross the head of the spilling side with the first block;
     *   <li>cross every remaining spilling-side value with the head of the block side;
     *   <li>cross every remaining spilling-side value with the first block;
     *   <li>for each further block: cross the head of the spilling side with that block;
     *   <li>for each further block: cross every spilling-side value with that block.
     * </ol>
     * Steps 5 and 6 only run when the block iterator reports further input after the first
     * block; only then is the first input wrapped in a {@link SpillingResettableIterator}.
     *
     * @param firstV1 head of the spilling side
     * @param spillVals remaining values of the spilling side
     * @param firstV2 head of the block side
     * @param blockVals remaining values of the block side
     */
    private void crossMwithNValues(T1 firstV1, Iterator<T1> spillVals, T2 firstV2, Iterator<T2> blockVals, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
        final T1 firstV1Copy = createCopy(this.serializer1, firstV1, this.copy1);
        final T2 blockHead = createCopy(this.serializer2, firstV2, this.blockHeadCopy);
        T1 spillHead = null;

        // 1) cross the heads
        joinFunction.join(firstV1Copy, firstV2, collector);

        SpillingResettableIterator<T1> spillIt = null;
        try {
            this.blockIt.reopen(blockVals);

            // 2) head of the spilling side x first block
            crossWithCurrentBlock(firstV1, joinFunction, collector);

            final boolean spillingRequired = this.blockIt.hasFurtherInput();
            final Iterator<T1> leftSideIter;
            if (spillingRequired) {
                // More than one block: the spilling side has to be replayed per block.
                spillIt = new SpillingResettableIterator<>(spillVals, this.serializer1, this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
                leftSideIter = spillIt;
                spillIt.open();
                spillHead = createCopy(this.serializer1, firstV1, this.spillHeadCopy);
            } else {
                leftSideIter = spillVals;
            }

            while (leftSideIter.hasNext()) {
                final T1 nextSpillVal = leftSideIter.next();
                // 3) spilling-side value x head of the block side
                joinFunction.join(createCopy(this.serializer1, nextSpillVal, this.copy1), createCopy(this.serializer2, blockHead, this.copy2), collector);
                // 4) spilling-side value x first block
                crossWithCurrentBlock(nextSpillVal, joinFunction, collector);
            }

            if (!spillingRequired) {
                // Everything on the block side fit into one block; no spilling iterator exists.
                return;
            }

            while (this.blockIt.nextBlock()) {
                spillIt.reset();
                // 5) head of the spilling side x next block
                crossWithCurrentBlock(spillHead, joinFunction, collector);
                // 6) every spilling-side value x next block
                while (spillIt.hasNext()) {
                    crossWithCurrentBlock(spillIt.next(), joinFunction, collector);
                }
                spillIt.reset();
            }
        } finally {
            // Single cleanup point for success and failure: the segments returned by close()
            // go back into the list that is released in close() of this class.
            if (spillIt != null) {
                this.memoryForSpillingIterator.addAll(spillIt.close());
            }
        }
    }

    /**
     * Joins a fresh copy of {@code spillValue} with every record currently available from the
     * block iterator and then resets the block iterator.
     */
    private void crossWithCurrentBlock(T1 spillValue, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
        while (this.blockIt.hasNext()) {
            joinFunction.join(createCopy(this.serializer1, spillValue, this.copy1), this.blockIt.next(), collector);
        }
        this.blockIt.reset();
    }

    /**
     * Creates the key-grouped iterator for one input. Called from the constructor, so
     * implementations must not rely on subclass state that is initialized later.
     *
     * @param mutableObjectIterator the input to wrap
     * @param typeSerializer serializer for the input's records
     * @param typeComparator comparator for the input's records
     * @param <T> record type of the input
     * @return the key-grouped iterator over the input
     */
    protected abstract <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> mutableObjectIterator, TypeSerializer<T> typeSerializer, TypeComparator<T> typeComparator);

    /**
     * Produces the record that is handed to the join function in place of {@code t}.
     *
     * @param typeSerializer serializer for the record type
     * @param t the record to copy
     * @param t2 one of the {@code copy1}/{@code copy2}/{@code spillHeadCopy}/{@code blockHeadCopy}
     *     fields; how it is used is up to the implementation
     * @param <T> record type
     * @return the record to pass on
     */
    protected abstract <T> T createCopy(TypeSerializer<T> typeSerializer, T t, T t2);
}
