/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.testutils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.testutils.KafkaPartitionDataWriter;
import org.apache.flink.connector.kafka.source.testutils.KafkaSingleTopicExternalContext;
import org.apache.flink.connectors.test.common.external.ExternalContext;
import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.testcontainers.containers.KafkaContainer;

/**
 * External context for Kafka source tests in which every source split is backed by its own
 * topic.
 *
 * <p>Each call to {@link #createSourceSplitDataWriter()} creates a fresh topic whose name is
 * derived from {@link #topicPattern} and hands out a writer bound to partition 0 of that topic.
 * The source built by {@link #createSource(Boundedness)} subscribes with the same pattern, so it
 * matches every topic created by this context.
 *
 * <p>This class keeps mutable, unsynchronized state ({@code numTopics} and the writer map).
 */
public class KafkaMultipleTopicExternalContext extends KafkaSingleTopicExternalContext {

    /** Number of topics created so far; also used as the index of the next topic. */
    private int numTopics = 0;

    /**
     * Regular expression matching all topics of this context. It contains the literal text
     * {@code [0-9]+}, which {@link #getTopicName()} substitutes with a concrete topic index.
     */
    private final String topicPattern;

    /** Writers handed out so far, keyed by the name of the topic each one writes to. */
    private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters =
            new HashMap<>();

    /**
     * Creates a context whose topic pattern carries a random non-negative suffix, so that
     * separate contexts use distinct topic names.
     *
     * @param bootstrapServers bootstrap servers string, forwarded to the parent class
     */
    public KafkaMultipleTopicExternalContext(String bootstrapServers) {
        super(bootstrapServers);
        this.topicPattern =
                "kafka-multiple-topic-[0-9]+-"
                        + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
    }

    /**
     * Creates a new topic and returns a writer for its partition 0.
     *
     * <p>The writer is remembered so that {@link #close()} can close it and delete the topic.
     *
     * @return a writer targeting partition 0 of the newly created topic
     */
    @Override
    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
        final String topicName = getTopicName();
        // NOTE(review): arguments are presumably (name, partition count, replication factor)
        // as defined by the parent class -- confirm against KafkaSingleTopicExternalContext.
        createTopic(topicName, 1, (short) 1);
        final KafkaPartitionDataWriter splitWriter =
                new KafkaPartitionDataWriter(
                        getKafkaProducerProperties(numTopics), new TopicPartition(topicName, 0));
        topicNameToSplitWriters.put(topicName, splitWriter);
        numTopics++;
        return splitWriter;
    }

    /**
     * Builds a Kafka source subscribed to every topic matching this context's topic pattern.
     *
     * @param boundedness when {@link Boundedness#BOUNDED}, the source is configured as bounded
     *     with {@link OffsetsInitializer#latest()} as the stopping offsets; otherwise the
     *     builder's default boundedness is left untouched
     * @return the configured source, deserializing record values as strings
     */
    @Override
    public Source<String, ?, ?> createSource(Boundedness boundedness) {
        KafkaSourceBuilder<String> builder = KafkaSource.builder();
        if (boundedness == Boundedness.BOUNDED) {
            builder = builder.setBounded(OffsetsInitializer.latest());
        }
        return builder.setGroupId("flink-kafka-multiple-topic-test")
                .setBootstrapServers(bootstrapServers)
                .setTopicPattern(Pattern.compile(topicPattern))
                .setDeserializer(
                        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
    }

    /**
     * Closes every split writer, deletes every topic created by this context and finally closes
     * the Kafka admin client.
     *
     * <p>Cleanup is best-effort: a failure for one topic does not prevent cleanup of the
     * remaining ones. The writer map is always cleared and the admin client is always closed.
     *
     * @throws RuntimeException if closing a writer or deleting a topic failed; the first failure
     *     is thrown and any later ones are attached to it as suppressed exceptions
     */
    @Override
    public void close() {
        RuntimeException failure = null;
        try {
            for (Map.Entry<String, SourceSplitDataWriter<String>> entry :
                    topicNameToSplitWriters.entrySet()) {
                final String topicName = entry.getKey();
                try {
                    entry.getValue().close();
                } catch (Exception e) {
                    failure =
                            recordFailure(
                                    failure,
                                    "Cannot close split writer for topic '" + topicName + "'",
                                    e);
                }
                // Still try to remove the topic even if its writer could not be closed.
                try {
                    deleteTopic(topicName);
                } catch (Exception e) {
                    failure =
                            recordFailure(
                                    failure, "Cannot delete topic '" + topicName + "'", e);
                }
            }
        } finally {
            // Runs regardless of failures so that no stale writers are retried on a second
            // close() and the admin client is never leaked.
            topicNameToSplitWriters.clear();
            kafkaAdminClient.close();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Wraps {@code cause}; returns it as the primary failure or suppresses it on the first. */
    private static RuntimeException recordFailure(
            RuntimeException existing, String message, Exception cause) {
        final RuntimeException wrapped = new RuntimeException(message, cause);
        if (existing == null) {
            return wrapped;
        }
        existing.addSuppressed(wrapped);
        return existing;
    }

    /** Returns the name of the next topic: the pattern with {@code [0-9]+} set to its index. */
    private String getTopicName() {
        return topicPattern.replace("[0-9]+", String.valueOf(numTopics));
    }

    @Override
    public String toString() {
        return "Multiple-topics Kafka";
    }

    /** Factory producing {@link KafkaMultipleTopicExternalContext} instances. */
    public static class Factory extends KafkaSingleTopicExternalContext.Factory {

        /**
         * @param kafkaContainer Kafka test container, forwarded to the parent factory
         */
        public Factory(KafkaContainer kafkaContainer) {
            super(kafkaContainer);
        }

        /**
         * @return a new multiple-topic context using the bootstrap server reported by the parent
         *     factory
         */
        @Override
        public ExternalContext<String> createExternalContext() {
            return new KafkaMultipleTopicExternalContext(getBootstrapServer());
        }
    }
}

