package org.apache.flink.api.connector.source.mocks;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.ThrowableCatchingRunnable;

/* loaded from: input_file:org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.class */
/**
 * A mock {@link SplitEnumeratorContext} for tests.
 *
 * <p>The mock records everything an enumerator does with it (split assignments, source events,
 * reader registrations) so tests can inspect the results afterwards. Async calls are not run when
 * they are submitted: they are queued and executed only when the test asks for it via
 * {@link #runNextOneTimeCallable()} or {@link #runPeriodicCallable(int)}.
 *
 * <p>Two single-threaded executors are used: a "main" executor on which handlers and event
 * bookkeeping run, and a "worker" executor on which the async callables run. The first error
 * caught in each of them is stored and rethrown by the {@code run...Callable} methods.
 *
 * @param <SplitT> the type of the source splits
 */
public class MockSplitEnumeratorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT>, AutoCloseable {
    /** Maximum number of one-time async calls that may be pending at the same time. */
    private static final int ONE_TIME_CALLABLE_CAPACITY = 100;

    private final int parallelism;
    /** Events sent per subtask id; only read and written on the main executor thread. */
    private final Map<Integer, List<SourceEvent>> sentSourceEvent = new HashMap<>();
    private final ConcurrentMap<Integer, ReaderInfo> registeredReaders = new ConcurrentHashMap<>();
    private final List<SplitsAssignment<SplitT>> splitsAssignmentSequence = new ArrayList<>();
    private final AtomicReference<Throwable> errorInWorkerThread = new AtomicReference<>();
    private final AtomicReference<Throwable> errorInMainThread = new AtomicReference<>();
    private final BlockingQueue<Callable<Future<?>>> oneTimeCallables = new ArrayBlockingQueue<>(ONE_TIME_CALLABLE_CAPACITY);
    private final List<Callable<Future<?>>> periodicCallables = Collections.synchronizedList(new ArrayList<>());
    private final TestingExecutorThreadFactory mainThreadFactory = getThreadFactory("SplitEnumerator-main", this.errorInMainThread);
    private final ExecutorService workerExecutor = getExecutor(getThreadFactory("SplitEnumerator-worker", this.errorInWorkerThread));
    private final ExecutorService mainExecutor = getExecutor(this.mainThreadFactory);
    private final AtomicBoolean stoppedAcceptAsyncCalls = new AtomicBoolean(false);

    /**
     * A thread factory that creates exactly one named thread and stores uncaught exceptions of
     * that thread in the given error holder.
     */
    public static class TestingExecutorThreadFactory implements ThreadFactory {
        private final String coordinatorThreadName;
        private final AtomicReference<Throwable> error;
        /** The single thread created by this factory, or {@code null} if none was created yet. */
        private Thread t = null;

        TestingExecutorThreadFactory(String str, AtomicReference<Throwable> atomicReference) {
            this.coordinatorThreadName = str;
            this.error = atomicReference;
        }

        /**
         * Creates the one and only thread of this factory.
         *
         * @throws IllegalStateException if a thread has already been created
         */
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(@Nonnull Runnable runnable) {
            if (this.t != null) {
                throw new IllegalStateException("Should never happen. This factory should only be used by a SingleThreadExecutor.");
            }
            Thread created = new Thread(runnable, this.coordinatorThreadName);
            created.setUncaughtExceptionHandler((thread, failure) -> recordError(this.error, failure));
            this.t = created;
            return created;
        }

        /** Returns whether the calling thread is the thread created by this factory. */
        boolean isCurrentThreadMainExecutorThread() {
            return Thread.currentThread() == this.t;
        }
    }

    /**
     * Creates a mock context.
     *
     * @param i the parallelism reported by {@link #currentParallelism()}
     */
    public MockSplitEnumeratorContext(int i) {
        this.parallelism = i;
    }

    /** Returns an unregistered (no-op) enumerator metric group. */
    public SplitEnumeratorMetricGroup metricGroup() {
        return UnregisteredMetricsGroup.createSplitEnumeratorMetricGroup();
    }

    /**
     * Records the event as sent to the given subtask. The bookkeeping always happens on the main
     * executor thread; callers on other threads block until it is done.
     *
     * @param i the subtask id of the receiving reader
     * @param sourceEvent the event to record
     * @throws RuntimeException if handing the event over to the main thread fails
     */
    public void sendEventToSourceReader(int i, SourceEvent sourceEvent) {
        if (this.mainThreadFactory.isCurrentThreadMainExecutorThread()) {
            recordSentEvent(i, sourceEvent);
            return;
        }
        try {
            this.mainExecutor.submit(() -> recordSentEvent(i, sourceEvent)).get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to send event to source reader of subtask " + i, e);
        } catch (ExecutionException | RuntimeException e) {
            throw new RuntimeException("Failed to send event to source reader of subtask " + i, e);
        }
    }

    /** Returns the parallelism passed to the constructor. */
    public int currentParallelism() {
        return this.parallelism;
    }

    /** Returns the live map of readers registered via {@link #registerReader(ReaderInfo)}. */
    public Map<Integer, ReaderInfo> registeredReaders() {
        return this.registeredReaders;
    }

    /** Appends the assignment to the sequence returned by {@link #getSplitsAssignmentSequence()}. */
    public void assignSplits(SplitsAssignment<SplitT> splitsAssignment) {
        this.splitsAssignmentSequence.add(splitsAssignment);
    }

    /** Does nothing in this mock. */
    public void signalNoMoreSplits(int i) {
    }

    /**
     * Queues a one-time async call. Nothing is executed until {@link #runNextOneTimeCallable()}
     * is invoked. Calls made after {@link #close()} are dropped.
     *
     * @param callable the work to run on the worker thread
     * @param biConsumer the handler that receives the result or the failure on the main thread
     * @throws IllegalStateException if the queue of pending one-time calls is full
     */
    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer) {
        if (this.stoppedAcceptAsyncCalls.get()) {
            return;
        }
        this.oneTimeCallables.add(newAsyncCall(callable, biConsumer));
    }

    /**
     * Registers a periodic async call. Nothing is scheduled: the call only runs when
     * {@link #runPeriodicCallable(int)} is invoked with its index. Calls made after
     * {@link #close()} are dropped.
     *
     * @param callable the work to run on the worker thread
     * @param biConsumer the handler that receives the result or the failure on the main thread
     * @param j ignored by this mock
     * @param j2 ignored by this mock
     */
    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer, long j, long j2) {
        if (this.stoppedAcceptAsyncCalls.get()) {
            return;
        }
        this.periodicCallables.add(newAsyncCall(callable, biConsumer));
    }

    /** Executes the runnable on the main executor thread without waiting for it to finish. */
    public void runInCoordinatorThread(Runnable runnable) {
        this.mainExecutor.execute(runnable);
    }

    /** Stops accepting async calls and shuts down both executors immediately. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.stoppedAcceptAsyncCalls.set(true);
        this.workerExecutor.shutdownNow();
        this.mainExecutor.shutdownNow();
    }

    /**
     * Takes the oldest pending one-time call (blocking until one is available), runs it to
     * completion and rethrows the first error recorded in the main or worker thread, if any.
     */
    public void runNextOneTimeCallable() throws Throwable {
        this.oneTimeCallables.take().call().get();
        checkError();
    }

    /**
     * Runs the periodic call registered at the given index to completion and rethrows the first
     * error recorded in the main or worker thread, if any.
     *
     * @param i index of the call in registration order
     */
    public void runPeriodicCallable(int i) throws Throwable {
        this.periodicCallables.get(i).call().get();
        checkError();
    }

    /**
     * Returns a snapshot copy of the events recorded by
     * {@link #sendEventToSourceReader(int, SourceEvent)}, keyed by subtask id. The lists inside
     * the copy are the live lists, not copies.
     *
     * <p>The copy is taken on the main executor thread, the only thread that mutates the
     * underlying map.
     */
    public Map<Integer, List<SourceEvent>> getSentSourceEvent() throws Exception {
        if (this.mainThreadFactory.isCurrentThreadMainExecutorThread()) {
            // Submitting to our own single-threaded executor and waiting would deadlock.
            return new HashMap<>(this.sentSourceEvent);
        }
        return this.mainExecutor.submit(() -> new HashMap<>(this.sentSourceEvent)).get();
    }

    /** Registers the reader under its subtask id, replacing any previous registration. */
    public void registerReader(ReaderInfo readerInfo) {
        this.registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
    }

    /** Removes the reader registered for the given subtask id, if any. */
    public void unregisterReader(int i) {
        this.registeredReaders.remove(i);
    }

    /** Returns the live list of registered periodic calls, in registration order. */
    public List<Callable<Future<?>>> getPeriodicCallables() {
        return this.periodicCallables;
    }

    /** Returns the live queue of one-time calls that have not been run yet. */
    public BlockingQueue<Callable<Future<?>>> getOneTimeCallables() {
        return this.oneTimeCallables;
    }

    /** Returns the live list of all assignments passed to {@link #assignSplits(SplitsAssignment)}. */
    public List<SplitsAssignment<SplitT>> getSplitsAssignmentSequence() {
        return this.splitsAssignmentSequence;
    }

    /** Adds the event to the per-subtask list; must only run on the main executor thread. */
    private void recordSentEvent(int subtaskId, SourceEvent event) {
        this.sentSourceEvent.computeIfAbsent(subtaskId, id -> new ArrayList<>()).add(event);
    }

    /**
     * Builds the deferred task behind both {@code callAsync} variants: when invoked, it runs the
     * callable on the worker thread and then delivers the outcome to the handler on the main
     * thread. The returned future completes once the handler has finished.
     */
    private <T> Callable<Future<?>> newAsyncCall(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        return () -> this.workerExecutor.submit(wrap(this.errorInWorkerThread, () -> {
            T result = null;
            Throwable failure = null;
            try {
                result = callable.call();
            } catch (Throwable t) {
                // Only failures of the callable itself are reported to the handler.
                failure = t;
            }
            handOverToMainThread(handler, result, failure);
        }));
    }

    /**
     * Invokes the handler on the main executor thread and waits until it has completed. Errors
     * thrown by the handler are recorded as main-thread errors by the wrapping runnable.
     */
    private <T> void handOverToMainThread(BiConsumer<T, Throwable> handler, T result, Throwable failure) {
        Future<?> handlerDone = this.mainExecutor.submit(wrap(this.errorInMainThread, () -> handler.accept(result, failure)));
        try {
            handlerDone.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the async call handler to finish", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Async call handler failed in the main thread", e);
        }
    }

    /** Rethrows the first recorded error, checking the main thread before the worker thread. */
    private void checkError() throws Throwable {
        Throwable mainError = this.errorInMainThread.get();
        if (mainError != null) {
            throw mainError;
        }
        Throwable workerError = this.errorInWorkerThread.get();
        if (workerError != null) {
            throw workerError;
        }
    }

    /**
     * Stores the error if the holder is still empty; otherwise attaches it as a suppressed
     * exception to the error already stored.
     */
    private static void recordError(AtomicReference<Throwable> holder, Throwable error) {
        if (holder.compareAndSet(null, error)) {
            return;
        }
        Throwable first = holder.get();
        // Throwable#addSuppressed rejects self-suppression, so skip a repeated instance.
        if (first != error) {
            first.addSuppressed(error);
        }
    }

    private static TestingExecutorThreadFactory getThreadFactory(String str, AtomicReference<Throwable> atomicReference) {
        return new TestingExecutorThreadFactory(str, atomicReference);
    }

    private static ExecutorService getExecutor(TestingExecutorThreadFactory testingExecutorThreadFactory) {
        return Executors.newSingleThreadScheduledExecutor(testingExecutorThreadFactory);
    }

    /** Wraps the runnable so that anything it throws is recorded in the given error holder. */
    private static ThrowableCatchingRunnable wrap(AtomicReference<Throwable> atomicReference, Runnable runnable) {
        return new ThrowableCatchingRunnable(failure -> recordError(atomicReference, failure), runnable);
    }
}
