package org.apache.flink.connector.kafka.sink.testutils;

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.class */
public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext<String> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkExternalContext.class);
    private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";
    private static final int RANDOM_STRING_MAX_LENGTH = 50;
    private static final int NUM_RECORDS_UPPER_BOUND = 500;
    private static final int NUM_RECORDS_LOWER_BOUND = 100;
    private static final int DEFAULT_TRANSACTION_TIMEOUT_IN_MS = 900000;
    protected String bootstrapServers;
    private List<URL> connectorJarPaths;
    private final List<ExternalSystemDataReader<String>> readers = new ArrayList();
    protected int numSplits = 0;
    protected final String topicName = "kafka-single-topic-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
    protected final AdminClient kafkaAdminClient = createAdminClient();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContext$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$CheckpointingMode = new int[CheckpointingMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$api$CheckpointingMode[CheckpointingMode.EXACTLY_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$CheckpointingMode[CheckpointingMode.AT_LEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public KafkaSinkExternalContext(String str, List<URL> list) {
        this.bootstrapServers = str;
        this.connectorJarPaths = list;
    }

    private void createTopic(String str, int i, short s) {
        LOG.debug("Creating new Kafka topic {} with {} partitions and {} replicas", new Object[]{str, Integer.valueOf(i), Short.valueOf(s)});
        try {
            this.kafkaAdminClient.createTopics(Collections.singletonList(new NewTopic(str, i, s))).all().get();
        } catch (Exception e) {
            throw new RuntimeException(String.format("Cannot create topic '%s'", str), e);
        }
    }

    private void deleteTopic(String str) {
        LOG.debug("Deleting Kafka topic {}", str);
        try {
            this.kafkaAdminClient.deleteTopics(Collections.singletonList(str)).all().get();
        } catch (Exception e) {
            if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
                throw new RuntimeException(String.format("Cannot delete unknown Kafka topic '%s'", str), e);
            }
        }
    }

    private AdminClient createAdminClient() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.bootstrapServers);
        return AdminClient.create(properties);
    }

    public Sink<String> createSink(TestingSinkSettings testingSinkSettings) {
        if (!topicExists(this.topicName)) {
            createTopic(this.topicName, 4, (short) 1);
        }
        KafkaSinkBuilder builder = KafkaSink.builder();
        Properties properties = new Properties();
        properties.put("transaction.timeout.ms", Integer.valueOf(DEFAULT_TRANSACTION_TIMEOUT_IN_MS));
        builder.setBootstrapServers(this.bootstrapServers).setDeliveryGuarantee(toDeliveryGuarantee(testingSinkSettings.getCheckpointingMode())).setTransactionalIdPrefix("testingFramework").setKafkaProducerConfig(properties).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(this.topicName).setValueSerializationSchema(new SimpleStringSchema()).build());
        return builder.build();
    }

    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings testingSinkSettings) {
        LOG.info("Fetching information for topic: {}", this.topicName);
        Map<String, TopicDescription> topicMetadata = getTopicMetadata(Arrays.asList(this.topicName));
        HashSet hashSet = new HashSet();
        for (TopicDescription topicDescription : topicMetadata.values()) {
            Iterator it = topicDescription.partitions().iterator();
            while (it.hasNext()) {
                hashSet.add(new TopicPartition(topicDescription.name(), ((TopicPartitionInfo) it.next()).partition()));
            }
        }
        Properties properties = new Properties();
        properties.setProperty("group.id", "flink-kafka-test" + hashSet.hashCode());
        properties.setProperty("bootstrap.servers", this.bootstrapServers);
        properties.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
        if (CheckpointingMode.EXACTLY_ONCE.equals(testingSinkSettings.getCheckpointingMode())) {
            properties.setProperty("isolation.level", "read_committed");
        }
        properties.setProperty("auto.offset.reset", "earliest");
        this.readers.add(new KafkaDataReader(properties, hashSet));
        return this.readers.get(this.readers.size() - 1);
    }

    public List<String> generateTestData(TestingSinkSettings testingSinkSettings, long j) {
        Random random = new Random(j);
        ArrayList arrayList = new ArrayList();
        int nextInt = random.nextInt(400) + NUM_RECORDS_LOWER_BOUND;
        for (int i = 0; i < nextInt; i++) {
            arrayList.add(RandomStringUtils.random(random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1, true, true));
        }
        return arrayList;
    }

    protected Map<String, TopicDescription> getTopicMetadata(List<String> list) {
        try {
            return (Map) this.kafkaAdminClient.describeTopics(list).allTopicNames().get();
        } catch (Exception e) {
            throw new RuntimeException(String.format("Failed to get metadata for topics %s.", list), e);
        }
    }

    private boolean topicExists(String str) {
        try {
            this.kafkaAdminClient.describeTopics(Arrays.asList(str)).allTopicNames().get();
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public void close() {
        if (this.numSplits != 0) {
            deleteTopic(this.topicName);
        }
        this.readers.stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).forEach(externalSystemDataReader -> {
            try {
                externalSystemDataReader.close();
            } catch (Exception e) {
                if (this.kafkaAdminClient != null) {
                    this.kafkaAdminClient.close();
                }
                throw new RuntimeException("Cannot close split writer", e);
            }
        });
        this.readers.clear();
        if (this.kafkaAdminClient != null) {
            this.kafkaAdminClient.close();
        }
    }

    public String toString() {
        return "Single-topic Kafka";
    }

    public List<URL> getConnectorJarPaths() {
        return this.connectorJarPaths;
    }

    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

    private DeliveryGuarantee toDeliveryGuarantee(CheckpointingMode checkpointingMode) {
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$CheckpointingMode[checkpointingMode.ordinal()]) {
            case 1:
                return DeliveryGuarantee.EXACTLY_ONCE;
            case 2:
                return DeliveryGuarantee.AT_LEAST_ONCE;
            default:
                throw new IllegalArgumentException(String.format("Only exactly-once and al-least-once checkpointing mode are supported, but actual is %s.", checkpointingMode));
        }
    }
}
