/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.testutils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.HashKeyRange;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.SequenceNumberRange;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.util.Preconditions;

public class FakeKinesisBehavioursFactory {
    /**
     * Creates a fake Kinesis proxy that never reports any shard.
     *
     * <p>The shard listing is always a freshly constructed {@link GetShardListResult} to which
     * nothing has been added; requesting a shard iterator or records yields {@code null}.
     *
     * @return a proxy with an always-empty shard listing
     */
    public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {
        KinesisProxyInterface shardlessProxy = new KinesisProxyInterface() {

            public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
                // There is no shard to iterate over.
                return null;
            }

            public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
                // No iterator is ever handed out, so there is nothing to fetch.
                return null;
            }

            public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
                // The requested streams are ignored: the result never contains a shard.
                return new GetShardListResult();
            }
        };
        return shardlessProxy;
    }

    /**
     * Creates a fake Kinesis proxy whose shard set per stream is fixed at construction time.
     *
     * @param streamsToShardCount number of shards to fabricate for each stream name
     * @return a {@code NonReshardedStreamsKinesis} built from the given shard counts
     */
    public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map<String, Integer> streamsToShardCount) {
        KinesisProxyInterface fixedShardsProxy = new NonReshardedStreamsKinesis(streamsToShardCount);
        return fixedShardsProxy;
    }

    /**
     * Creates a fake single-shard Kinesis proxy whose {@code getRecords} results carry no records.
     *
     * @param numberOfIterations initial value of the countdown that decides when the fake stops
     *     handing out a next shard iterator
     * @return a {@code SingleShardEmittingZeroRecords} initialised with the given countdown
     */
    public static KinesisProxyInterface emptyShard(int numberOfIterations) {
        KinesisProxyInterface recordlessProxy = new SingleShardEmittingZeroRecords(numberOfIterations);
        return recordlessProxy;
    }

    /**
     * Creates a fake single-shard Kinesis proxy that serves pre-built record batches, one batch per
     * {@code getRecords} call.
     *
     * @param numOfRecords number of records the batches are partitioned from
     * @param numOfGetRecordsCalls number of batches, i.e. of {@code getRecords} calls before the
     *     next shard iterator becomes {@code null}
     * @param millisBehindLatest value reported as "millis behind latest" in every result
     * @return a {@code SingleShardEmittingFixNumOfRecordsKinesis} configured with the arguments
     */
    public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(
            int numOfRecords, int numOfGetRecordsCalls, long millisBehindLatest) {
        KinesisProxyInterface fixedBatchesProxy =
                new SingleShardEmittingFixNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls, millisBehindLatest);
        return fixedBatchesProxy;
    }

    /**
     * Creates a fake single-shard Kinesis proxy that serves pre-built record batches and throws an
     * {@link ExpiredIteratorException} once, on a chosen {@code getRecords} call.
     *
     * @param numOfRecords number of records the batches are partitioned from
     * @param numOfGetRecordsCall number of batches / {@code getRecords} calls
     * @param orderOfCallToExpire 1-based position of the call whose iterator is reported as expired;
     *     must not exceed {@code numOfGetRecordsCall}
     * @param millisBehindLatest value reported as "millis behind latest" in every result
     * @return a {@code SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis} configured with
     *     the arguments
     */
    public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
            int numOfRecords, int numOfGetRecordsCall, int orderOfCallToExpire, long millisBehindLatest) {
        KinesisProxyInterface expiringProxy =
                new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
                        numOfRecords, numOfGetRecordsCall, orderOfCallToExpire, millisBehindLatest);
        return expiringProxy;
    }

    /**
     * Creates a fake single-shard Kinesis proxy whose first batch holds {@code numOfRecords} records
     * and whose later batch sizes are derived from the measured average record size.
     *
     * @param numOfRecords size of the first record batch
     * @param numOfGetRecordsCalls number of batches / {@code getRecords} calls
     * @param millisBehindLatest value reported as "millis behind latest" in every result
     * @return a {@code SingleShardEmittingAdaptiveNumOfRecordsKinesis} configured with the arguments
     */
    public static KinesisProxyInterface initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(
            int numOfRecords, int numOfGetRecordsCalls, long millisBehindLatest) {
        KinesisProxyInterface adaptiveProxy =
                new SingleShardEmittingAdaptiveNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls, millisBehindLatest);
        return adaptiveProxy;
    }

    /**
     * Creates a fake single-shard Kinesis proxy that serves batches of aggregated records.
     *
     * @param numOfAggregatedRecords first argument forwarded to
     *     {@code TestUtils.createAggregatedRecordBatch} for every batch
     * @param numOfChildRecords second argument forwarded to
     *     {@code TestUtils.createAggregatedRecordBatch} for every batch
     * @param numOfGetRecordsCalls number of batches / {@code getRecords} calls
     * @return a {@code SingleShardEmittingAggregatedRecordsKinesis} configured with the arguments
     */
    public static KinesisProxyInterface aggregatedRecords(
            int numOfAggregatedRecords, int numOfChildRecords, int numOfGetRecordsCalls) {
        KinesisProxyInterface aggregatingProxy =
                new SingleShardEmittingAggregatedRecordsKinesis(numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls);
        return aggregatingProxy;
    }

    /**
     * Creates a fake Kinesis proxy whose shards are fed from caller-supplied blocking queues.
     *
     * @param streamsToShardQueues for each stream name, one queue per shard; every string taken from
     *     a queue is returned as one record of the corresponding shard
     * @return a {@code BlockingQueueKinesis} backed by the given queues
     */
    public static KinesisProxyInterface blockingQueueGetRecords(
            Map<String, List<BlockingQueue<String>>> streamsToShardQueues) {
        KinesisProxyInterface queueBackedProxy = new BlockingQueueKinesis(streamsToShardQueues);
        return queueBackedProxy;
    }

    /**
     * Fake Kinesis proxy whose shards are backed by caller-supplied blocking queues.
     *
     * <p>Every queue passed to the constructor becomes one shard of its stream. A call to
     * {@link #getRecords(String, int)} blocks until a string can be taken from the queue of the
     * requested shard and returns that string as a single record.
     */
    private static class BlockingQueueKinesis implements KinesisProxyInterface {
        /** Shards fabricated per stream name; streams without queues are not present. */
        private final Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<String, List<StreamShardHandle>>();
        /** Queue feeding each shard, keyed by the shard iterator string of that shard. */
        private final Map<String, BlockingQueue<String>> shardIteratorToQueueMap = new HashMap<String, BlockingQueue<String>>();

        /**
         * Builds the (constant) shard iterator of a shard: {@code <streamName>-<shardId>}.
         *
         * @param shardHandle shard to build the iterator for
         * @return the iterator string identifying the shard
         */
        private static String getShardIterator(StreamShardHandle shardHandle) {
            return shardHandle.getStreamName() + "-" + shardHandle.getShard().getShardId();
        }

        /**
         * Fabricates one shard per queue and remembers which queue feeds which shard.
         *
         * @param streamsToShardCount for each stream name, the queues of its shards (list index is
         *     used as shard order); streams with an empty list are skipped
         */
        public BlockingQueueKinesis(Map<String, List<BlockingQueue<String>>> streamsToShardCount) {
            for (Map.Entry<String, List<BlockingQueue<String>>> streamToShardQueues : streamsToShardCount.entrySet()) {
                String streamName = streamToShardQueues.getKey();
                List<BlockingQueue<String>> shardQueues = streamToShardQueues.getValue();
                int shardCount = shardQueues.size();
                if (shardCount == 0) {
                    continue;
                }
                List<StreamShardHandle> shardsOfStream = new ArrayList<StreamShardHandle>(shardCount);
                for (int i = 0; i < shardCount; ++i) {
                    Shard shard = new Shard()
                            .withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))
                            .withSequenceNumberRange(new SequenceNumberRange().withStartingSequenceNumber("0"))
                            .withHashKeyRange(new HashKeyRange().withStartingHashKey("0").withEndingHashKey("0"));
                    StreamShardHandle shardHandle = new StreamShardHandle(streamName, shard);
                    shardsOfStream.add(shardHandle);
                    this.shardIteratorToQueueMap.put(BlockingQueueKinesis.getShardIterator(shardHandle), shardQueues.get(i));
                }
                this.streamsWithListOfShards.put(streamName, shardsOfStream);
            }
        }

        /**
         * Lists the shards of every known stream that are newer than the last seen shard.
         *
         * @param streamNamesWithLastSeenShardIds last seen shard id per stream; a missing or
         *     {@code null} entry means all shards of that stream are returned
         * @return the shards whose id compares greater than the last seen id of their stream
         */
        public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
            GetShardListResult result = new GetShardListResult();
            for (Map.Entry<String, List<StreamShardHandle>> streamsWithShards : this.streamsWithListOfShards.entrySet()) {
                String streamName = streamsWithShards.getKey();
                // The last seen id is the same for all shards of a stream, so look it up once.
                String lastSeenShardId = streamNamesWithLastSeenShardIds.get(streamName);
                for (StreamShardHandle shard : streamsWithShards.getValue()) {
                    if (lastSeenShardId == null
                            || StreamShardHandle.compareShardIds(shard.getShard().getShardId(), lastSeenShardId) > 0) {
                        result.addRetrievedShardToStream(streamName, shard);
                    }
                }
            }
            return result;
        }

        /**
         * Returns the constant iterator of the given shard; the iterator type and starting marker
         * are ignored.
         */
        public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
            return BlockingQueueKinesis.getShardIterator(shard);
        }

        /**
         * Blocks until one string is available on the shard's queue and returns it as one record.
         *
         * @param shardIterator iterator previously obtained from {@code getShardIterator}
         * @param maxRecordsToGet ignored; at most one record is returned per call
         * @return a result with exactly one record and the same iterator as next iterator, or, if
         *     the waiting thread was interrupted, no records and a {@code null} next iterator
         * @throws NullPointerException if no queue is registered for {@code shardIterator}
         */
        public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
            BlockingQueue<String> queue = Preconditions.checkNotNull(
                    this.shardIteratorToQueueMap.get(shardIterator), "no queue for iterator %s", shardIterator);
            List<Record> records = Collections.emptyList();
            // The iterator of a shard never changes, so by default the caller keeps polling with it.
            String nextShardIterator = shardIterator;
            try {
                String data = queue.take();
                Record record = new Record()
                        .withData(ByteBuffer.wrap(data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
                        .withPartitionKey(UUID.randomUUID().toString())
                        .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
                        .withSequenceNumber(String.valueOf(0));
                records = Collections.singletonList(record);
            } catch (InterruptedException e) {
                // Interrupted while waiting: end the shard by handing back a null next iterator.
                // NOTE(review): the interrupt flag is not restored here, matching prior behaviour;
                // confirm whether callers would benefit from Thread.currentThread().interrupt().
                nextShardIterator = null;
            }
            return new GetRecordsResult()
                    .withRecords(records)
                    .withMillisBehindLatest(Long.valueOf(0L))
                    .withNextShardIterator(nextShardIterator);
        }
    }

    /**
     * Fake Kinesis proxy exposing a fixed set of dummy shards per stream.
     *
     * <p>Only {@link #getShardList(Map)} is functional; shard iterators and record fetches return
     * {@code null}.
     */
    private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {
        /** Dummy shards per stream name; streams configured with zero shards are absent. */
        private final Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<String, List<StreamShardHandle>>();

        /**
         * Fabricates the requested number of dummy shards for every stream.
         *
         * @param streamsToShardCount shard count per stream name; entries with count 0 are skipped
         */
        public NonReshardedStreamsKinesis(Map<String, Integer> streamsToShardCount) {
            for (Map.Entry<String, Integer> entry : streamsToShardCount.entrySet()) {
                int numShards = entry.getValue();
                if (numShards != 0) {
                    String stream = entry.getKey();
                    ArrayList<StreamShardHandle> handles = new ArrayList<StreamShardHandle>(numShards);
                    int order = 0;
                    while (order < numShards) {
                        String shardId = KinesisShardIdGenerator.generateFromShardOrder(order);
                        handles.add(TestUtils.createDummyStreamShardHandle(stream, shardId));
                        order++;
                    }
                    this.streamsWithListOfShards.put(stream, handles);
                }
            }
        }

        /**
         * Lists the shards of every known stream that are newer than the last seen shard.
         *
         * @param streamNamesWithLastSeenShardIds last seen shard id per stream; a missing or
         *     {@code null} entry means all shards of that stream are returned
         * @return the shards whose id compares greater than the last seen id of their stream
         * @throws IllegalArgumentException if a compared shard id is malformed
         */
        public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
            GetShardListResult listing = new GetShardListResult();
            for (Map.Entry<String, List<StreamShardHandle>> entry : this.streamsWithListOfShards.entrySet()) {
                String stream = entry.getKey();
                String lastSeen = streamNamesWithLastSeenShardIds.get(stream);
                for (StreamShardHandle handle : entry.getValue()) {
                    boolean isNew = lastSeen == null
                            || NonReshardedStreamsKinesis.compareShardIds(handle.getShard().getShardId(), lastSeen) > 0;
                    if (isNew) {
                        listing.addRetrievedShardToStream(stream, handle);
                    }
                }
            }
            return listing;
        }

        /** Not supported by this fake: always {@code null}. */
        public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
            return null;
        }

        /** Not supported by this fake: always {@code null}. */
        public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
            return null;
        }

        /**
         * Compares two shard ids by the number that follows their 8-character prefix.
         *
         * @return negative, zero or positive as the first id is less than, equal to or greater than
         *     the second
         * @throws IllegalArgumentException if either id does not have the expected format
         */
        private static int compareShardIds(String firstShardId, String secondShardId) {
            if (!NonReshardedStreamsKinesis.isValidShardId(firstShardId)) {
                throw new IllegalArgumentException("The first shard id has invalid format.");
            }
            if (!NonReshardedStreamsKinesis.isValidShardId(secondShardId)) {
                throw new IllegalArgumentException("The second shard id has invalid format.");
            }
            long firstOrder = Long.parseLong(firstShardId.substring(8));
            long secondOrder = Long.parseLong(secondShardId.substring(8));
            return Long.compare(firstOrder, secondOrder);
        }

        /** Accepts exactly the form {@code shardId-} followed by twelve digits; rejects {@code null}. */
        private static boolean isValidShardId(String shardId) {
            return shardId != null && shardId.matches("^shardId-\\d{12}");
        }
    }

    /**
     * Base class for single-shard fakes that replay pre-built record batches.
     *
     * <p>Shard iterators are decimal strings: {@code "0"} addresses the first batch and each result
     * points to the following number until the last batch, whose next iterator is {@code null}.
     */
    private abstract static class SingleShardEmittingKinesis implements KinesisProxyInterface {
        private final long millisBehindLatest;
        private final Map<String, List<Record>> shardItrToRecordBatch;

        /**
         * Uses the given batches and reports 0 as "millis behind latest".
         *
         * @param shardItrToRecordBatch record batch per shard iterator string
         */
        protected SingleShardEmittingKinesis(Map<String, List<Record>> shardItrToRecordBatch) {
            this(shardItrToRecordBatch, 0L);
        }

        /**
         * @param shardItrToRecordBatch record batch per shard iterator string
         * @param millisBehindLatest value reported as "millis behind latest" in every result
         */
        protected SingleShardEmittingKinesis(Map<String, List<Record>> shardItrToRecordBatch, long millisBehindLatest) {
            this.shardItrToRecordBatch = shardItrToRecordBatch;
            this.millisBehindLatest = millisBehindLatest;
        }

        /**
         * Returns the batch stored under {@code shardIterator}.
         *
         * @param shardIterator decimal batch index
         * @param maxRecordsToGet ignored; the whole batch is returned
         * @return the batch, the configured "millis behind latest", and the next index as iterator
         *     ({@code null} when {@code shardIterator} addresses the last batch)
         * @throws NumberFormatException if {@code shardIterator} is not a decimal integer
         */
        public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
            int position = Integer.parseInt(shardIterator);
            int lastPosition = this.shardItrToRecordBatch.size() - 1;
            String following;
            if (position == lastPosition) {
                following = null;
            } else {
                following = String.valueOf(position + 1);
            }
            List<Record> batch = this.shardItrToRecordBatch.get(shardIterator);
            return new GetRecordsResult()
                    .withRecords(batch)
                    .withNextShardIterator(following)
                    .withMillisBehindLatest(Long.valueOf(this.millisBehindLatest));
        }

        /** Always starts at the first batch, whatever shard or iterator type is requested. */
        public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
            return "0";
        }

        /** Shard listing is not provided by this fake: always {@code null}. */
        public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
            return null;
        }
    }

    /**
     * Single-shard fake whose batches are produced by {@code TestUtils.createAggregatedRecordBatch}.
     */
    private static class SingleShardEmittingAggregatedRecordsKinesis extends SingleShardEmittingKinesis {
        /**
         * @param numOfAggregatedRecords forwarded to the batch factory for every batch
         * @param numOfChildRecords forwarded to the batch factory for every batch
         * @param numOfGetRecordsCalls number of batches to pre-build
         */
        public SingleShardEmittingAggregatedRecordsKinesis(int numOfAggregatedRecords, int numOfChildRecords, int numOfGetRecordsCalls) {
            super(SingleShardEmittingAggregatedRecordsKinesis.initShardItrToRecordBatch(numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls));
        }

        /**
         * Pre-builds one aggregated batch per expected {@code getRecords} call, keyed by call index.
         *
         * @return map from iterator string ({@code "0"}, {@code "1"}, ...) to its batch
         */
        private static Map<String, List<Record>> initShardItrToRecordBatch(int numOfAggregatedRecords, int numOfChildRecords, int numOfGetRecordsCalls) {
            HashMap<String, List<Record>> batchesByIterator = new HashMap<String, List<Record>>();
            // One counter instance is handed to every batch creation call.
            AtomicInteger sharedSequence = new AtomicInteger();
            int callIndex = 0;
            while (callIndex < numOfGetRecordsCalls) {
                batchesByIterator.put(
                        String.valueOf(callIndex),
                        TestUtils.createAggregatedRecordBatch(numOfAggregatedRecords, numOfChildRecords, sharedSequence));
                callIndex++;
            }
            return batchesByIterator;
        }
    }

    /**
     * Single-shard fake whose first batch has a caller-chosen size and whose later batch sizes are
     * derived from the average size of the records created so far.
     */
    private static class SingleShardEmittingAdaptiveNumOfRecordsKinesis extends SingleShardEmittingKinesis {
        /**
         * Average payload size of the most recently created non-empty batch. Static, so the value is
         * shared by all instances and survives from one instance to the next.
         */
        protected static long averageRecordSizeBytes = 0L;
        /** 2 MiB; used as the byte budget from which later batch sizes are derived. */
        private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L;

        /**
         * @param numOfRecords size of the first batch
         * @param numOfGetRecordsCalls number of batches to pre-build
         * @param millisBehindLatest value reported as "millis behind latest" in every result
         */
        public SingleShardEmittingAdaptiveNumOfRecordsKinesis(int numOfRecords, int numOfGetRecordsCalls, long millisBehindLatest) {
            super(SingleShardEmittingAdaptiveNumOfRecordsKinesis.initShardItrToRecordBatch(numOfRecords, numOfGetRecordsCalls), millisBehindLatest);
        }

        /**
         * Pre-builds the batches: the first has {@code numOfRecords} records, each following one has
         * {@code limit / (averageRecordSizeBytes * 1000 / 200)} records.
         *
         * @return map from iterator string ({@code "0"}, {@code "1"}, ...) to its batch
         */
        private static Map<String, List<Record>> initShardItrToRecordBatch(int numOfRecords, int numOfGetRecordsCalls) {
            HashMap<String, List<Record>> shardItrToRecordBatch = new HashMap<String, List<Record>>();
            int numOfAlreadyPartitionedRecords = 0;
            int numOfRecordsPerBatch = numOfRecords;
            for (int batch = 0; batch < numOfGetRecordsCalls; ++batch) {
                shardItrToRecordBatch.put(
                        String.valueOf(batch),
                        SingleShardEmittingAdaptiveNumOfRecordsKinesis.createRecordBatchWithRange(
                                numOfAlreadyPartitionedRecords, numOfAlreadyPartitionedRecords + numOfRecordsPerBatch));
                numOfAlreadyPartitionedRecords += numOfRecordsPerBatch;
                // Presumably 1000 ms per second over a 200 ms fetch interval - TODO confirm.
                long bytesPerRecordPerSecond = averageRecordSizeBytes * 1000L / 200L;
                // No record has been measured yet (e.g. the first batch was empty): keep the
                // current batch size instead of dividing by zero.
                if (bytesPerRecordPerSecond > 0L) {
                    numOfRecordsPerBatch = (int)(KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / bytesPerRecordPerSecond);
                }
            }
            return shardItrToRecordBatch;
        }

        /**
         * Creates records with sequence numbers {@code min} (inclusive) to {@code max} (exclusive),
         * each carrying a payload built from a 10240-character string, and updates
         * {@link #averageRecordSizeBytes} when at least one record was created.
         */
        private static List<Record> createRecordBatchWithRange(int min, int max) {
            LinkedList<Record> batch = new LinkedList<Record>();
            long sumRecordBatchBytes = 0L;
            String data = SingleShardEmittingAdaptiveNumOfRecordsKinesis.createDataSize(10240L);
            for (int i = min; i < max; ++i) {
                Record record = new Record()
                        .withData(ByteBuffer.wrap(data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
                        .withPartitionKey(UUID.randomUUID().toString())
                        .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
                        .withSequenceNumber(String.valueOf(i));
                batch.add(record);
                sumRecordBatchBytes += (long)record.getData().remaining();
            }
            if (!batch.isEmpty()) {
                averageRecordSizeBytes = sumRecordBatchBytes / (long)batch.size();
            }
            return batch;
        }

        /** Returns a string of {@code msgSize} default-initialised ({@code '\0'}) characters. */
        private static String createDataSize(long msgSize) {
            char[] data = new char[(int)msgSize];
            return new String(data);
        }
    }

    /**
     * Single-shard fake that serves a fixed number of sequentially numbered records, split into a
     * fixed number of batches (one batch per {@code getRecords} call).
     */
    private static class SingleShardEmittingFixNumOfRecordsKinesis implements KinesisProxyInterface {
        protected final int totalNumOfGetRecordsCalls;
        protected final int totalNumOfRecords;
        private final long millisBehindLatest;
        /** Record batch per shard iterator string ({@code "0"}, {@code "1"}, ...). */
        protected final Map<String, List<Record>> shardItrToRecordBatch;

        /**
         * Partitions records {@code 0 .. numOfRecords-1} into {@code numOfGetRecordsCalls} batches.
         *
         * <p>Every batch but the last holds up to {@code numOfRecords / numOfGetRecordsCalls + 1}
         * records; the last batch holds whatever remains. Batches are clamped so that the total
         * never exceeds {@code numOfRecords}.
         *
         * @param numOfRecords total number of records to serve
         * @param numOfGetRecordsCalls number of batches; must not be 0
         * @param millistBehindLatest value reported as "millis behind latest" in every result
         * @throws ArithmeticException if {@code numOfGetRecordsCalls} is 0
         */
        public SingleShardEmittingFixNumOfRecordsKinesis(int numOfRecords, int numOfGetRecordsCalls, long millistBehindLatest) {
            this.totalNumOfRecords = numOfRecords;
            this.totalNumOfGetRecordsCalls = numOfGetRecordsCalls;
            this.millisBehindLatest = millistBehindLatest;
            this.shardItrToRecordBatch = new HashMap<String, List<Record>>();
            int numOfAlreadyPartitionedRecords = 0;
            int numOfRecordsPerBatch = numOfRecords / numOfGetRecordsCalls + 1;
            for (int batch = 0; batch < this.totalNumOfGetRecordsCalls; ++batch) {
                boolean isLastBatch = batch == this.totalNumOfGetRecordsCalls - 1;
                // Without the clamp, early batches could run past the requested total whenever
                // (calls - 1) * perBatch > numOfRecords, e.g. 10 records over 5 calls gave 12.
                int batchEnd = isLastBatch
                        ? this.totalNumOfRecords
                        : Math.min(numOfAlreadyPartitionedRecords + numOfRecordsPerBatch, this.totalNumOfRecords);
                this.shardItrToRecordBatch.put(
                        String.valueOf(batch),
                        SingleShardEmittingFixNumOfRecordsKinesis.createRecordBatchWithRange(numOfAlreadyPartitionedRecords, batchEnd));
                numOfAlreadyPartitionedRecords = batchEnd;
            }
        }

        /**
         * Returns the batch stored under {@code shardIterator}.
         *
         * @param shardIterator decimal batch index
         * @param maxRecordsToGet ignored; the whole batch is returned
         * @return the batch, the configured "millis behind latest", and the next index as iterator
         *     ({@code null} after the last batch)
         * @throws NumberFormatException if {@code shardIterator} is not a decimal integer
         */
        public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
            List<Record> batch = this.shardItrToRecordBatch.get(shardIterator);
            int batchIndex = Integer.parseInt(shardIterator);
            String nextShardIterator = batchIndex == this.totalNumOfGetRecordsCalls - 1 ? null : String.valueOf(batchIndex + 1);
            return new GetRecordsResult()
                    .withRecords(batch)
                    .withMillisBehindLatest(Long.valueOf(this.millisBehindLatest))
                    .withNextShardIterator(nextShardIterator);
        }

        /** Always starts at the first batch, whatever shard or iterator type is requested. */
        public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
            return "0";
        }

        /** Shard listing is not provided by this fake: always {@code null}. */
        public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
            return null;
        }

        /**
         * Creates one record per integer in {@code [min, max)}; the integer is used both as the
         * payload (its decimal string) and as the sequence number.
         *
         * @param min first sequence number, inclusive
         * @param max end of the range, exclusive; an empty list is returned when {@code max <= min}
         * @return the created records in ascending order
         */
        public static List<Record> createRecordBatchWithRange(int min, int max) {
            LinkedList<Record> batch = new LinkedList<Record>();
            for (int i = min; i < max; ++i) {
                batch.add(new Record()
                        .withData(ByteBuffer.wrap(String.valueOf(i).getBytes(ConfigConstants.DEFAULT_CHARSET)))
                        .withPartitionKey(UUID.randomUUID().toString())
                        .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
                        .withSequenceNumber(String.valueOf(i)));
            }
            return batch;
        }
    }

    /**
     * Variant of the fixed-records fake that reports an expired shard iterator exactly once and
     * then insists on the iterator being refreshed before serving further records.
     */
    private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis
            extends SingleShardEmittingFixNumOfRecordsKinesis {
        /** 1-based position of the {@code getRecords} call that fails with an expired iterator. */
        private final int orderOfCallToExpire;
        /** Set once the artificial expiry has been thrown. */
        private boolean expiredOnceAlready = false;
        /** Set once {@code getShardIterator} has been called after the expiry. */
        private boolean expiredIteratorRefreshed = false;

        /**
         * @param numOfRecords total number of records to serve
         * @param numOfGetRecordsCalls number of batches
         * @param orderOfCallToExpire 1-based call position to expire; must not exceed
         *     {@code numOfGetRecordsCalls}
         * @param millisBehindLatest value reported as "millis behind latest" in every result
         * @throws IllegalArgumentException if {@code orderOfCallToExpire > numOfGetRecordsCalls}
         */
        public SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(int numOfRecords, int numOfGetRecordsCalls, int orderOfCallToExpire, long millisBehindLatest) {
            super(numOfRecords, numOfGetRecordsCalls, millisBehindLatest);
            boolean expiryWithinRange = orderOfCallToExpire <= numOfGetRecordsCalls;
            Preconditions.checkArgument(expiryWithinRange, (Object)"can not test unexpected expired iterator if orderOfCallToExpire is larger than numOfGetRecordsCalls");
            this.orderOfCallToExpire = orderOfCallToExpire;
        }

        /**
         * Serves the requested batch, except that the configured call fails once with
         * {@link ExpiredIteratorException}.
         *
         * @throws ExpiredIteratorException the first time the batch at position
         *     {@code orderOfCallToExpire - 1} is requested
         * @throws RuntimeException if records are requested after the expiry without a prior
         *     {@code getShardIterator} call
         */
        @Override
        public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
            int requestedIndex = Integer.parseInt(shardIterator);
            if (!this.expiredOnceAlready) {
                if (requestedIndex == this.orderOfCallToExpire - 1) {
                    this.expiredOnceAlready = true;
                    throw new ExpiredIteratorException("Artificial expired shard iterator");
                }
            } else if (!this.expiredIteratorRefreshed) {
                throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
            }
            // The parent builds the result from the same batch map, call count and
            // millisBehindLatest value that were passed through the constructor.
            return super.getRecords(shardIterator, maxRecordsToGet);
        }

        /**
         * Returns {@code "0"} before the expiry; afterwards marks the iterator as refreshed and
         * returns the position of the batch that was being requested when the expiry was thrown.
         */
        @Override
        public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
            if (this.expiredOnceAlready) {
                this.expiredIteratorRefreshed = true;
                return String.valueOf(this.orderOfCallToExpire - 1);
            }
            return "0";
        }
    }

    /**
     * Single-shard fake that never returns records; it only hands out a countdown of shard
     * iterators and ends the shard once the countdown is used up.
     */
    private static class SingleShardEmittingZeroRecords implements KinesisProxyInterface {
        /** Countdown; decremented every time an iterator is handed out. */
        private int remainingIterators;

        /**
         * @param remainingIterators initial value of the iterator countdown
         */
        private SingleShardEmittingZeroRecords(int remainingIterators) {
            this.remainingIterators = remainingIterators;
        }

        /** Returns the current countdown value as iterator and decrements the countdown. */
        public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) throws InterruptedException {
            return String.valueOf(this.remainingIterators--);
        }

        /**
         * Returns a result without records.
         *
         * @return a result whose next iterator is the current countdown value (which is then
         *     decremented), or {@code null} once the countdown is used up
         */
        public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
            String nextShardIterator;
            // "<= 0" rather than "== 0": getShardIterator() may already have pushed the countdown
            // below zero (e.g. when constructed with 0), which previously made the shard endless.
            if (this.remainingIterators <= 0) {
                nextShardIterator = null;
            } else {
                nextShardIterator = String.valueOf(this.remainingIterators--);
            }
            return new GetRecordsResult()
                    .withMillisBehindLatest(Long.valueOf(0L))
                    .withNextShardIterator(nextShardIterator);
        }

        /** Shard listing is not provided by this fake: always {@code null}. */
        public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
            return null;
        }
    }
}

