package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.class */
/**
 * An {@link InputChannel} that obtains its {@link ResultSubpartitionView} directly from a
 * {@link ResultPartitionManager} and reads buffers from that view.
 *
 * <p>The channel keeps at most one buffer in {@code lookAhead}: whenever a buffer is handed out
 * (or the view notifies this channel via {@link #onNotification()}), the next buffer is fetched
 * from the view and, if one is present, {@code notifyAvailableBuffer()} is invoked.
 *
 * <p>{@code subpartitionView}, {@code isReleased} and {@code lookAhead} are {@code volatile} and
 * are touched both by callers of this channel and by the timer task scheduled in
 * {@link #retriggerSubpartitionRequest(Timer, int)}. Methods therefore read each volatile field
 * once into a local before null-checking and dereferencing it.
 *
 * <p>Note: this source is decompiler output; several members were widened from package-private
 * to {@code public} by the decompiler and are left as found.
 */
public class LocalInputChannel extends InputChannel implements NotificationListener {
    private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);

    /** Guards creation of the subpartition view and scheduling of request retries. */
    private final Object requestLock;

    /** Source of the subpartition view for this channel's partition. */
    private final ResultPartitionManager partitionManager;

    /** Used by {@link #sendTaskEvent(TaskEvent)} to publish events for this channel's partition. */
    private final TaskEventDispatcher taskEventDispatcher;

    /** The view being consumed; {@code null} until requested and again after release. */
    private volatile ResultSubpartitionView subpartitionView;

    /** Set to {@code true} at the end of {@link #releaseAllResources()}. */
    private volatile boolean isReleased;

    /** The buffer fetched ahead of the next {@link #getNextBuffer()} call, or {@code null}. */
    private volatile Buffer lookAhead;

    /**
     * Creates a channel whose backoff tuple, as forwarded to the super constructor, is
     * {@code (0, 0)}.
     */
    LocalInputChannel(SingleInputGate singleInputGate, int i, ResultPartitionID resultPartitionID, ResultPartitionManager resultPartitionManager, TaskEventDispatcher taskEventDispatcher, IOMetricGroup iOMetricGroup) {
        this(singleInputGate, i, resultPartitionID, resultPartitionManager, taskEventDispatcher, new Tuple2<Integer, Integer>(0, 0), iOMetricGroup);
    }

    /**
     * Creates a channel with an explicit backoff tuple.
     *
     * @param singleInputGate the owning input gate, forwarded to the super constructor
     * @param i the channel index, forwarded to the super constructor
     * @param resultPartitionID the partition this channel consumes
     * @param resultPartitionManager manager used to create the subpartition view; must not be null
     * @param taskEventDispatcher dispatcher used to publish task events; must not be null
     * @param tuple2 backoff configuration, forwarded unchanged to the super constructor
     * @param iOMetricGroup metric group whose local-bytes-in counter is handed to the super constructor
     * @throws NullPointerException if the manager or the dispatcher is null
     */
    public LocalInputChannel(SingleInputGate singleInputGate, int i, ResultPartitionID resultPartitionID, ResultPartitionManager resultPartitionManager, TaskEventDispatcher taskEventDispatcher, Tuple2<Integer, Integer> tuple2, IOMetricGroup iOMetricGroup) {
        super(singleInputGate, i, resultPartitionID, tuple2, iOMetricGroup.getNumBytesInLocalCounter());
        this.requestLock = new Object();
        this.partitionManager = (ResultPartitionManager) Preconditions.checkNotNull(resultPartitionManager);
        this.taskEventDispatcher = (TaskEventDispatcher) Preconditions.checkNotNull(taskEventDispatcher);
    }

    /**
     * Requests the subpartition view for the given subpartition index if none has been set yet.
     *
     * <p>If the partition manager reports {@link PartitionNotFoundException}, the request is
     * handed back to the input gate for retriggering as long as {@code increaseBackoff()}
     * returns {@code true}; otherwise the exception is rethrown.
     *
     * @param i index of the subpartition to request
     * @throws IOException if the manager returns no view, or the partition was not found and
     *         no further backoff is possible
     * @throws InterruptedException propagated from fetching the first look-ahead buffer
     */
    @Override
    public void requestSubpartition(int i) throws IOException, InterruptedException {
        synchronized (this.requestLock) {
            if (this.subpartitionView == null) {
                LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.", this, Integer.valueOf(i), this.partitionId);
                try {
                    // Validate the freshly created view on a local before publishing it through
                    // the volatile field, so the null check cannot observe a concurrent reset.
                    ResultSubpartitionView createdView = this.partitionManager.createSubpartitionView(this.partitionId, i, this.inputGate.getBufferProvider());
                    if (createdView == null) {
                        throw new IOException("Error requesting subpartition.");
                    }
                    this.subpartitionView = createdView;
                    getNextLookAhead();
                } catch (PartitionNotFoundException e) {
                    if (!increaseBackoff()) {
                        throw e;
                    }
                    this.inputGate.retriggerPartitionRequest(this.partitionId.getPartitionId());
                }
            }
        }
    }

    /**
     * Schedules another {@link #requestSubpartition(int)} attempt on the given timer, using
     * {@code getCurrentBackoff()} as the delay argument.
     *
     * <p>Any {@link Throwable} raised by the retried request is recorded via {@code setError}
     * instead of escaping the timer task.
     *
     * @param timer timer on which the retry is scheduled
     * @param i index of the subpartition to request
     * @throws IllegalStateException if a subpartition view has already been set
     */
    public void retriggerSubpartitionRequest(Timer timer, final int i) throws IOException, InterruptedException {
        synchronized (this.requestLock) {
            Preconditions.checkState(this.subpartitionView == null, "Already requested partition.");
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        LocalInputChannel.this.requestSubpartition(i);
                    } catch (Throwable th) {
                        LocalInputChannel.this.setError(th);
                    }
                }
            }, getCurrentBackoff());
        }
    }

    /**
     * Returns the look-ahead buffer (fetching one from the view if none is cached) and then
     * prefetches the following buffer.
     *
     * <p>If the returned buffer is an event that deserializes to {@link EndOfPartitionEvent},
     * it is returned immediately: no further look-ahead is attempted and the bytes-in counter
     * is not incremented.
     *
     * @return the next buffer of this channel
     * @throws IllegalStateException if no subpartition view has been requested (or it was released)
     * @throws IOException propagated from {@code checkError()}, the view, or event deserialization
     * @throws InterruptedException propagated from the view
     */
    @Override
    public Buffer getNextBuffer() throws IOException, InterruptedException {
        checkError();
        // Snapshot the volatile field so the null check and the dereference see the same view.
        ResultSubpartitionView view = this.subpartitionView;
        Preconditions.checkState(view != null, "Queried for a buffer before requesting the subpartition.");
        if (this.lookAhead == null) {
            this.lookAhead = view.getNextBuffer();
        }
        Buffer buffer = this.lookAhead;
        this.lookAhead = null;
        // NOTE(review): if the view had no buffer, 'buffer' is null here and the next line throws
        // a NullPointerException — confirm callers only poll after an availability notification.
        if (!buffer.isBuffer() && EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
            return buffer;
        }
        getNextLookAhead();
        this.numBytesIn.inc(buffer.getSize());
        return buffer;
    }

    /**
     * Publishes a task event for this channel's partition through the task event dispatcher.
     *
     * @param taskEvent the event to publish
     * @throws IllegalStateException if no subpartition view has been requested yet
     * @throws IOException if {@code checkError()} fails or the dispatcher reports that the
     *         event could not be published
     */
    @Override
    public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        checkError();
        Preconditions.checkState(this.subpartitionView != null, "Tried to send task event to producer before requesting the subpartition.");
        if (!this.taskEventDispatcher.publish(this.partitionId, taskEvent)) {
            throw new IOException("Error while publishing event " + taskEvent + " to producer. The producer could not be found.");
        }
    }

    /**
     * @return {@code true} once {@link #releaseAllResources()} has completed
     */
    @Override
    public boolean isReleased() {
        return this.isReleased;
    }

    /**
     * Forwards the "subpartition consumed" notification to the view, if one is set.
     *
     * @throws IOException propagated from the view
     */
    @Override
    public void notifySubpartitionConsumed() throws IOException {
        // Single volatile read: a concurrent release may null the field between check and call.
        ResultSubpartitionView view = this.subpartitionView;
        if (view != null) {
            view.notifySubpartitionConsumed();
        }
    }

    /**
     * Recycles the cached look-ahead buffer, releases the subpartition view, clears both fields
     * and marks the channel as released. Does nothing if the channel is already released.
     *
     * @throws IOException propagated from releasing the view
     */
    @Override
    public void releaseAllResources() throws IOException {
        if (this.isReleased) {
            return;
        }
        // Work on snapshots so each null check and the following call use the same reference.
        Buffer pendingBuffer = this.lookAhead;
        if (pendingBuffer != null) {
            pendingBuffer.recycle();
            this.lookAhead = null;
        }
        ResultSubpartitionView view = this.subpartitionView;
        if (view != null) {
            view.releaseAllResources();
            this.subpartitionView = null;
        }
        this.isReleased = true;
    }

    @Override
    public String toString() {
        return "LocalInputChannel [" + this.partitionId + "]";
    }

    /**
     * Listener callback: unless the channel is released, tries to fetch the next look-ahead
     * buffer.
     *
     * @throws RuntimeException wrapping any exception raised while fetching; if that exception
     *         is an {@link InterruptedException}, the thread's interrupt status is restored first
     */
    @Override
    public void onNotification() {
        if (this.isReleased) {
            return;
        }
        try {
            getNextLookAhead();
        } catch (InterruptedException e) {
            // Wrapping alone would hide the interruption; restore the flag so the calling
            // thread can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the next buffer from the view into {@code lookAhead}.
     *
     * <p>If a buffer is obtained, {@code notifyAvailableBuffer()} is called. If none is
     * available, this channel registers itself as listener on the view; when registration is
     * refused the fetch is retried until the view reports itself released, at which point a
     * failure cause (if any) is recorded as a {@link ProducerFailedException}.
     * Does nothing when no view is set.
     */
    private void getNextLookAhead() throws IOException, InterruptedException {
        ResultSubpartitionView resultSubpartitionView = this.subpartitionView;
        if (resultSubpartitionView == null) {
            return;
        }
        do {
            this.lookAhead = resultSubpartitionView.getNextBuffer();
            if (this.lookAhead != null) {
                notifyAvailableBuffer();
                return;
            } else if (resultSubpartitionView.registerListener(this)) {
                return;
            }
        } while (!resultSubpartitionView.isReleased());
        Throwable failureCause = resultSubpartitionView.getFailureCause();
        if (failureCause != null) {
            setError(new ProducerFailedException(failureCause));
        }
    }
}
