package org.apache.beam.sdk.fn.data;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.CancellableQueue;

/* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.class */
/**
 * Buffers inbound {@link BeamFnApi.Elements} in a queue and multiplexes their data and timer
 * payloads to the registered endpoints.
 *
 * <p>Each endpoint is tracked with a "done" flag that is set when a payload marked as last is
 * received for it. {@link #awaitCompletion()} drains the queue until every endpoint is done.
 *
 * <p>NOTE(review): the completion bookkeeping ({@code isDone} flags and the incomplete counter)
 * is plain, unsynchronized state; presumably only one thread calls {@link #multiplexElements}
 * / {@link #reset()} at a time - confirm against callers.
 */
public class BeamFnDataInboundObserver implements CloseableFnDataReceiver<BeamFnApi.Elements> {
    /** Data endpoints keyed by transform id. */
    private final Map<String, EndpointStatus<DataEndpoint<?>>> transformIdToDataEndpoint = new HashMap<>();
    /** Timer endpoints keyed by transform id, then by timer family id. */
    private final Map<String, Map<String, EndpointStatus<TimerEndpoint<?>>>> transformIdToTimerFamilyIdToTimerEndpoint;
    /** Hand-off between {@link #accept} and {@link #awaitCompletion()}. */
    private final CancellableQueue<BeamFnApi.Elements> queue;
    /** Number of endpoints passed to the constructor (data + timers). */
    private final int totalNumEndpoints;
    /** Number of endpoints that have not yet received a payload marked as last. */
    private int numEndpointsThatAreIncomplete;

    /**
     * Exception handed to the queue when this observer is closed.
     *
     * <p>Constructed with suppression and writable stack trace disabled, so the shared
     * {@link #INSTANCE} carries no per-throw state.
     */
    public static class CloseException extends Exception {
        public static final CloseException INSTANCE = new CloseException();

        private CloseException() {
            super("Inbound observer closed.", null, false, false);
        }
    }

    /**
     * Pairs an endpoint with a flag recording whether its final payload has been seen.
     *
     * @param <T> the endpoint type
     */
    public static class EndpointStatus<T> {
        final T endpoint;
        boolean isDone;

        EndpointStatus(T t) {
            this.endpoint = t;
        }
    }

    /**
     * Creates an observer that dispatches to the given data and timer endpoints.
     *
     * @param list the data endpoints
     * @param list2 the timer endpoints
     * @return a new observer with every endpoint marked as not done
     */
    public static BeamFnDataInboundObserver forConsumers(List<DataEndpoint<?>> list, List<TimerEndpoint<?>> list2) {
        return new BeamFnDataInboundObserver(list, list2);
    }

    private BeamFnDataInboundObserver(List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) {
        for (DataEndpoint<?> dataEndpoint : dataEndpoints) {
            this.transformIdToDataEndpoint.put(dataEndpoint.getTransformId(), new EndpointStatus<>(dataEndpoint));
        }
        this.transformIdToTimerFamilyIdToTimerEndpoint = new HashMap<>();
        for (TimerEndpoint<?> timerEndpoint : timerEndpoints) {
            this.transformIdToTimerFamilyIdToTimerEndpoint
                    .computeIfAbsent(timerEndpoint.getTransformId(), unused -> new HashMap<>())
                    .put(timerEndpoint.getTimerFamilyId(), new EndpointStatus<>(timerEndpoint));
        }
        // NOTE(review): 100 is presumably the queue's capacity - confirm against CancellableQueue.
        this.queue = new CancellableQueue<>(100);
        this.totalNumEndpoints = dataEndpoints.size() + timerEndpoints.size();
        this.numEndpointsThatAreIncomplete = this.totalNumEndpoints;
    }

    /**
     * Enqueues a batch of elements for later processing by {@link #awaitCompletion()}.
     *
     * @param elements the batch to enqueue
     * @throws Exception if the queue rejects the batch
     */
    @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
    public void accept(BeamFnApi.Elements elements) throws Exception {
        this.queue.put(elements);
    }

    /**
     * Not supported by this receiver.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver
    public void flush() throws Exception {
        throw new UnsupportedOperationException();
    }

    /** Cancels the queue with {@link CloseException#INSTANCE}. */
    @Override // org.apache.beam.sdk.fn.data.CloseableFnDataReceiver, java.lang.AutoCloseable
    public void close() throws Exception {
        this.queue.cancel(CloseException.INSTANCE);
    }

    /**
     * Takes batches from the queue and multiplexes them until every endpoint is done.
     *
     * <p>If taking or processing a batch fails, the queue is cancelled with that failure before
     * it is rethrown. In all cases {@link #close()} is invoked exactly once on the way out.
     *
     * @throws Exception whatever {@code queue.take()} or {@link #multiplexElements} throws
     */
    public void awaitCompletion() throws Exception {
        try {
            boolean allEndpointsDone = false;
            while (!allEndpointsDone) {
                allEndpointsDone = multiplexElements(this.queue.take());
            }
        } catch (Exception e) {
            // Hand the failure to the queue before propagating it to the caller.
            this.queue.cancel(e);
            throw e;
        } finally {
            close();
        }
    }

    /**
     * Decodes every data and timer payload in {@code elements} and delivers the decoded values
     * to the matching endpoint's receiver, marking endpoints done when a payload is flagged as
     * last.
     *
     * @param elements the batch to dispatch
     * @return {@code true} once no endpoint remains incomplete
     * @throws IllegalStateException if a payload targets an unknown endpoint or one already done
     * @throws Exception if decoding fails or a receiver throws
     */
    public boolean multiplexElements(BeamFnApi.Elements elements) throws Exception {
        for (BeamFnApi.Elements.Data data : elements.getDataList()) {
            EndpointStatus<DataEndpoint<?>> status = this.transformIdToDataEndpoint.get(data.getTransformId());
            if (status == null) {
                throw new IllegalStateException(String.format("Unable to find inbound data receiver for instruction %s and transform %s.", data.getInstructionId(), data.getTransformId()));
            }
            if (status.isDone) {
                throw new IllegalStateException(String.format("Received data after inbound data receiver is done for instruction %s and transform %s.", data.getInstructionId(), data.getTransformId()));
            }
            decodeAndDeliver(data.getData().newInput(), status.endpoint.getCoder(), status.endpoint.getReceiver());
            if (data.getIsLast()) {
                status.isDone = true;
                this.numEndpointsThatAreIncomplete--;
            }
        }
        for (BeamFnApi.Elements.Timers timers : elements.getTimersList()) {
            Map<String, EndpointStatus<TimerEndpoint<?>>> byTimerFamily = this.transformIdToTimerFamilyIdToTimerEndpoint.get(timers.getTransformId());
            EndpointStatus<TimerEndpoint<?>> status = byTimerFamily == null ? null : byTimerFamily.get(timers.getTimerFamilyId());
            if (status == null) {
                // Unknown transform and unknown timer family are reported identically.
                throw new IllegalStateException(String.format("Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.", timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
            }
            if (status.isDone) {
                throw new IllegalStateException(String.format("Received timer after inbound timer receiver is done for instruction %s, transform %s, and timer family %s.", timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
            }
            decodeAndDeliver(timers.getTimers().newInput(), status.endpoint.getCoder(), status.endpoint.getReceiver());
            if (timers.getIsLast()) {
                status.isDone = true;
                this.numEndpointsThatAreIncomplete--;
            }
        }
        return this.numEndpointsThatAreIncomplete == 0;
    }

    /**
     * Decodes values from {@code input} until it reports no more available bytes, passing each
     * decoded value to {@code receiver}.
     */
    @SuppressWarnings("unchecked")
    private static void decodeAndDeliver(InputStream input, Coder<?> coder, FnDataReceiver<?> receiver) throws Exception {
        // The coder and receiver come from the same endpoint, so whatever the coder decodes is
        // what the receiver is fed; the wildcard just hides that shared element type here.
        // NOTE(review): assumes each endpoint pairs a coder and receiver of the same type.
        Coder<Object> typedCoder = (Coder<Object>) coder;
        FnDataReceiver<Object> typedReceiver = (FnDataReceiver<Object>) receiver;
        while (input.available() > 0) {
            typedReceiver.accept(typedCoder.decode(input));
        }
    }

    /** Marks every endpoint as not done, restores the incomplete counter and resets the queue. */
    public void reset() {
        this.numEndpointsThatAreIncomplete = this.totalNumEndpoints;
        for (EndpointStatus<DataEndpoint<?>> status : this.transformIdToDataEndpoint.values()) {
            status.isDone = false;
        }
        for (Map<String, EndpointStatus<TimerEndpoint<?>>> byTimerFamily : this.transformIdToTimerFamilyIdToTimerEndpoint.values()) {
            for (EndpointStatus<TimerEndpoint<?>> status : byTimerFamily.values()) {
                status.isDone = false;
            }
        }
        this.queue.reset();
    }

    /**
     * Lists the endpoints that have not yet been marked done.
     *
     * @return a new list with one entry per unfinished endpoint, formatted as
     *     {@code <transformId>:data} or {@code <transformId>:timers:<timerFamilyId>}
     */
    public List<String> getUnfinishedEndpoints() {
        List<String> unfinished = new ArrayList<>();
        for (Map.Entry<String, EndpointStatus<DataEndpoint<?>>> dataEntry : this.transformIdToDataEndpoint.entrySet()) {
            if (!dataEntry.getValue().isDone) {
                unfinished.add(String.format("%s:data", dataEntry.getKey()));
            }
        }
        for (Map.Entry<String, Map<String, EndpointStatus<TimerEndpoint<?>>>> transformEntry : this.transformIdToTimerFamilyIdToTimerEndpoint.entrySet()) {
            for (Map.Entry<String, EndpointStatus<TimerEndpoint<?>>> timerEntry : transformEntry.getValue().entrySet()) {
                if (!timerEntry.getValue().isDone) {
                    unfinished.add(String.format("%s:timers:%s", transformEntry.getKey(), timerEntry.getKey()));
                }
            }
        }
        return unfinished;
    }
}
