/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import kafka.utils.Exit$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import scala.Array$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.DoubleRef;
import scala.runtime.RichInt$;
import scala.util.Random;

public final class EndToEndLatency$ {
    public static final EndToEndLatency$ MODULE$;
    private final long kafka$tools$EndToEndLatency$$timeout;

    static {
        new EndToEndLatency$();
    }

    public long kafka$tools$EndToEndLatency$$timeout() {
        return this.kafka$tools$EndToEndLatency$$timeout;
    }

    public void main(String[] args) {
        None$ propsFile;
        if (args.length != 5 && args.length != 6) {
            System.err.println(new StringBuilder().append((Object)"USAGE: java ").append((Object)this.getClass().getName()).append((Object)" broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file").toString());
            throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
        }
        String brokerList = args[0];
        String topic = args[1];
        int numMessages = new StringOps(Predef$.MODULE$.augmentString(args[2])).toInt();
        String producerAcks = args[3];
        int messageLen = new StringOps(Predef$.MODULE$.augmentString(args[4])).toInt();
        Option option = propsFile = args.length > 5 ? new Some<String>(args[5]).filter((Function1<String, Object>)((Object)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(String x$1) {
                return new StringOps(Predef$.MODULE$.augmentString(x$1)).nonEmpty();
            }
        })) : None$.MODULE$;
        if (((List)List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1", "all"}))).contains(producerAcks)) {
            Properties consumerProps = this.loadProps$1(propsFile);
            consumerProps.put("bootstrap.servers", brokerList);
            consumerProps.put("group.id", new StringBuilder().append((Object)"test-group-").append(BoxesRunTime.boxToLong(System.currentTimeMillis())).toString());
            consumerProps.put("enable.auto.commit", "false");
            consumerProps.put("auto.offset.reset", "latest");
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("fetch.max.wait.ms", "0");
            KafkaConsumer consumer2 = new KafkaConsumer(consumerProps);
            consumer2.subscribe(Collections.singletonList(topic));
            Properties producerProps = this.loadProps$1(propsFile);
            producerProps.put("bootstrap.servers", brokerList);
            producerProps.put("linger.ms", "0");
            producerProps.put("max.block.ms", ((Object)BoxesRunTime.boxToLong(Long.MAX_VALUE)).toString());
            producerProps.put("acks", producerAcks.toString());
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer producer = new KafkaProducer(producerProps);
            consumer2.seekToEnd(Collections.<TopicPartition>emptyList());
            consumer2.poll(0L);
            DoubleRef totalTime = DoubleRef.create(0.0);
            long[] latencies = new long[numMessages];
            Random random = new Random(0);
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numMessages).foreach$mVc$sp((Function1<Object, BoxedUnit>)((Object)new Serializable(topic, messageLen, consumer2, producer, totalTime, latencies, random){
                public static final long serialVersionUID = 0L;
                private final String topic$1;
                private final int messageLen$1;
                private final KafkaConsumer consumer$1;
                private final KafkaProducer producer$1;
                private final DoubleRef totalTime$1;
                private final long[] latencies$1;
                private final Random random$1;

                public final void apply(int i) {
                    this.apply$mcVI$sp(i);
                }

                /*
                 * WARNING - void declaration
                 */
                public void apply$mcVI$sp(int i) {
                    byte[] message = EndToEndLatency$.MODULE$.randomBytesOfLen(this.random$1, this.messageLen$1);
                    long begin = System.nanoTime();
                    this.producer$1.send(new ProducerRecord<K, byte[]>(this.topic$1, message)).get();
                    Iterator<ConsumerRecord<K, V>> recordIter = this.consumer$1.poll(EndToEndLatency$.MODULE$.kafka$tools$EndToEndLatency$$timeout()).iterator();
                    long elapsed = System.nanoTime() - begin;
                    if (recordIter.hasNext()) {
                        void var8_6;
                        void var9_7;
                        String sent = new String(message, StandardCharsets.UTF_8);
                        String read2 = new String((byte[])recordIter.next().value(), StandardCharsets.UTF_8);
                        if (read2.equals(sent)) {
                            if (recordIter.hasNext()) {
                                int count2 = 1 + ((TraversableOnce)JavaConverters$.MODULE$.asScalaIteratorConverter(recordIter).asScala()).size();
                                throw new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Only one result was expected during this test. We found [", "]"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(count2)})));
                            }
                            if (i % 1000 == 0) {
                                Predef$.MODULE$.println(new StringBuilder().append(i).append((Object)"\t").append(BoxesRunTime.boxToDouble((double)elapsed / 1000.0 / 1000.0)).toString());
                            }
                            this.totalTime$1.elem += (double)elapsed;
                            this.latencies$1[i] = elapsed / 1000L / 1000L;
                            return;
                        }
                        EndToEndLatency$.MODULE$.kafka$tools$EndToEndLatency$$finalise$1(this.consumer$1, this.producer$1);
                        throw new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"The message read [", "] did not match the message sent [", "]"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{var9_7, var8_6})));
                    }
                    EndToEndLatency$.MODULE$.kafka$tools$EndToEndLatency$$finalise$1(this.consumer$1, this.producer$1);
                    throw new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"poll() timed out before finding a result (timeout:[", "])"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(EndToEndLatency$.MODULE$.kafka$tools$EndToEndLatency$$timeout())})));
                }
                {
                    this.topic$1 = topic$1;
                    this.messageLen$1 = messageLen$1;
                    this.consumer$1 = consumer$1;
                    this.producer$1 = producer$1;
                    this.totalTime$1 = totalTime$1;
                    this.latencies$1 = latencies$1;
                    this.random$1 = random$1;
                }
            }));
            Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("Avg latency: %.4f ms\n")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToDouble(totalTime.elem / (double)numMessages / 1000.0 / 1000.0)})));
            Arrays.sort(latencies);
            long p50 = latencies[(int)((double)latencies.length * 0.5)];
            long p99 = latencies[(int)((double)latencies.length * 0.99)];
            long p999 = latencies[(int)((double)latencies.length * 0.999)];
            Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("Percentiles: 50th = %d, 99th = %d, 99.9th = %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(p50), BoxesRunTime.boxToLong(p99), BoxesRunTime.boxToLong(p999)})));
            this.kafka$tools$EndToEndLatency$$finalise$1(consumer2, producer);
            return;
        }
        throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
    }

    public byte[] randomBytesOfLen(Random random, int len) {
        return (byte[])Array$.MODULE$.fill(len, new Serializable(random){
            public static final long serialVersionUID = 0L;
            private final Random random$2;

            public final byte apply() {
                return this.apply$mcB$sp();
            }

            public byte apply$mcB$sp() {
                return (byte)(this.random$2.nextInt(26) + 65);
            }
            {
                this.random$2 = random$2;
            }
        }, ClassTag$.MODULE$.Byte());
    }

    private final Properties loadProps$1(Option propsFile$1) {
        return propsFile$1.map(new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Properties apply(String x$1) {
                return Utils.loadProps(x$1);
            }
        }).getOrElse(new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Properties apply() {
                return new Properties();
            }
        });
    }

    public final void kafka$tools$EndToEndLatency$$finalise$1(KafkaConsumer consumer$1, KafkaProducer producer$1) {
        consumer$1.commitSync();
        producer$1.close();
        consumer$1.close();
    }

    private EndToEndLatency$() {
        MODULE$ = this;
        this.kafka$tools$EndToEndLatency$$timeout = 60000L;
    }
}

