/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.testutils;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KinesisEventsGeneratorProducerThread {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisEventsGeneratorProducerThread.class);

    public static Thread create(final int totalEventCount, final int parallelism, final String awsAccessKey, final String awsSecretKey, final String awsRegion, final String kinesisStreamName, final AtomicReference<Throwable> errorHandler, final int flinkPort, final Configuration flinkConfig) {
        Runnable kinesisEventsGeneratorProducer = new Runnable(){

            @Override
            public void run() {
                try {
                    StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)flinkPort, (Configuration)flinkConfig, (String[])new String[0]);
                    see.setParallelism(parallelism);
                    DataStreamSource simpleStringStream = see.addSource((SourceFunction)new EventsGenerator(totalEventCount)).setParallelism(1);
                    Properties producerProps = new Properties();
                    producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey);
                    producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey);
                    producerProps.setProperty("aws.region", awsRegion);
                    FlinkKinesisProducer kinesis = new FlinkKinesisProducer((SerializationSchema)new SimpleStringSchema(), producerProps);
                    kinesis.setFailOnError(true);
                    kinesis.setDefaultStream(kinesisStreamName);
                    kinesis.setDefaultPartition("0");
                    simpleStringStream.addSink((SinkFunction)kinesis);
                    LOG.info("Starting producing topology");
                    see.execute("Producing topology");
                    LOG.info("Producing topo finished");
                }
                catch (Exception e) {
                    LOG.warn("Error while running producing topology", (Throwable)e);
                    errorHandler.set(e);
                }
            }
        };
        return new Thread(kinesisEventsGeneratorProducer);
    }

    private static class EventsGenerator
    implements SourceFunction<String> {
        private static final Logger LOG = LoggerFactory.getLogger(EventsGenerator.class);
        private boolean running = true;
        private final long limit;

        public EventsGenerator(long limit) {
            this.limit = limit;
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            long seq = 0L;
            while (this.running) {
                Thread.sleep(10L);
                String evt = seq++ + "-" + RandomStringUtils.randomAlphabetic((int)12);
                ctx.collect((Object)evt);
                LOG.info("Emitting event {}", (Object)evt);
                if (seq < this.limit) continue;
                break;
            }
            ctx.close();
            LOG.info("Stopping events generator");
        }

        public void cancel() {
            this.running = false;
        }
    }
}

