/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.testutils;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.kinesis.shaded.com.amazonaws.kinesis.agg.RecordAggregator;
import org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber;
import org.apache.flink.kinesis.shaded.org.reactivestreams.Subscription;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.SdkBytes;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Consumer;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ConsumerDescription;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.junit.Assert;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class FakeKinesisFanOutBehavioursFactory {
    public static final String STREAM_ARN = "stream-arn";
    public static final String STREAM_CONSUMER_ARN_EXISTING = "stream-consumer-arn";
    public static final String STREAM_CONSUMER_ARN_NEW = "stream-consumer-arn-new";

    /**
     * Starts the configuration of a fake single-shard fan-out Kinesis proxy.
     *
     * @return a new {@link SingleShardFanOutKinesisV2.Builder} carrying its default settings
     */
    public static SingleShardFanOutKinesisV2.Builder boundedShard() {
        final SingleShardFanOutKinesisV2.Builder shardBuilder = new SingleShardFanOutKinesisV2.Builder();
        return shardBuilder;
    }

    /**
     * Creates a fake Kinesis proxy backed by {@link SingletonEventFanOutKinesisV2}, which serves
     * the supplied event as the only event of its event batch.
     *
     * @param event the event to hand to the subscriber
     * @return the fake proxy
     */
    public static KinesisProxyV2Interface singletonShard(SubscribeToShardEvent event) {
        final KinesisProxyV2Interface singleEventKinesis = new SingletonEventFanOutKinesisV2(event);
        return singleEventKinesis;
    }

    /**
     * Creates a fake single-shard proxy configured with a batch count of zero, i.e. one whose
     * total record count is zero.
     *
     * @return the fake proxy
     */
    public static SingleShardFanOutKinesisV2 emptyShard() {
        final SingleShardFanOutKinesisV2.Builder shardBuilder = new SingleShardFanOutKinesisV2.Builder();
        shardBuilder.withBatchCount(0);
        return shardBuilder.build();
    }

    /**
     * Creates a fake Kinesis proxy that reports a {@link ResourceNotFoundException} to the
     * response handler whenever a shard subscription is requested.
     *
     * @return the fake proxy
     */
    public static KinesisProxyV2Interface resourceNotFoundWhenObtainingSubscription() {
        final RuntimeException resourceNotFound = (RuntimeException) ResourceNotFoundException.builder().build();
        return new ExceptionalKinesisV2(resourceNotFound);
    }

    /**
     * Creates a fake Kinesis proxy that signals the given errors to the subscriber once the
     * events of a subscription have been delivered.
     *
     * @param throwables the errors to pass to the subscriber, in order
     * @return the fake proxy
     */
    public static SubscriptionErrorKinesisV2 errorDuringSubscription(Throwable ... throwables) {
        final SubscriptionErrorKinesisV2 failingKinesis = new SubscriptionErrorKinesisV2(throwables);
        return failingKinesis;
    }

    /**
     * Creates a fake Kinesis proxy whose subscriptions alternate between ending with a
     * {@link LimitExceededException} and completing normally, starting with the error.
     *
     * @return the fake proxy
     */
    public static SubscriptionErrorKinesisV2 alternatingSuccessErrorDuringSubscription() {
        final Throwable limitExceeded = (Throwable) LimitExceededException.builder().build();
        return new AlternatingSubscriptionErrorKinesisV2(limitExceeded);
    }

    /**
     * Creates a fake Kinesis proxy whose {@code subscribeToShard} never calls the response
     * handler, so no subscription is ever handed out.
     *
     * @return the fake proxy
     */
    public static KinesisProxyV2Interface failsToAcquireSubscription() {
        final KinesisProxyV2Interface silentKinesis = new FailsToAcquireSubscriptionKinesis();
        return silentKinesis;
    }

    /**
     * Creates a fake Kinesis proxy that, when a subscription is completed, pushes several
     * additional events to the subscriber in one go before signalling completion.
     *
     * @return the fake proxy
     */
    public static AbstractSingleShardFanOutKinesisV2 shardThatCreatesBackpressureOnQueue() {
        final AbstractSingleShardFanOutKinesisV2 burstingKinesis = new MultipleEventsForSingleRequest();
        return burstingKinesis;
    }

    /**
     * Creates a fake Kinesis proxy whose {@code describeStreamSummary} throws a
     * {@link ResourceNotFoundException}.
     *
     * @return the fake proxy
     */
    public static KinesisProxyV2Interface streamNotFound() {
        final RuntimeException streamMissing = (RuntimeException) ResourceNotFoundException.builder().build();
        final StreamConsumerFakeKinesis.Builder kinesisBuilder = new StreamConsumerFakeKinesis.Builder();
        kinesisBuilder.withThrowsWhileDescribingStream(streamMissing);
        return kinesisBuilder.build();
    }

    /**
     * Creates a fake Kinesis proxy that starts out with its stream consumer flagged as not found.
     *
     * @return the fake proxy
     */
    public static StreamConsumerFakeKinesis streamConsumerNotFound() {
        final StreamConsumerFakeKinesis.Builder kinesisBuilder = new StreamConsumerFakeKinesis.Builder();
        kinesisBuilder.withStreamConsumerNotFound(true);
        return kinesisBuilder.build();
    }

    /**
     * Creates a fake Kinesis proxy built from the builder defaults: the consumer exists and its
     * status is {@code ACTIVE}.
     *
     * @return the fake proxy
     */
    public static StreamConsumerFakeKinesis existingActiveConsumer() {
        final StreamConsumerFakeKinesis.Builder kinesisBuilder = new StreamConsumerFakeKinesis.Builder();
        return kinesisBuilder.build();
    }

    /**
     * Creates a fake Kinesis proxy whose stream consumer initially reports the status
     * {@code CREATING}.
     *
     * @return the fake proxy
     */
    public static StreamConsumerFakeKinesis registerExistingConsumerAndWaitToBecomeActive() {
        final StreamConsumerFakeKinesis.Builder kinesisBuilder = new StreamConsumerFakeKinesis.Builder();
        kinesisBuilder.withStreamConsumerStatus(ConsumerStatus.CREATING);
        return kinesisBuilder.build();
    }

    /**
     * Creates a fake Kinesis proxy that serves two event batches: the first holds an event
     * without records and continuation sequence number {@code "1"}; the second holds an event
     * with a single record and no continuation sequence number.
     *
     * @return the fake proxy
     */
    public static AbstractSingleShardFanOutKinesisV2 emptyBatchFollowedBySingleRecord() {
        return new AbstractSingleShardFanOutKinesisV2(2) {
            /** Number of batches handed out so far. */
            private int subscriptionCount = 0;

            @Override
            List<SubscribeToShardEvent> getEventsToSend() {
                final String continuation;
                if (this.subscriptionCount == 0) {
                    continuation = "1";
                } else {
                    continuation = null;
                }
                final SubscribeToShardEvent.Builder eventBuilder = SubscribeToShardEvent.builder().continuationSequenceNumber(continuation);
                if (this.subscriptionCount == 1) {
                    final Record onlyRecord = FakeKinesisFanOutBehavioursFactory.createRecord(new AtomicInteger(1));
                    eventBuilder.records(new Record[]{onlyRecord});
                }
                this.subscriptionCount++;
                return Collections.singletonList(eventBuilder.build());
            }
        };
    }

    /**
     * Creates a record whose payload is the UTF-8 bytes of a random 32-letter string.
     *
     * @param sequenceNumber counter that is passed on to the payload-taking overload
     * @return the new record
     */
    private static Record createRecord(AtomicInteger sequenceNumber) {
        final String randomPayload = RandomStringUtils.randomAlphabetic(32);
        final byte[] payloadBytes = randomPayload.getBytes(StandardCharsets.UTF_8);
        return FakeKinesisFanOutBehavioursFactory.createRecord(payloadBytes, sequenceNumber);
    }

    /**
     * Creates a record with the given payload, partition key {@code "pk"}, the current time as
     * approximate arrival timestamp and the next value of the counter as sequence number.
     *
     * @param data the record payload
     * @param sequenceNumber counter that is incremented; its new value becomes the sequence number
     * @return the new record
     */
    private static Record createRecord(byte[] data, AtomicInteger sequenceNumber) {
        final Instant arrivalTime = Instant.now();
        final SdkBytes payload = SdkBytes.fromByteArray(data);
        final String nextSequenceNumber = String.valueOf(sequenceNumber.incrementAndGet());
        return Record.builder()
                .approximateArrivalTimestamp(arrivalTime)
                .data(payload)
                .sequenceNumber(nextSequenceNumber)
                .partitionKey("pk")
                .build();
    }

    /**
     * Creates a single record whose payload is an aggregate of several random user records.
     *
     * @param aggregationFactor how many user records to add to the aggregator
     * @param sequenceNumber counter that is passed on when the enclosing record is created
     * @return the record wrapping the aggregated bytes
     * @throws RuntimeException wrapping any exception raised while adding a user record
     */
    private static Record createAggregatedRecord(int aggregationFactor, AtomicInteger sequenceNumber) {
        final RecordAggregator aggregator = new RecordAggregator();
        int added = 0;
        while (added < aggregationFactor) {
            try {
                final byte[] userPayload = RandomStringUtils.randomAlphabetic(32).getBytes(StandardCharsets.UTF_8);
                aggregator.addUserRecord("pk", userPayload);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            added++;
        }
        final byte[] aggregatedBytes = aggregator.clearAndGet().toRecordBytes();
        return FakeKinesisFanOutBehavioursFactory.createRecord(aggregatedBytes, sequenceNumber);
    }

    /**
     * Generates a list of events that each carry one freshly created record.
     *
     * <p>The continuation sequence number of an event is its zero-based position in the list.
     *
     * @param numberOfEvents how many events to generate
     * @param sequenceNumber counter used when creating the records
     * @return the generated events, in creation order
     */
    private static List<SubscribeToShardEvent> generateEvents(int numberOfEvents, AtomicInteger sequenceNumber) {
        final List<SubscribeToShardEvent> events = new ArrayList<>();
        for (int position = 0; position < numberOfEvents; position++) {
            final Record record = FakeKinesisFanOutBehavioursFactory.createRecord(sequenceNumber);
            final SubscribeToShardEvent event = SubscribeToShardEvent.builder()
                    .records(new Record[]{record})
                    .continuationSequenceNumber(String.valueOf(position))
                    .build();
            events.add(event);
        }
        return events;
    }

    /**
     * Base implementation of {@link KinesisProxyV2Interface} in which every operation throws
     * {@link UnsupportedOperationException}; fakes override only the operations they emulate.
     */
    private static class KinesisProxyV2InterfaceAdapter
    implements KinesisProxyV2Interface {
        /** Message used by every unimplemented operation. */
        private static final String NOT_IMPLEMENTED = "This method is not implemented.";

        private KinesisProxyV2InterfaceAdapter() {
        }

        public DescribeStreamSummaryResponse describeStreamSummary(String stream) throws InterruptedException, ExecutionException {
            throw new UnsupportedOperationException(NOT_IMPLEMENTED);
        }

        public DescribeStreamConsumerResponse describeStreamConsumer(String streamConsumerArn) throws InterruptedException, ExecutionException {
            throw new UnsupportedOperationException(NOT_IMPLEMENTED);
        }

        public DescribeStreamConsumerResponse describeStreamConsumer(String streamArn, String consumerName) throws InterruptedException, ExecutionException {
            throw new UnsupportedOperationException(NOT_IMPLEMENTED);
        }

        public RegisterStreamConsumerResponse registerStreamConsumer(String streamArn, String consumerName) throws InterruptedException, ExecutionException {
            throw new UnsupportedOperationException(NOT_IMPLEMENTED);
        }

        public DeregisterStreamConsumerResponse deregisterStreamConsumer(String consumerArn) throws InterruptedException, ExecutionException {
            throw new UnsupportedOperationException(NOT_IMPLEMENTED);
        }

        public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler) {
            throw new UnsupportedOperationException(NOT_IMPLEMENTED);
        }
    }

    /**
     * Fake Kinesis proxy that emulates the stream-consumer life cycle (describe, register,
     * deregister) and counts how often the consumer has been described.
     */
    public static class StreamConsumerFakeKinesis
    extends KinesisProxyV2InterfaceAdapter {
        /** Describe invocation at which a consumer that is not being deleted turns ACTIVE. */
        public static final int NUMBER_OF_DESCRIBE_REQUESTS_TO_ACTIVATE = 5;
        /** Describe invocation at which a consumer in status DELETING is reported as not found. */
        public static final int NUMBER_OF_DESCRIBE_REQUESTS_TO_DELETE = 5;
        private final RuntimeException throwsWhileDescribingStream;
        private String streamConsumerArn = FakeKinesisFanOutBehavioursFactory.STREAM_CONSUMER_ARN_EXISTING;
        private ConsumerStatus streamConsumerStatus;
        private boolean streamConsumerNotFound;
        private int numberOfDescribeStreamConsumerInvocations = 0;

        private StreamConsumerFakeKinesis(Builder builder) {
            this.throwsWhileDescribingStream = builder.throwsWhileDescribingStream;
            this.streamConsumerStatus = builder.streamConsumerStatus;
            this.streamConsumerNotFound = builder.streamConsumerNotFound;
        }

        /**
         * @return how many times the two-argument {@code describeStreamConsumer} has run, including
         *     calls delegated from the single-argument overload
         */
        public int getNumberOfDescribeStreamConsumerInvocations() {
            return this.numberOfDescribeStreamConsumerInvocations;
        }

        /**
         * Returns a summary carrying the fixed stream ARN, or throws the configured exception when
         * one was set on the builder.
         */
        @Override
        public DescribeStreamSummaryResponse describeStreamSummary(String stream) throws InterruptedException, ExecutionException {
            if (this.throwsWhileDescribingStream != null) {
                throw this.throwsWhileDescribingStream;
            }
            final StreamDescriptionSummary summary = StreamDescriptionSummary.builder()
                    .streamARN(FakeKinesisFanOutBehavioursFactory.STREAM_ARN)
                    .build();
            return DescribeStreamSummaryResponse.builder().streamDescriptionSummary(summary).build();
        }

        /**
         * Marks the consumer as existing, switches to the "new" consumer ARN and returns a consumer
         * with that ARN and the current status. Asserts that the expected stream ARN was passed.
         */
        @Override
        public RegisterStreamConsumerResponse registerStreamConsumer(String streamArn, String consumerName) throws InterruptedException, ExecutionException {
            Assert.assertEquals(FakeKinesisFanOutBehavioursFactory.STREAM_ARN, streamArn);
            this.streamConsumerNotFound = false;
            this.streamConsumerArn = FakeKinesisFanOutBehavioursFactory.STREAM_CONSUMER_ARN_NEW;
            final Consumer registered = Consumer.builder()
                    .consumerARN(FakeKinesisFanOutBehavioursFactory.STREAM_CONSUMER_ARN_NEW)
                    .consumerStatus(this.streamConsumerStatus)
                    .build();
            return RegisterStreamConsumerResponse.builder().consumer(registered).build();
        }

        /** Moves the consumer into status DELETING and returns an empty response. */
        @Override
        public DeregisterStreamConsumerResponse deregisterStreamConsumer(String consumerArn) throws InterruptedException, ExecutionException {
            this.streamConsumerStatus = ConsumerStatus.DELETING;
            return DeregisterStreamConsumerResponse.builder().build();
        }

        /**
         * Describes the consumer and advances the emulated life cycle.
         *
         * <p>Exactly on the threshold invocation a consumer in status DELETING becomes "not found",
         * while any other consumer becomes ACTIVE.
         *
         * @throws ExecutionException wrapping a {@link ResourceNotFoundException} while the
         *     consumer is flagged as not found
         */
        @Override
        public DescribeStreamConsumerResponse describeStreamConsumer(String streamArn, String consumerName) throws InterruptedException, ExecutionException {
            Assert.assertEquals(FakeKinesisFanOutBehavioursFactory.STREAM_ARN, streamArn);
            final int invocation = ++this.numberOfDescribeStreamConsumerInvocations;
            final boolean deleting = this.streamConsumerStatus == ConsumerStatus.DELETING;
            if (deleting && invocation == NUMBER_OF_DESCRIBE_REQUESTS_TO_DELETE) {
                this.streamConsumerNotFound = true;
            } else if (invocation == NUMBER_OF_DESCRIBE_REQUESTS_TO_ACTIVATE) {
                this.streamConsumerStatus = ConsumerStatus.ACTIVE;
            }
            if (this.streamConsumerNotFound) {
                final Throwable notFound = ResourceNotFoundException.builder().build();
                throw new ExecutionException(notFound);
            }
            final ConsumerDescription description = ConsumerDescription.builder()
                    .consumerARN(this.streamConsumerArn)
                    .consumerName(consumerName)
                    .consumerStatus(this.streamConsumerStatus)
                    .build();
            return DescribeStreamConsumerResponse.builder().consumerDescription(description).build();
        }

        /**
         * Asserts that the given ARN equals the current consumer ARN, then delegates to the
         * two-argument overload with the fixed stream ARN and the name {@code "consumer-name"}.
         */
        @Override
        public DescribeStreamConsumerResponse describeStreamConsumer(String streamConsumerArn) throws InterruptedException, ExecutionException {
            Assert.assertEquals(this.streamConsumerArn, streamConsumerArn);
            return this.describeStreamConsumer(FakeKinesisFanOutBehavioursFactory.STREAM_ARN, "consumer-name");
        }

        /** Builder for {@link StreamConsumerFakeKinesis}; defaults to an existing ACTIVE consumer. */
        private static class Builder {
            private RuntimeException throwsWhileDescribingStream;
            private ConsumerStatus streamConsumerStatus = ConsumerStatus.ACTIVE;
            private boolean streamConsumerNotFound = false;

            private Builder() {
            }

            public StreamConsumerFakeKinesis build() {
                return new StreamConsumerFakeKinesis(this);
            }

            public Builder withStreamConsumerNotFound(boolean streamConsumerNotFound) {
                this.streamConsumerNotFound = streamConsumerNotFound;
                return this;
            }

            public Builder withThrowsWhileDescribingStream(RuntimeException throwsWhileDescribingStream) {
                this.throwsWhileDescribingStream = throwsWhileDescribingStream;
                return this;
            }

            public Builder withStreamConsumerStatus(ConsumerStatus streamConsumerStatus) {
                this.streamConsumerStatus = streamConsumerStatus;
                return this;
            }
        }
    }

    /**
     * Base class for fakes that emulate a single shard consumed via {@code subscribeToShard}.
     *
     * <p>Every subscribe request is recorded. While the subscription budget given to the
     * constructor is not used up, a new subscriber is served the events from
     * {@link #getEventsToSend()}; afterwards it is served one event with
     * {@code millisBehindLatest} 0 and no continuation sequence number.
     */
    public static abstract class AbstractSingleShardFanOutKinesisV2
    extends KinesisProxyV2InterfaceAdapter {
        /** Every request passed to {@code subscribeToShard}, in invocation order. */
        private final List<SubscribeToShardRequest> requests = new ArrayList<>();
        /** Number of subscriptions that may still be served from {@link #getEventsToSend()}. */
        private int remainingSubscriptions;

        private AbstractSingleShardFanOutKinesisV2(int remainingSubscriptions) {
            this.remainingSubscriptions = remainingSubscriptions;
        }

        /**
         * @return how many times {@code subscribeToShard} has been invoked
         */
        public int getNumberOfSubscribeToShardInvocations() {
            return this.requests.size();
        }

        /**
         * Returns the starting position of a recorded subscribe request.
         *
         * @param subscriptionIndex zero-based index of the request; asserted to be within the
         *     range of recorded requests
         * @return the starting position carried by that request
         */
        public StartingPosition getStartingPositionForSubscription(int subscriptionIndex) {
            Assert.assertTrue(subscriptionIndex >= 0);
            Assert.assertTrue(subscriptionIndex < this.getNumberOfSubscribeToShardInvocations());
            return this.requests.get(subscriptionIndex).startingPosition();
        }

        /**
         * Records the request and asynchronously hands the response handler an empty response
         * followed by an event publisher.
         *
         * <p>When a subscriber attaches to that publisher it receives a mocked
         * {@link Subscription}; each {@code request(..)} call on it delivers exactly one event,
         * regardless of the requested amount, or calls {@link #completeSubscription} once the
         * events are exhausted.
         *
         * @param request the subscribe request, kept for later inspection
         * @param responseHandler the handler that receives the response and the event publisher
         * @return a future that completes with {@code null} once the handler has been called
         */
        @Override
        public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler) {
            this.requests.add(request);
            return CompletableFuture.supplyAsync(() -> {
                responseHandler.responseReceived(SubscribeToShardResponse.builder().build());
                responseHandler.onEventStream(subscriber -> {
                    final List<SubscribeToShardEvent> eventsToSend;
                    if (this.remainingSubscriptions > 0) {
                        eventsToSend = this.getEventsToSend();
                        --this.remainingSubscriptions;
                    } else {
                        // Budget used up: serve a single event without a continuation sequence number.
                        eventsToSend = Collections.singletonList(SubscribeToShardEvent.builder().millisBehindLatest(0L).continuationSequenceNumber(null).build());
                    }
                    final Subscription subscription = Mockito.mock(Subscription.class);
                    final Iterator<SubscribeToShardEvent> iterator = eventsToSend.iterator();
                    Mockito.doAnswer(a -> {
                        if (!iterator.hasNext()) {
                            this.completeSubscription(subscriber);
                        } else {
                            subscriber.onNext(iterator.next());
                        }
                        return null;
                    }).when(subscription).request(ArgumentMatchers.anyLong());
                    subscriber.onSubscribe(subscription);
                });
                return null;
            });
        }

        /**
         * Called when the subscriber requests more data after all events were delivered. The
         * default signals normal completion; subclasses may override to signal something else.
         *
         * @param subscriber the subscriber to notify
         */
        void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) {
            subscriber.onComplete();
        }

        /**
         * @return the events to deliver for the next subscription that is served from the budget
         */
        abstract List<SubscribeToShardEvent> getEventsToSend();
    }

    /**
     * Fake single-shard proxy that serves batches of generated records, optionally aggregated,
     * until the configured total has been reached according to its sequence number counter.
     */
    public static class SingleShardFanOutKinesisV2
    extends AbstractSingleShardFanOutKinesisV2 {
        private final int batchesPerSubscription;
        private final int recordsPerBatch;
        private final long millisBehindLatest;
        private final int totalRecords;
        private final int aggregationFactor;
        /** Counter advanced once for every record that is created. */
        private final AtomicInteger sequenceNumber = new AtomicInteger();

        private SingleShardFanOutKinesisV2(Builder builder) {
            super(builder.getSubscriptionCount());
            this.batchesPerSubscription = builder.batchesPerSubscription;
            this.recordsPerBatch = builder.recordsPerBatch;
            this.millisBehindLatest = builder.millisBehindLatest;
            this.aggregationFactor = builder.aggregationFactor;
            this.totalRecords = builder.getTotalRecords();
        }

        /**
         * Builds the events for one subscription: at most {@code batchesPerSubscription} events,
         * stopping early once the sequence counter has reached the total record count.
         *
         * <p>The continuation sequence number of an event is the counter plus one while more
         * records remain, and {@code null} otherwise.
         */
        @Override
        List<SubscribeToShardEvent> getEventsToSend() {
            final List<SubscribeToShardEvent> events = new ArrayList<>();
            final SubscribeToShardEvent.Builder eventBuilder = SubscribeToShardEvent.builder().millisBehindLatest(Long.valueOf(this.millisBehindLatest));
            int batchesBuilt = 0;
            while (batchesBuilt < this.batchesPerSubscription && this.sequenceNumber.get() < this.totalRecords) {
                final List<Record> batch = new ArrayList<>();
                for (int recordIndex = 0; recordIndex < this.recordsPerBatch; recordIndex++) {
                    if (this.aggregationFactor == 1) {
                        batch.add(FakeKinesisFanOutBehavioursFactory.createRecord(this.sequenceNumber));
                    } else {
                        batch.add(FakeKinesisFanOutBehavioursFactory.createAggregatedRecord(this.aggregationFactor, this.sequenceNumber));
                    }
                }
                eventBuilder.records(batch);
                final String continuation;
                if (this.sequenceNumber.get() < this.totalRecords) {
                    continuation = String.valueOf(this.sequenceNumber.get() + 1);
                } else {
                    continuation = null;
                }
                eventBuilder.continuationSequenceNumber(continuation);
                events.add(eventBuilder.build());
                batchesBuilt++;
            }
            return events;
        }

        /** Builder for {@link SingleShardFanOutKinesisV2}. */
        public static class Builder {
            private int batchesPerSubscription = 100000;
            private int recordsPerBatch = 10;
            private long millisBehindLatest = 0L;
            private int batchCount = 1;
            private int aggregationFactor = 1;

            /**
             * @return the total record count divided by batches per subscription and by records
             *     per batch, rounded up
             */
            public int getSubscriptionCount() {
                final double perSubscription = (double) this.getTotalRecords() / (double) this.batchesPerSubscription;
                final double subscriptions = perSubscription / (double) this.recordsPerBatch;
                return (int) Math.ceil(subscriptions);
            }

            /**
             * @return batch count multiplied by records per batch
             */
            public int getTotalRecords() {
                return this.batchCount * this.recordsPerBatch;
            }

            public Builder withBatchesPerSubscription(int batchesPerSubscription) {
                this.batchesPerSubscription = batchesPerSubscription;
                return this;
            }

            public Builder withRecordsPerBatch(int recordsPerBatch) {
                this.recordsPerBatch = recordsPerBatch;
                return this;
            }

            public Builder withBatchCount(int batchCount) {
                this.batchCount = batchCount;
                return this;
            }

            public Builder withMillisBehindLatest(long millisBehindLatest) {
                this.millisBehindLatest = millisBehindLatest;
                return this;
            }

            public Builder withAggregationFactor(int aggregationFactor) {
                this.aggregationFactor = aggregationFactor;
                return this;
            }

            public SingleShardFanOutKinesisV2 build() {
                return new SingleShardFanOutKinesisV2(this);
            }
        }
    }

    /**
     * Fake that serves two generated events for its single budgeted subscription and, when that
     * subscription is completed, pushes three more events to the subscriber before completing.
     */
    private static class MultipleEventsForSingleRequest
    extends AbstractSingleShardFanOutKinesisV2 {
        private MultipleEventsForSingleRequest() {
            super(1);
        }

        @Override
        List<SubscribeToShardEvent> getEventsToSend() {
            final AtomicInteger startingSequence = new AtomicInteger(1);
            return FakeKinesisFanOutBehavioursFactory.generateEvents(2, startingSequence);
        }

        @Override
        void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) {
            // The extra events are emitted back-to-back from within this single call.
            final List<SubscribeToShardEvent> extraEvents = FakeKinesisFanOutBehavioursFactory.generateEvents(3, new AtomicInteger(2));
            for (SubscribeToShardEvent extraEvent : extraEvents) {
                subscriber.onNext(extraEvent);
            }
            super.completeSubscription(subscriber);
        }
    }

    /**
     * Fake with a budget of one subscription whose event batch consists of exactly the event
     * supplied at construction time.
     */
    private static class SingletonEventFanOutKinesisV2
    extends AbstractSingleShardFanOutKinesisV2 {
        /** The one event this fake serves. */
        private final SubscribeToShardEvent event;

        private SingletonEventFanOutKinesisV2(SubscribeToShardEvent event) {
            super(1);
            this.event = event;
        }

        @Override
        List<SubscribeToShardEvent> getEventsToSend() {
            final List<SubscribeToShardEvent> onlyEvent = Collections.singletonList(this.event);
            return onlyEvent;
        }
    }

    /**
     * Fake whose {@code subscribeToShard} reports a fixed exception to the response handler and
     * returns an already completed future.
     */
    private static class ExceptionalKinesisV2
    extends KinesisProxyV2InterfaceAdapter {
        /** The exception reported on every subscribe attempt. */
        private final RuntimeException exception;

        private ExceptionalKinesisV2(RuntimeException exception) {
            this.exception = exception;
        }

        @Override
        public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler) {
            final Throwable failure = this.exception;
            responseHandler.exceptionOccurred(failure);
            final CompletableFuture<Void> alreadyDone = CompletableFuture.completedFuture(null);
            return alreadyDone;
        }
    }

    /**
     * Fake that serves generated events for a fixed number of subscriptions and ends each
     * subscription by signalling the configured errors instead of completing normally.
     */
    public static class SubscriptionErrorKinesisV2
    extends AbstractSingleShardFanOutKinesisV2 {
        /** Number of subscriptions that are served with generated events. */
        public static final int NUMBER_OF_SUBSCRIPTIONS = 5;
        /** Number of events generated for each such subscription. */
        public static final int NUMBER_OF_EVENTS_PER_SUBSCRIPTION = 5;
        private final Throwable[] throwables;
        /** Counter shared by all generated records of this fake. */
        AtomicInteger sequenceNumber = new AtomicInteger();

        private SubscriptionErrorKinesisV2(Throwable ... throwables) {
            super(NUMBER_OF_SUBSCRIPTIONS);
            this.throwables = throwables;
        }

        @Override
        List<SubscribeToShardEvent> getEventsToSend() {
            return FakeKinesisFanOutBehavioursFactory.generateEvents(NUMBER_OF_EVENTS_PER_SUBSCRIPTION, this.sequenceNumber);
        }

        /**
         * Waits 200 ms, then passes every configured throwable to {@code onError}, in order.
         *
         * @param subscriber the subscriber to notify
         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
         *     interrupted; the thread's interrupt flag is restored first
         */
        @Override
        void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) {
            try {
                // NOTE(review): presumably gives the consumer time to handle the delivered events
                // before the error arrives - confirm against the tests that use this fake.
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            for (Throwable throwable : this.throwables) {
                subscriber.onError(throwable);
            }
        }
    }

    /**
     * Variant of {@link SubscriptionErrorKinesisV2} that alternates how a subscription ends:
     * even-numbered completions (starting with the first) signal the error, odd-numbered ones
     * complete normally.
     */
    private static class AlternatingSubscriptionErrorKinesisV2
    extends SubscriptionErrorKinesisV2 {
        /** Number of completions handled so far. */
        int index = 0;

        private AlternatingSubscriptionErrorKinesisV2(Throwable throwable) {
            super(new Throwable[]{throwable});
        }

        @Override
        void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) {
            final int completion = this.index;
            this.index = completion + 1;
            if (completion % 2 != 0) {
                subscriber.onComplete();
                return;
            }
            super.completeSubscription(subscriber);
        }
    }

    /**
     * Fake whose {@code subscribeToShard} never touches the response handler; it only returns a
     * future that is asynchronously completed with {@code null}.
     */
    private static class FailsToAcquireSubscriptionKinesis
    extends KinesisProxyV2InterfaceAdapter {
        private FailsToAcquireSubscriptionKinesis() {
        }

        @Override
        public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler) {
            final CompletableFuture<Void> withoutSubscription = CompletableFuture.supplyAsync(() -> null);
            return withoutSubscription;
        }
    }
}

