/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.testutils;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class TestableKinesisDataFetcher<T>
extends KinesisDataFetcher<T> {
    private final OneShotLatch runWaiter;
    private final Semaphore discoveryWaiter = new Semaphore(0);
    private final OneShotLatch shutdownWaiter;
    private volatile boolean running = true;
    private volatile boolean executorServiceShutdownNowCalled;

    public TestableKinesisDataFetcher(List<String> fakeStreams, SourceFunction.SourceContext<T> sourceContext, Properties fakeConfiguration, KinesisDeserializationSchema<T> deserializationSchema, int fakeTotalCountOfSubtasks, int fakeIndexOfThisSubtask, AtomicReference<Throwable> thrownErrorUnderTest, LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest, HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, KinesisProxyInterface fakeKinesis) {
        this(fakeStreams, sourceContext, fakeConfiguration, deserializationSchema, fakeTotalCountOfSubtasks, fakeIndexOfThisSubtask, thrownErrorUnderTest, subscribedShardsStateUnderTest, subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, fakeKinesis, null);
    }

    public TestableKinesisDataFetcher(List<String> fakeStreams, SourceFunction.SourceContext<T> sourceContext, Properties fakeConfiguration, KinesisDeserializationSchema<T> deserializationSchema, int fakeTotalCountOfSubtasks, int fakeIndexOfThisSubtask, AtomicReference<Throwable> thrownErrorUnderTest, LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest, HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, KinesisProxyInterface fakeKinesis, KinesisProxyV2Interface fakeKinesisV2) {
        super(fakeStreams, sourceContext, sourceContext.getCheckpointLock(), TestUtils.getMockedRuntimeContext(fakeTotalCountOfSubtasks, fakeIndexOfThisSubtask), fakeConfiguration, deserializationSchema, DEFAULT_SHARD_ASSIGNER, null, null, thrownErrorUnderTest, subscribedShardsStateUnderTest, subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, properties -> fakeKinesis, properties -> fakeKinesisV2);
        this.runWaiter = new OneShotLatch();
        this.shutdownWaiter = new OneShotLatch();
    }

    public void runFetcher() throws Exception {
        this.runWaiter.trigger();
        super.runFetcher();
    }

    public void waitUntilRun() throws Exception {
        this.runWaiter.await();
    }

    public void waitUntilShutdown(long timeout, TimeUnit timeUnit) throws Exception {
        this.shutdownWaiter.await(timeout, timeUnit);
    }

    protected ExecutorService createShardConsumersThreadPool(String subtaskName) {
        ExecutorService mockExecutorService = (ExecutorService)Mockito.mock(ExecutorService.class);
        Mockito.when((Object)mockExecutorService.isTerminated()).thenAnswer(invocation -> !this.running);
        Mockito.when(mockExecutorService.shutdownNow()).thenAnswer(invocationOnMock -> {
            this.executorServiceShutdownNowCalled = true;
            return Collections.emptyList();
        });
        try {
            Mockito.when((Object)mockExecutorService.awaitTermination(ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any()))).thenAnswer(invocationOnMock -> !this.running && this.executorServiceShutdownNowCalled);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        return mockExecutorService;
    }

    public void awaitTermination() throws InterruptedException {
        this.running = false;
        super.awaitTermination();
    }

    public void shutdownFetcher() {
        super.shutdownFetcher();
        this.shutdownWaiter.trigger();
    }

    public List<StreamShardHandle> discoverNewShardsToSubscribe() throws InterruptedException {
        List newShards = super.discoverNewShardsToSubscribe();
        this.discoveryWaiter.release();
        return newShards;
    }

    public void waitUntilInitialDiscovery() throws InterruptedException {
        this.discoveryWaiter.acquire();
    }

    public void waitUntilDiscovery(int number) throws InterruptedException {
        this.discoveryWaiter.acquire(number);
    }
}

