/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.testutils;

import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExactlyOnceValidatingConsumerThread {
    private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingConsumerThread.class);

    public static Thread create(final int totalEventCount, final int failAtRecordCount, final int parallelism, final int checkpointInterval, final long restartDelay, final String awsAccessKey, final String awsSecretKey, final String awsRegion, final String kinesisStreamName, final AtomicReference<Throwable> errorHandler, final int flinkPort, final Configuration flinkConfig) {
        Runnable exactlyOnceValidationConsumer = new Runnable(){

            @Override
            public void run() {
                try {
                    StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)flinkPort, (Configuration)flinkConfig, (String[])new String[0]);
                    see.setParallelism(parallelism);
                    see.enableCheckpointing((long)checkpointInterval);
                    see.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)2, (long)restartDelay));
                    Properties consumerProps = new Properties();
                    consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey);
                    consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey);
                    consumerProps.setProperty("aws.region", awsRegion);
                    consumerProps.setProperty("flink.stream.initpos", ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
                    DataStreamSource consuming = see.addSource((SourceFunction)new FlinkKinesisConsumer(kinesisStreamName, (DeserializationSchema)new SimpleStringSchema(), consumerProps));
                    consuming.flatMap((FlatMapFunction)new ArtificialFailOnceFlatMapper(failAtRecordCount)).flatMap((FlatMapFunction)new ExactlyOnceValidatingMapper(totalEventCount)).setParallelism(1);
                    LOG.info("Starting consuming topology");
                    TestUtils.tryExecute((StreamExecutionEnvironment)see, (String)"Consuming topo");
                    LOG.info("Consuming topo finished");
                }
                catch (Exception e) {
                    LOG.warn("Error while running consuming topology", (Throwable)e);
                    errorHandler.set(e);
                }
            }
        };
        return new Thread(exactlyOnceValidationConsumer);
    }

    private static class ArtificialFailOnceFlatMapper
    extends RichFlatMapFunction<String, String> {
        int count = 0;
        private final int failAtRecordCount;

        public ArtificialFailOnceFlatMapper(int failAtRecordCount) {
            this.failAtRecordCount = failAtRecordCount;
        }

        public void flatMap(String value, Collector<String> out) throws Exception {
            if (this.count++ >= this.failAtRecordCount && this.getRuntimeContext().getAttemptNumber() == 0) {
                throw new RuntimeException("Artificial failure. Restart please.");
            }
            out.collect((Object)value);
        }
    }

    private static class ExactlyOnceValidatingMapper
    implements FlatMapFunction<String, String>,
    ListCheckpointed<BitSet> {
        private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingMapper.class);
        private final int totalEventCount;
        private BitSet validator;

        public ExactlyOnceValidatingMapper(int totalEventCount) {
            this.totalEventCount = totalEventCount;
            this.validator = new BitSet(totalEventCount);
        }

        public void flatMap(String value, Collector<String> out) throws Exception {
            LOG.info("Consumed {}", (Object)value);
            int id = Integer.parseInt(value.split("-")[0]);
            if (this.validator.get(id)) {
                throw new RuntimeException("Saw id " + id + " twice!");
            }
            this.validator.set(id);
            if (id > this.totalEventCount - 1) {
                throw new RuntimeException("Out of bounds ID observed");
            }
            if (this.validator.nextClearBit(0) == this.totalEventCount) {
                throw new SuccessException();
            }
        }

        public List<BitSet> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.validator);
        }

        public void restoreState(List<BitSet> state) throws Exception {
            if (state.size() == 1) {
                this.validator = state.get(0);
            } else {
                Preconditions.checkState((boolean)state.isEmpty());
            }
        }
    }
}

