package kafka.tools;

import com.typesafe.scalalogging.Logger;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.metrics.KafkaMetricsGroup;
import kafka.tools.MirrorMaker;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Log4jControllerRegistration$;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.eclipse.persistence.oxm.XMLConstants;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.StringOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;
import scala.util.control.ControlThrowable;

/* compiled from: MirrorMaker.scala */
/* loaded from: input_file:kafka/tools/MirrorMaker$.class */
public final class MirrorMaker$ implements KafkaMetricsGroup {
    // Singleton instance; assigned as the first statement of the private constructor.
    public static MirrorMaker$ MODULE$;
    // Set to null by the constructor; cleanShutdown() calls close() on it.
    private MirrorMaker.MirrorMakerProducer producer;
    // Set to null by the constructor; main() starts and awaits these, cleanShutdown() stops them.
    private Seq<MirrorMaker.MirrorMakerThread> kafka$tools$MirrorMaker$$mirrorMakerThreads;
    // Starts false; cleanShutdown() flips it with compareAndSet so only one caller proceeds.
    private final AtomicBoolean kafka$tools$MirrorMaker$$isShuttingDown;
    // Starts at 0; published through the "MirrorMaker-numDroppedMessages" gauge.
    private final AtomicInteger kafka$tools$MirrorMaker$$numDroppedMessages;
    // Set to null by the constructor.
    private MirrorMaker.MirrorMakerMessageHandler kafka$tools$MirrorMaker$$messageHandler;
    // Set to 0 by the constructor.
    private int kafka$tools$MirrorMaker$$offsetCommitIntervalMs;
    // Set to true by the constructor.
    private boolean kafka$tools$MirrorMaker$$abortOnSendFailure;
    // Starts false; when true, commitOffsets() returns without committing.
    private volatile boolean kafka$tools$MirrorMaker$$exitingOnSendFailure;
    // Starts at -1; commitOffsets() stores time().milliseconds() here after a successful commit.
    private long lastSuccessfulCommitTime;
    // Set to Time.SYSTEM by the constructor.
    private final Time time;
    // Cached result of the lazy logger computation (see logger$lzycompute()).
    private Logger logger;
    // Backing field for logIdent()/logIdent_$eq().
    private String logIdent;
    // Becomes true once the logger field has been populated.
    private volatile boolean bitmap$0;

    static {
        // Running the private constructor is what assigns MODULE$.
        new MirrorMaker$();
    }

    /**
     * Forwards to the {@code metricName} implementation inherited through
     * {@link KafkaMetricsGroup}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param str forwarded unchanged
     * @param map forwarded unchanged
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public MetricName metricName(String str, Map<String, String> map) {
        return KafkaMetricsGroup.super.metricName(str, map);
    }

    /**
     * Forwards to the {@code explicitMetricName} implementation inherited through
     * {@link KafkaMetricsGroup}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param str  forwarded unchanged
     * @param str2 forwarded unchanged
     * @param str3 forwarded unchanged
     * @param map  forwarded unchanged
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public MetricName explicitMetricName(String str, String str2, String str3, Map<String, String> map) {
        return KafkaMetricsGroup.super.explicitMetricName(str, str2, str3, map);
    }

    /**
     * Forwards to the {@code newGauge} implementation inherited through
     * {@link KafkaMetricsGroup}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param str   forwarded unchanged
     * @param gauge forwarded unchanged
     * @param map   forwarded unchanged
     * @param <T>   value type of the gauge
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public <T> Gauge<T> newGauge(String str, Gauge<T> gauge, Map<String, String> map) {
        return KafkaMetricsGroup.super.newGauge(str, gauge, map);
    }

    /**
     * Forwards to the {@code newGauge$default$3} implementation inherited through
     * {@link KafkaMetricsGroup}. The constructor of this class passes the result
     * as the third argument of {@code newGauge}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public <T> Map<String, String> newGauge$default$3() {
        return KafkaMetricsGroup.super.newGauge$default$3();
    }

    /**
     * Forwards to the {@code newMeter} implementation inherited through
     * {@link KafkaMetricsGroup}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param str      forwarded unchanged
     * @param str2     forwarded unchanged
     * @param timeUnit forwarded unchanged
     * @param map      forwarded unchanged
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public Meter newMeter(String str, String str2, TimeUnit timeUnit, Map<String, String> map) {
        return KafkaMetricsGroup.super.newMeter(str, str2, timeUnit, map);
    }

    /**
     * Forwards to the {@code newMeter$default$4} implementation inherited through
     * {@link KafkaMetricsGroup}; by its name this is presumably the default for
     * the fourth argument of {@code newMeter}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public Map<String, String> newMeter$default$4() {
        return KafkaMetricsGroup.super.newMeter$default$4();
    }

    /**
     * Forwards to the {@code newHistogram} implementation inherited through
     * {@link KafkaMetricsGroup}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param str forwarded unchanged
     * @param z   forwarded unchanged
     * @param map forwarded unchanged
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public Histogram newHistogram(String str, boolean z, Map<String, String> map) {
        return KafkaMetricsGroup.super.newHistogram(str, z, map);
    }

    /**
     * Forwards to the {@code newHistogram$default$2} implementation inherited
     * through {@link KafkaMetricsGroup}; by its name this is presumably the
     * default for the second argument of {@code newHistogram}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public boolean newHistogram$default$2() {
        return KafkaMetricsGroup.super.newHistogram$default$2();
    }

    /**
     * Forwards to the {@code newHistogram$default$3} implementation inherited
     * through {@link KafkaMetricsGroup}; by its name this is presumably the
     * default for the third argument of {@code newHistogram}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public Map<String, String> newHistogram$default$3() {
        return KafkaMetricsGroup.super.newHistogram$default$3();
    }

    /**
     * Forwards to the {@code newTimer} implementation inherited through
     * {@link KafkaMetricsGroup}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param str       forwarded unchanged
     * @param timeUnit  forwarded unchanged
     * @param timeUnit2 forwarded unchanged
     * @param map       forwarded unchanged
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public Timer newTimer(String str, TimeUnit timeUnit, TimeUnit timeUnit2, Map<String, String> map) {
        return KafkaMetricsGroup.super.newTimer(str, timeUnit, timeUnit2, map);
    }

    /**
     * Forwards to the {@code newTimer$default$4} implementation inherited through
     * {@link KafkaMetricsGroup}; by its name this is presumably the default for
     * the fourth argument of {@code newTimer}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public Map<String, String> newTimer$default$4() {
        return KafkaMetricsGroup.super.newTimer$default$4();
    }

    /**
     * Forwards to the {@code removeMetric} implementation inherited through
     * {@link KafkaMetricsGroup}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param str forwarded unchanged
     * @param map forwarded unchanged
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public void removeMetric(String str, Map<String, String> map) {
        KafkaMetricsGroup.super.removeMetric(str, map);
    }

    /**
     * Forwards to the {@code removeMetric$default$2} implementation inherited
     * through {@link KafkaMetricsGroup}; by its name this is presumably the
     * default for the second argument of {@code removeMetric}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.metrics.KafkaMetricsGroup
    public Map<String, String> removeMetric$default$2() {
        return KafkaMetricsGroup.super.removeMetric$default$2();
    }

    /**
     * Forwards to the {@code loggerName} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.utils.Logging
    public String loggerName() {
        return KafkaMetricsGroup.super.loggerName();
    }

    /**
     * Forwards to the {@code msgWithLogIdent} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param str forwarded unchanged
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.utils.Logging
    public String msgWithLogIdent(String str) {
        return KafkaMetricsGroup.super.msgWithLogIdent(str);
    }

    /**
     * Forwards to the single-argument {@code trace} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0 message supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void trace(Function0<String> function0) {
        KafkaMetricsGroup.super.trace(function0);
    }

    /**
     * Forwards to the two-argument {@code trace} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0  message supplier, forwarded unchanged
     * @param function02 throwable supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void trace(Function0<String> function0, Function0<Throwable> function02) {
        KafkaMetricsGroup.super.trace(function0, function02);
    }

    /**
     * Forwards to the {@code isDebugEnabled} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.utils.Logging
    public boolean isDebugEnabled() {
        return KafkaMetricsGroup.super.isDebugEnabled();
    }

    /**
     * Forwards to the {@code isTraceEnabled} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @return the value produced by the inherited implementation
     */
    @Override // kafka.utils.Logging
    public boolean isTraceEnabled() {
        return KafkaMetricsGroup.super.isTraceEnabled();
    }

    /**
     * Forwards to the single-argument {@code debug} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0 message supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void debug(Function0<String> function0) {
        KafkaMetricsGroup.super.debug(function0);
    }

    /**
     * Forwards to the two-argument {@code debug} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0  message supplier, forwarded unchanged
     * @param function02 throwable supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void debug(Function0<String> function0, Function0<Throwable> function02) {
        KafkaMetricsGroup.super.debug(function0, function02);
    }

    /**
     * Forwards to the single-argument {@code info} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0 message supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void info(Function0<String> function0) {
        KafkaMetricsGroup.super.info(function0);
    }

    /**
     * Forwards to the two-argument {@code info} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0  message supplier, forwarded unchanged
     * @param function02 throwable supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void info(Function0<String> function0, Function0<Throwable> function02) {
        KafkaMetricsGroup.super.info(function0, function02);
    }

    /**
     * Forwards to the single-argument {@code warn} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0 message supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void warn(Function0<String> function0) {
        KafkaMetricsGroup.super.warn(function0);
    }

    /**
     * Forwards to the two-argument {@code warn} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0  message supplier, forwarded unchanged
     * @param function02 throwable supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void warn(Function0<String> function0, Function0<Throwable> function02) {
        KafkaMetricsGroup.super.warn(function0, function02);
    }

    /**
     * Forwards to the single-argument {@code error} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0 message supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void error(Function0<String> function0) {
        KafkaMetricsGroup.super.error(function0);
    }

    /**
     * Forwards to the two-argument {@code error} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0  message supplier, forwarded unchanged
     * @param function02 throwable supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void error(Function0<String> function0, Function0<Throwable> function02) {
        KafkaMetricsGroup.super.error(function0, function02);
    }

    /**
     * Forwards to the single-argument {@code fatal} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0 message supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void fatal(Function0<String> function0) {
        KafkaMetricsGroup.super.fatal(function0);
    }

    /**
     * Forwards to the two-argument {@code fatal} implementation that
     * {@link KafkaMetricsGroup} inherits from {@code kafka.utils.Logging}.
     *
     * <p>The qualified super call is deliberate: an unqualified call resolves to
     * this override and recurses until the stack overflows.
     *
     * @param function0  message supplier, forwarded unchanged
     * @param function02 throwable supplier, forwarded unchanged
     */
    @Override // kafka.utils.Logging
    public void fatal(Function0<String> function0, Function0<Throwable> function02) {
        KafkaMetricsGroup.super.fatal(function0, function02);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [kafka.tools.MirrorMaker$] */
    private Logger logger$lzycompute() {
        Logger logger;
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                logger = logger();
                this.logger = logger;
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    /**
     * Returns the logger, computing it through {@code logger$lzycompute()} when
     * {@code bitmap$0} shows it has not been populated yet.
     *
     * @return the cached logger once available
     */
    @Override // kafka.utils.Logging
    public Logger logger() {
        if (bitmap$0) {
            return logger;
        }
        return logger$lzycompute();
    }

    /**
     * Returns the value currently held in the {@code logIdent} field.
     *
     * @return the stored log identifier, as last set via {@code logIdent_$eq}
     */
    @Override // kafka.utils.Logging
    public String logIdent() {
        return logIdent;
    }

    /**
     * Stores a new value in the {@code logIdent} field.
     *
     * @param str value to store
     */
    @Override // kafka.utils.Logging
    public void logIdent_$eq(String str) {
        logIdent = str;
    }

    /**
     * Returns the producer reference held by this singleton.
     *
     * @return the stored producer; the constructor initializes it to {@code null}
     */
    public MirrorMaker.MirrorMakerProducer producer() {
        return producer;
    }

    /**
     * Replaces the producer reference held by this singleton.
     *
     * @param mirrorMakerProducer producer to store
     */
    public void producer_$eq(MirrorMaker.MirrorMakerProducer mirrorMakerProducer) {
        producer = mirrorMakerProducer;
    }

    /**
     * Returns the sequence of mirror maker threads.
     *
     * @return the stored sequence; the constructor initializes it to {@code null}
     */
    private Seq<MirrorMaker.MirrorMakerThread> kafka$tools$MirrorMaker$$mirrorMakerThreads() {
        return kafka$tools$MirrorMaker$$mirrorMakerThreads;
    }

    /**
     * Replaces the sequence of mirror maker threads.
     *
     * @param seq sequence to store
     */
    public void kafka$tools$MirrorMaker$$mirrorMakerThreads_$eq(Seq<MirrorMaker.MirrorMakerThread> seq) {
        kafka$tools$MirrorMaker$$mirrorMakerThreads = seq;
    }

    /**
     * Returns the flag that {@code cleanShutdown()} flips from false to true.
     *
     * @return the shared {@link AtomicBoolean} instance (never replaced; the field is final)
     */
    public AtomicBoolean kafka$tools$MirrorMaker$$isShuttingDown() {
        return kafka$tools$MirrorMaker$$isShuttingDown;
    }

    /**
     * Returns the counter read by the "MirrorMaker-numDroppedMessages" gauge.
     *
     * @return the shared {@link AtomicInteger} instance (never replaced; the field is final)
     */
    public AtomicInteger kafka$tools$MirrorMaker$$numDroppedMessages() {
        return kafka$tools$MirrorMaker$$numDroppedMessages;
    }

    /**
     * Returns the message handler reference held by this singleton.
     *
     * @return the stored handler; the constructor initializes it to {@code null}
     */
    public MirrorMaker.MirrorMakerMessageHandler kafka$tools$MirrorMaker$$messageHandler() {
        return kafka$tools$MirrorMaker$$messageHandler;
    }

    /**
     * Replaces the message handler reference held by this singleton.
     *
     * @param mirrorMakerMessageHandler handler to store
     */
    public void kafka$tools$MirrorMaker$$messageHandler_$eq(MirrorMaker.MirrorMakerMessageHandler mirrorMakerMessageHandler) {
        kafka$tools$MirrorMaker$$messageHandler = mirrorMakerMessageHandler;
    }

    /**
     * Returns the stored offset commit interval.
     *
     * @return the current value; the constructor initializes it to 0
     */
    public int kafka$tools$MirrorMaker$$offsetCommitIntervalMs() {
        return kafka$tools$MirrorMaker$$offsetCommitIntervalMs;
    }

    /**
     * Replaces the stored offset commit interval.
     *
     * @param i value to store
     */
    public void kafka$tools$MirrorMaker$$offsetCommitIntervalMs_$eq(int i) {
        kafka$tools$MirrorMaker$$offsetCommitIntervalMs = i;
    }

    /**
     * Returns the stored abort-on-send-failure flag.
     *
     * @return the current value; the constructor initializes it to {@code true}
     */
    public boolean kafka$tools$MirrorMaker$$abortOnSendFailure() {
        return kafka$tools$MirrorMaker$$abortOnSendFailure;
    }

    /**
     * Replaces the stored abort-on-send-failure flag.
     *
     * @param z value to store
     */
    public void kafka$tools$MirrorMaker$$abortOnSendFailure_$eq(boolean z) {
        kafka$tools$MirrorMaker$$abortOnSendFailure = z;
    }

    /**
     * Returns the volatile exiting-on-send-failure flag; {@code commitOffsets}
     * skips committing while it is true.
     *
     * @return the current value; the constructor initializes it to {@code false}
     */
    public boolean kafka$tools$MirrorMaker$$exitingOnSendFailure() {
        return kafka$tools$MirrorMaker$$exitingOnSendFailure;
    }

    /**
     * Replaces the volatile exiting-on-send-failure flag.
     *
     * @param z value to store
     */
    public void kafka$tools$MirrorMaker$$exitingOnSendFailure_$eq(boolean z) {
        kafka$tools$MirrorMaker$$exitingOnSendFailure = z;
    }

    /**
     * Returns the timestamp recorded by the most recent successful commit in
     * {@code commitOffsets}.
     *
     * @return the stored value; the constructor initializes it to -1
     */
    private long lastSuccessfulCommitTime() {
        return lastSuccessfulCommitTime;
    }

    /**
     * Records the timestamp of a successful offset commit.
     *
     * @param j value to store
     */
    private void lastSuccessfulCommitTime_$eq(long j) {
        lastSuccessfulCommitTime = j;
    }

    /**
     * Returns the clock used for commit timestamps.
     *
     * @return the {@code Time} instance assigned in the constructor ({@code Time.SYSTEM})
     */
    private Time time() {
        return time;
    }

    /**
     * Entry point: parses and checks the command-line options, then starts every
     * mirror maker thread and blocks until each one has shut down.
     *
     * <p>A {@link ControlThrowable} raised while handling the options is rethrown
     * untouched. Any other throwable is logged at error level and execution
     * continues to the thread start-up below.
     *
     * @param strArr command-line arguments handed to {@code MirrorMakerOptions}
     */
    public void main(String[] strArr) {
        info(() -> "Starting mirror maker");
        try {
            MirrorMaker.MirrorMakerOptions options = new MirrorMaker.MirrorMakerOptions(strArr);
            CommandLineUtils$.MODULE$.printHelpAndExitIfNeeded(options, "This tool helps to continuously copy data between two Kafka clusters.");
            options.checkArgs();
        } catch (Throwable th) {
            if (th instanceof ControlThrowable) {
                // Rethrow the catch parameter itself (no cast) so the compiler's
                // precise-rethrow analysis applies.
                throw th;
            }
            error(() -> "Exception when starting mirror maker.", () -> th);
        }
        // NOTE(review): the constructor leaves the thread list null and nothing in
        // this method assigns it, so a failure above presumably surfaces as a
        // NullPointerException here. Left as-is because it keeps the process from
        // exiting as if start-up had succeeded.
        kafka$tools$MirrorMaker$$mirrorMakerThreads().foreach(thread -> {
            thread.start();
            return BoxedUnit.UNIT;
        });
        kafka$tools$MirrorMaker$$mirrorMakerThreads().foreach(thread -> {
            thread.awaitShutdown();
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Creates {@code i} Kafka consumers from the given properties and wraps each
     * one in a {@code ConsumerWrapper}.
     *
     * <p>The properties object is mutated: auto-commit defaults to "false" when
     * unset, both deserializers are forced to {@link ByteArrayDeserializer}, and
     * {@code client.id} is rewritten per consumer as {@code <group.id>-<index>}.
     *
     * <p>The whitelist is validated before any consumer is constructed, so a
     * missing whitelist no longer leaves already-built consumers unclosed.
     *
     * @param i          number of consumers to create
     * @param properties consumer configuration; modified in place
     * @param option     optional rebalance listener handed to every wrapper
     * @param option2    whitelist handed to every wrapper; must be defined
     * @return one wrapper per created consumer
     * @throws IllegalArgumentException if {@code option2} is empty
     */
    public Seq<MirrorMaker.ConsumerWrapper> createConsumers(int i, Properties properties, Option<ConsumerRebalanceListener> option, Option<String> option2) {
        // Fail fast, before any consumer (and its resources) exists.
        if (option2.isEmpty()) {
            throw new IllegalArgumentException("White list cannot be empty");
        }
        kafka$tools$MirrorMaker$$maybeSetDefaultProperty(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
        String groupId = properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
        IndexedSeq consumers = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), i).map(obj -> {
            return $anonfun$createConsumers$1(properties, groupId, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        return (Seq) consumers.map(kafkaConsumer -> {
            return new MirrorMaker.ConsumerWrapper(kafkaConsumer, option, option2);
        }, IndexedSeq$.MODULE$.canBuildFrom());
    }

    /**
     * Commits the wrapper's offsets, retrying for as long as the commit times out.
     *
     * <ul>
     *   <li>Returns immediately (after an info log) when the exiting-on-send-failure
     *       flag is set.</li>
     *   <li>On success, records the commit time and stops.</li>
     *   <li>On {@link CommitFailedException}, logs a warning and stops without
     *       retrying.</li>
     *   <li>On {@link TimeoutException}, lists topics through the consumer; if that
     *       succeeds, offsets whose topic is not in the listing are dropped. Then it
     *       logs a warning, sleeps 100 ms and retries.</li>
     *   <li>On {@link WakeupException}, commits once more via a recursive call and
     *       then rethrows the wakeup to the caller.</li>
     * </ul>
     *
     * @param consumerWrapper wrapper whose offsets are committed
     */
    public void commitOffsets(MirrorMaker.ConsumerWrapper consumerWrapper) {
        if (kafka$tools$MirrorMaker$$exitingOnSendFailure()) {
            info(() -> "Exiting on send failure, skip committing offsets.");
            return;
        }
        // Mutable holder so the warning lambda below can read the current count.
        IntRef retry = IntRef.create(0);
        boolean retryNeeded = true;
        while (retryNeeded) {
            trace(() -> "Committing offsets.");
            try {
                consumerWrapper.commit();
                lastSuccessfulCommitTime_$eq(time().milliseconds());
                retryNeeded = false;
            } catch (CommitFailedException ignored) {
                // Only reported, never retried.
                retryNeeded = false;
                warn(() -> "Failed to commit offsets because the consumer group has rebalanced and assigned partitions to another instance. If you see this regularly, it could indicate that you need to either increase "
                        + "the consumer's " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " or reduce the number of records "
                        + "handled on each iteration with " + ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
            } catch (TimeoutException ignored) {
                Try apply = Try$.MODULE$.apply(() -> {
                    return consumerWrapper.consumer().listTopics();
                });
                if (apply instanceof Success) {
                    java.util.Map map = (java.util.Map) ((Success) apply).value();
                    // Keep only offsets whose topic appears in the listing.
                    consumerWrapper.offsets().retain((topicPartition, obj2) -> {
                        return BoxesRunTime.boxToBoolean($anonfun$commitOffsets$3(map, topicPartition, BoxesRunTime.unboxToLong(obj2)));
                    });
                } else if (apply instanceof Failure) {
                    Throwable exception = ((Failure) apply).exception();
                    warn(() -> "Failed to list all authorized topics after committing offsets timed out: ", () -> exception);
                } else {
                    throw new MatchError(apply);
                }
                retry.elem++;
                warn(() -> "Failed to commit offsets because the offset commit request processing can not be completed in time. "
                        + "If you see this regularly, it could indicate that you need to increase the consumer's " + ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + " "
                        + "Last successful offset commit timestamp=" + MODULE$.lastSuccessfulCommitTime() + ", retry count=" + retry.elem);
                // NOTE(review): Thread.sleep declares InterruptedException, which this
                // signature does not; carried over unchanged from the decompiled code.
                Thread.sleep(100L);
            } catch (WakeupException e) {
                // Retry the commit once, then let the wakeup propagate.
                commitOffsets(consumerWrapper);
                throw e;
            }
        }
    }

    /**
     * Shuts the mirror maker down once: signals every thread to stop, waits for
     * each to finish, then closes the producer.
     *
     * <p>Only the first caller proceeds; later calls see the shutdown flag
     * already set and return without doing anything. Both the thread list and the
     * producer are initialized to {@code null} by the constructor, so each is
     * skipped when still unset rather than raising a
     * {@link NullPointerException} in the middle of shutdown.
     */
    public void cleanShutdown() {
        if (!kafka$tools$MirrorMaker$$isShuttingDown().compareAndSet(false, true)) {
            return;
        }
        info(() -> "Start clean shutdown.");
        info(() -> "Shutting down consumer threads.");
        Seq<MirrorMaker.MirrorMakerThread> threads = kafka$tools$MirrorMaker$$mirrorMakerThreads();
        if (threads != null) {
            // Signal all threads first, then wait, so they wind down in parallel.
            threads.foreach(thread -> {
                thread.shutdown();
                return BoxedUnit.UNIT;
            });
            threads.foreach(thread -> {
                thread.awaitShutdown();
                return BoxedUnit.UNIT;
            });
        }
        info(() -> "Closing producer.");
        MirrorMaker.MirrorMakerProducer activeProducer = producer();
        if (activeProducer != null) {
            activeProducer.close();
        }
        info(() -> "Kafka mirror maker shutdown successfully");
    }

    /**
     * Ensures a property has a value, falling back to the supplied default, and
     * logs at info level when the resulting value differs from that default.
     *
     * @param properties properties to read and update in place
     * @param str        property name
     * @param str2       default applied when the property is currently unset
     */
    public void kafka$tools$MirrorMaker$$maybeSetDefaultProperty(Properties properties, String str, String str2) {
        final String configured = properties.getProperty(str);
        properties.setProperty(str, configured != null ? configured : str2);

        // Null-safe comparison of the effective value against the default.
        final String effective = properties.getProperty(str);
        final boolean matchesDefault = (effective == null) ? (str2 == null) : effective.equals(str2);
        if (matchesDefault) {
            return;
        }
        info(() -> String.format("Property %s is overridden to %s - data loss or message reordering is possible.", str, configured));
    }

    /**
     * Lambda body used by {@code createConsumers}: sets {@code client.id} to
     * {@code <str>-<i>} on the shared properties and builds a consumer from them.
     *
     * @param properties consumer configuration; {@code client.id} is overwritten
     * @param str        prefix for the client id (the caller passes the group id)
     * @param i          index appended after the dash
     * @return a new consumer constructed from {@code properties}
     */
    public static final /* synthetic */ KafkaConsumer $anonfun$createConsumers$1(Properties properties, String str, int i) {
        String clientId = str + "-" + i;
        properties.setProperty("client.id", clientId);
        return new KafkaConsumer(properties);
    }

    /**
     * Predicate used by {@code commitOffsets} when pruning offsets: true when the
     * partition's topic is a key of the given map.
     *
     * @param map            map keyed by topic name
     * @param topicPartition partition whose topic is looked up
     * @param j              offset value; not consulted
     * @return whether {@code map} contains the partition's topic
     */
    public static final /* synthetic */ boolean $anonfun$commitOffsets$3(java.util.Map map, TopicPartition topicPartition, long j) {
        String topic = topicPartition.topic();
        return map.containsKey(topic);
    }

    /**
     * Builds the singleton: assigns {@code MODULE$}, runs the trait initializer,
     * gives every field its initial value and registers the
     * "MirrorMaker-numDroppedMessages" gauge.
     *
     * <p>{@code MODULE$} is assigned first because the gauge registered at the end
     * reads the dropped-message counter through it.
     */
    private MirrorMaker$() {
        MODULE$ = this;
        // The decompiled code had this as a bare expression, which is not a valid
        // statement. Presumably the read exists only to trigger initialization of
        // the registration object; the value itself is unused.
        Object ignoredRegistration = Log4jControllerRegistration$.MODULE$;
        KafkaMetricsGroup.$init$((KafkaMetricsGroup) this);
        this.producer = null;
        this.kafka$tools$MirrorMaker$$mirrorMakerThreads = null;
        this.kafka$tools$MirrorMaker$$isShuttingDown = new AtomicBoolean(false);
        this.kafka$tools$MirrorMaker$$numDroppedMessages = new AtomicInteger(0);
        this.kafka$tools$MirrorMaker$$messageHandler = null;
        this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs = 0;
        this.kafka$tools$MirrorMaker$$abortOnSendFailure = true;
        this.kafka$tools$MirrorMaker$$exitingOnSendFailure = false;
        this.lastSuccessfulCommitTime = -1L;
        this.time = Time.SYSTEM;
        newGauge("MirrorMaker-numDroppedMessages", new Gauge<Object>() { // from class: kafka.tools.MirrorMaker$$anon$1
            // Overrides Gauge.value() directly and returns the boxed count; the
            // decompiled pair (int value() plus a renamed bridge) did not override it.
            @Override // com.yammer.metrics.core.Gauge
            public Object value() {
                return BoxesRunTime.boxToInteger(MirrorMaker$.MODULE$.kafka$tools$MirrorMaker$$numDroppedMessages().get());
            }
        }, newGauge$default$3());
    }
}
