/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.testutils;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.kinesis.shaded.com.amazonaws.AmazonClientException;
import org.apache.flink.kinesis.shaded.com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.flink.kinesis.shaded.com.amazonaws.client.builder.AwsClientBuilder;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordsRequest;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KinesisPubsubClient {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisPubsubClient.class);
    private final AmazonKinesis kinesisClient;
    private final Properties properties;

    public KinesisPubsubClient(Properties properties) {
        this.kinesisClient = KinesisPubsubClient.createClientWithCredentials(properties);
        this.properties = properties;
    }

    public void createTopic(String stream, int shards, Properties props) throws Exception {
        try {
            this.kinesisClient.describeStream(stream);
            this.kinesisClient.deleteStream(stream);
        }
        catch (ResourceNotFoundException resourceNotFoundException) {
            // empty catch block
        }
        this.kinesisClient.createStream(stream, Integer.valueOf(shards));
        Deadline deadline = Deadline.fromNow((Duration)Duration.ofSeconds(5L));
        while (deadline.hasTimeLeft()) {
            try {
                Thread.sleep(250L);
                if (this.kinesisClient.describeStream(stream).getStreamDescription().getShards().size() == shards) break;
            }
            catch (ResourceNotFoundException resourceNotFoundException) {}
        }
    }

    public void sendMessage(String topic, String ... messages) {
        this.sendMessage(topic, (byte[][])Arrays.stream(messages).map(String::getBytes).toArray(x$0 -> new byte[x$0][]));
    }

    public void sendMessage(String topic, byte[] ... messages) {
        for (List partition : Lists.partition(Arrays.asList(messages), (int)500)) {
            List entries = partition.stream().map(msg -> new PutRecordsRequestEntry().withPartitionKey("fakePartitionKey").withData(ByteBuffer.wrap(msg))).collect(Collectors.toList());
            PutRecordsRequest requests = new PutRecordsRequest().withStreamName(topic).withRecords(entries);
            PutRecordsResult putRecordResult = this.kinesisClient.putRecords(requests);
            for (PutRecordsResultEntry result : putRecordResult.getRecords()) {
                LOG.debug("added record: {}", (Object)result.getSequenceNumber());
            }
        }
    }

    public List<String> readAllMessages(String streamName) throws Exception {
        return this.readAllMessages(streamName, String::new);
    }

    public <T> List<T> readAllMessages(String streamName, Function<byte[], T> deserialiser) throws Exception {
        KinesisProxyInterface kinesisProxy = KinesisProxy.create((Properties)this.properties);
        HashMap<String, Object> streamNamesWithLastSeenShardIds = new HashMap<String, Object>();
        streamNamesWithLastSeenShardIds.put(streamName, null);
        GetShardListResult shardListResult = kinesisProxy.getShardList(streamNamesWithLastSeenShardIds);
        int maxRecordsToFetch = 10;
        ArrayList<T> messages = new ArrayList<T>();
        for (StreamShardHandle ssh : shardListResult.getRetrievedShardListOfStream(streamName)) {
            String shardIterator = kinesisProxy.getShardIterator(ssh, "TRIM_HORIZON", null);
            GetRecordsResult getRecordsResult = kinesisProxy.getRecords(shardIterator, maxRecordsToFetch);
            List aggregatedRecords = getRecordsResult.getRecords();
            for (Record record : aggregatedRecords) {
                messages.add(deserialiser.apply(record.getData().array()));
            }
        }
        return messages;
    }

    private static AmazonKinesis createClientWithCredentials(Properties props) throws AmazonClientException {
        AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider((Properties)props);
        return (AmazonKinesis)((AmazonKinesisClientBuilder)((AmazonKinesisClientBuilder)AmazonKinesisClientBuilder.standard().withCredentials(credentialsProvider)).withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(props.getProperty("aws.endpoint"), "us-east-1"))).build();
    }
}

