/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.testutils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.testutils.KafkaPartitionDataWriter;
import org.apache.flink.connectors.test.common.external.ExternalContext;
import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;

/**
 * External test context backed by a single Kafka topic in which every source split corresponds
 * to one partition of that topic.
 *
 * <p>The topic is created by the first call to {@link #createSourceSplitDataWriter()}; every
 * later call grows the topic by one partition. {@link #close()} deletes the topic and releases
 * all split writers as well as the admin client.
 */
public class KafkaSingleTopicExternalContext
implements ExternalContext<String> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSingleTopicExternalContext.class);
    private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";

    /** Timeout, in seconds, applied to blocking admin-client operations. */
    private static final int DEFAULT_TIMEOUT = 30;

    /** Exclusive upper bound for the number of records generated per split. */
    private static final int NUM_RECORDS_UPPER_BOUND = 500;

    /** Inclusive lower bound for the number of records generated per split. */
    private static final int NUM_RECORDS_LOWER_BOUND = 100;

    /** Maximum length of the random part of a generated record. */
    private static final int MAX_RANDOM_STRING_LENGTH = 50;

    /** Characters a generated record is drawn from. */
    private static final String ALPHANUMERIC_CHARS =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

    protected String bootstrapServers;
    private final String topicName;
    private final Map<Integer, SourceSplitDataWriter<String>> partitionToSplitWriter = new HashMap<>();
    private int numSplits = 0;
    protected final AdminClient kafkaAdminClient;

    /**
     * Creates a context with a randomly suffixed topic name and an admin client connected to the
     * given brokers. The topic itself is not created here.
     *
     * @param bootstrapServers value used for the {@code bootstrap.servers} client property
     */
    public KafkaSingleTopicExternalContext(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
        this.topicName = TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
        this.kafkaAdminClient = this.createAdminClient();
    }

    /**
     * Creates a topic and blocks until the admin client reports completion.
     *
     * @param topicName name of the topic to create
     * @param numPartitions number of partitions of the new topic
     * @param replicationFactor replication factor of the new topic
     * @throws RuntimeException if creation fails or does not finish within {@link #DEFAULT_TIMEOUT}
     *     seconds
     */
    protected void createTopic(String topicName, int numPartitions, short replicationFactor) {
        LOG.debug("Creating new Kafka topic {} with {} partitions and {} replicas", topicName, numPartitions, replicationFactor);
        NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
        try {
            this.kafkaAdminClient.createTopics(Collections.singletonList(newTopic)).all().get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            restoreInterrupt(e);
            throw new RuntimeException(String.format("Cannot create topic '%s'", topicName), e);
        }
    }

    /**
     * Deletes a topic and blocks until the admin client reports completion.
     *
     * <p>Only a failure whose root cause is an {@link UnknownTopicOrPartitionException} is
     * rethrown; every other failure is logged and ignored so that cleanup can continue.
     *
     * <p>NOTE(review): rethrowing only for an unknown topic while ignoring all other failures
     * looks like it may be inverted - confirm the intended behavior before changing it.
     *
     * @param topicName name of the topic to delete
     * @throws RuntimeException if the deletion failed because the topic is unknown
     */
    protected void deleteTopic(String topicName) {
        LOG.debug("Deleting Kafka topic {}", topicName);
        try {
            this.kafkaAdminClient.deleteTopics(Collections.singletonList(topicName)).all().get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            restoreInterrupt(e);
            if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
                throw new RuntimeException(String.format("Cannot delete topic '%s'", topicName), e);
            }
            // Previously dropped without a trace; keep ignoring it but leave evidence in the log.
            LOG.warn("Ignoring failure while deleting Kafka topic {}", topicName, e);
        }
    }

    /** Builds an admin client that connects to {@link #bootstrapServers}. */
    private AdminClient createAdminClient() {
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", this.bootstrapServers);
        return AdminClient.create(config);
    }

    /** Re-asserts the thread's interrupt flag when a broad catch block caught an interruption. */
    private static void restoreInterrupt(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds a Kafka source that reads string values from this context's topic.
     *
     * @param boundedness when {@link Boundedness#BOUNDED}, the source is bounded at the latest
     *     offsets; otherwise no bound is configured
     * @return the configured source
     */
    public Source<String, ?, ?> createSource(Boundedness boundedness) {
        KafkaSourceBuilder<String> builder = KafkaSource.<String>builder();
        if (boundedness == Boundedness.BOUNDED) {
            builder = builder.setBounded(OffsetsInitializer.latest());
        }
        return builder
                .setGroupId("flink-kafka-test")
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .setTopics(this.topicName)
                .setBootstrapServers(this.bootstrapServers)
                .build();
    }

    /**
     * Creates a writer for a new split. The first call creates the topic with one partition;
     * each later call adds one partition and waits for the broker to acknowledge it before the
     * writer is created.
     *
     * @return a writer bound to the newly created partition
     * @throws RuntimeException if the topic or the partition cannot be created in time
     */
    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
        int partitionId = this.numSplits;
        if (partitionId == 0) {
            this.createTopic(this.topicName, 1, (short)1);
        } else {
            LOG.debug("Creating new partition for topic {}", this.topicName);
            try {
                // Block on the result: the request is asynchronous and the writer below must not
                // be handed out for a partition that does not exist yet.
                this.kafkaAdminClient
                        .createPartitions(Collections.singletonMap(this.topicName, NewPartitions.increaseTo(partitionId + 1)))
                        .all()
                        .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                restoreInterrupt(e);
                throw new RuntimeException(String.format("Cannot create partition %d for topic '%s'", partitionId, this.topicName), e);
            }
        }
        // Only count the split once the topic/partition is known to exist.
        this.numSplits = partitionId + 1;
        KafkaPartitionDataWriter splitWriter = new KafkaPartitionDataWriter(this.getKafkaProducerProperties(partitionId), new TopicPartition(this.topicName, partitionId));
        this.partitionToSplitWriter.put(partitionId, splitWriter);
        return splitWriter;
    }

    /**
     * Generates pseudo-random records for one split. The same seed always yields the same
     * records.
     *
     * @param splitIndex index of the split, used as prefix of every record
     * @param seed seed of the random generator
     * @return between {@link #NUM_RECORDS_LOWER_BOUND} (inclusive) and
     *     {@link #NUM_RECORDS_UPPER_BOUND} (exclusive) records
     */
    public List<String> generateTestData(int splitIndex, long seed) {
        Random random = new Random(seed);
        int recordNum = random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND) + NUM_RECORDS_LOWER_BOUND;
        List<String> randomStringRecords = new ArrayList<>(recordNum);
        for (int i = 0; i < recordNum; ++i) {
            int stringLength = random.nextInt(MAX_RANDOM_STRING_LENGTH) + 1;
            randomStringRecords.add(generateRandomString(splitIndex, stringLength, random));
        }
        return randomStringRecords;
    }

    /**
     * Builds a record of the form {@code <splitIndex>-<random alphanumeric characters>}.
     *
     * @param splitIndex prefix of the record
     * @param length number of random characters after the dash
     * @param random source of randomness
     */
    private static String generateRandomString(int splitIndex, int length, Random random) {
        StringBuilder sb = new StringBuilder().append(splitIndex).append("-");
        for (int i = 0; i < length; ++i) {
            sb.append(ALPHANUMERIC_CHARS.charAt(random.nextInt(ALPHANUMERIC_CHARS.length())));
        }
        return sb.toString();
    }

    /**
     * Builds the producer configuration for a split writer.
     *
     * @param producerId id embedded in the {@code client.id}, together with a random suffix
     * @return producer properties using byte-array key and value serializers
     */
    protected Properties getKafkaProducerProperties(int producerId) {
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty("bootstrap.servers", this.bootstrapServers);
        kafkaProducerProperties.setProperty("client.id", String.join("-", "flink-kafka-split-writer", Integer.toString(producerId), Long.toString(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE))));
        kafkaProducerProperties.setProperty("key.serializer", ByteArraySerializer.class.getName());
        kafkaProducerProperties.setProperty("value.serializer", ByteArraySerializer.class.getName());
        return kafkaProducerProperties;
    }

    /**
     * Deletes the topic, closes every split writer and finally closes the admin client.
     *
     * <p>All cleanup steps are attempted even if an earlier one fails. The first failure is
     * rethrown once cleanup is finished; later failures are attached to it as suppressed
     * exceptions.
     *
     * @throws RuntimeException if deleting the topic or closing a split writer failed
     */
    public void close() {
        RuntimeException failure = null;
        try {
            try {
                this.deleteTopic(this.topicName);
            }
            catch (RuntimeException e) {
                failure = e;
            }
            for (SourceSplitDataWriter<String> splitWriter : this.partitionToSplitWriter.values()) {
                try {
                    splitWriter.close();
                }
                catch (Exception e) {
                    RuntimeException closeFailure = new RuntimeException("Cannot close split writer", e);
                    if (failure == null) {
                        failure = closeFailure;
                    } else {
                        failure.addSuppressed(closeFailure);
                    }
                }
            }
            this.partitionToSplitWriter.clear();
        }
        finally {
            this.kafkaAdminClient.close();
        }
        if (failure != null) {
            throw failure;
        }
    }

    @Override
    public String toString() {
        return "Single-topic Kafka";
    }

    /** Factory creating {@link KafkaSingleTopicExternalContext} instances for a Kafka container. */
    public static class Factory
    implements ExternalContext.Factory<String> {
        // Port appended to every network alias of the container.
        // NOTE(review): presumably the broker's in-network listener port - confirm.
        private static final int NETWORK_ALIAS_PORT = 9092;

        private final KafkaContainer kafkaContainer;

        /**
         * @param kafkaContainer container whose endpoints are used as bootstrap servers
         */
        public Factory(KafkaContainer kafkaContainer) {
            this.kafkaContainer = kafkaContainer;
        }

        /**
         * Builds the comma-separated bootstrap server list: the container's own bootstrap
         * servers followed by {@code <alias>:9092} for each of its network aliases.
         *
         * @return the bootstrap servers string; without a trailing comma when the container has
         *     no network aliases
         */
        protected String getBootstrapServer() {
            String containerEndpoints = this.kafkaContainer.getBootstrapServers();
            String internalEndpoints = this.kafkaContainer.getNetworkAliases().stream()
                    .map(host -> host + ":" + NETWORK_ALIAS_PORT)
                    .collect(Collectors.joining(","));
            if (internalEndpoints.isEmpty()) {
                return containerEndpoints;
            }
            return String.join(",", containerEndpoints, internalEndpoints);
        }

        /** Creates a new context connected to the container's bootstrap servers. */
        public ExternalContext<String> createExternalContext() {
            return new KafkaSingleTopicExternalContext(this.getBootstrapServer());
        }
    }
}

