/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka010;

import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.LocalStreamingContext;
import org.apache.spark.streaming.Milliseconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.Time$;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.DStream$;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConstantEstimator;
import org.apache.spark.streaming.kafka010.ConstantRateController;
import org.apache.spark.streaming.kafka010.ConsumerStrategies$;
import org.apache.spark.streaming.kafka010.DefaultPerPartitionConfig;
import org.apache.spark.streaming.kafka010.DirectKafkaInputDStream;
import org.apache.spark.streaming.kafka010.DirectKafkaStreamSuite$;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.KafkaUtils$;
import org.apache.spark.streaming.kafka010.LocationStrategies$;
import org.apache.spark.streaming.kafka010.LocationStrategy;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.apache.spark.streaming.kafka010.PerPartitionConfig;
import org.apache.spark.streaming.scheduler.RateController;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.apache.spark.streaming.scheduler.StreamingListenerStreamingStarted;
import org.apache.spark.util.Utils$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.Tag;
import org.scalatest.compatible.Assertion;
import org.scalatest.concurrent.AbstractPatienceConfiguration;
import org.scalatest.concurrent.AbstractPatienceConfiguration$PatienceConfig$;
import org.scalatest.concurrent.Eventually;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.concurrent.ScaledTimeSpans;
import org.scalatest.enablers.Retrying;
import org.scalatest.enablers.Retrying$;
import org.scalatest.time.Span;
import org.scalatest.time.Span$;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.math.Numeric;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.util.Random$;

@ScalaSignature(bytes="\u0006\u0001\tec\u0001\u0002\u0013&\u0001ABQa\u0012\u0001\u0005\u0002!Cqa\u0013\u0001C\u0002\u0013\u0005A\n\u0003\u0004Q\u0001\u0001\u0006I!\u0014\u0005\n#\u0002\u0001\r\u00111A\u0005\nIC\u0011b\u0017\u0001A\u0002\u0003\u0007I\u0011\u0002/\t\u0013\u0015\u0004\u0001\u0019!A!B\u0013\u0019\u0006\"\u00034\u0001\u0001\u0004\u0005\r\u0011\"\u0003h\u0011%Y\u0007\u00011AA\u0002\u0013%A\u000eC\u0005o\u0001\u0001\u0007\t\u0011)Q\u0005Q\")q\u000e\u0001C!a\")\u0011\u000f\u0001C!a\")!\u000f\u0001C!a\")1\u000f\u0001C\u0001i\"I\u0011\u0011\u0006\u0001C\u0002\u0013\u0005\u00111\u0006\u0005\t\u0003g\u0001\u0001\u0015!\u0003\u0002.!9\u0011Q\u0007\u0001\u0005\n\u0005]\u0002bBA&\u0001\u0011%\u0011Q\n\u0005\b\u0003o\u0003A\u0011BA]\u000f\u001d\t9/\nE\u0001\u0003S4a\u0001J\u0013\t\u0002\u0005-\bBB$\u0015\t\u0003\tI\u0010C\u0005\u0002|R\u0011\r\u0011\"\u0001\u0002~\"A!Q\u0002\u000b!\u0002\u0013\tyP\u0002\u0004\u0003\u0010Q\u0001!\u0011\u0003\u0005\u0007\u000fb!\tA!\u0007\t\u0013\t}\u0001D1A\u0005\u0002\u0005u\b\u0002\u0003B\u00111\u0001\u0006I!a@\t\u0013\t\r\u0002D1A\u0005\u0002\u0005u\b\u0002\u0003B\u00131\u0001\u0006I!a@\t\u0013\t\u001d\u0002D1A\u0005\u0002\u0005u\b\u0002\u0003B\u00151\u0001\u0006I!a@\t\u000f\t-\u0002\u0004\"\u0011\u0003.!9!\u0011\b\r\u0005B\tm\u0002b\u0002B$1\u0011\u0005#\u0011\n\u0005\n\u0005+\"\u0012\u0011!C\u0005\u0005/\u0012a\u0003R5sK\u000e$8*\u00194lCN#(/Z1n'VLG/\u001a\u0006\u0003M\u001d\n\u0001b[1gW\u0006\u0004\u0014\u0007\r\u0006\u0003Q%\n\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005)Z\u0013!B:qCJ\\'B\u0001\u0017.\u0003\u0019\t\u0007/Y2iK*\ta&A\u0002pe\u001e\u001c\u0001aE\u0003\u0001cUJ\u0014\t\u0005\u00023g5\t\u0011&\u0003\u00025S\ti1\u000b]1sW\u001a+hnU;ji\u0016\u0004\"AN\u001c\u000e\u0003\u001dJ!\u0001O\u0014\u0003+1{7-\u00197TiJ,\u0017-\\5oO\u000e{g\u000e^3yiB\u0011!hP\u0007\u0002w)\u0011A(P\u0001\u000bG>t7-\u001e:sK:$(B\u0001 
.\u0003%\u00198-\u00197bi\u0016\u001cH/\u0003\u0002Aw\tQQI^3oiV\fG\u000e\\=\u0011\u0005\t+U\"A\"\u000b\u0005\u0011K\u0013\u0001C5oi\u0016\u0014h.\u00197\n\u0005\u0019\u001b%a\u0002'pO\u001eLgnZ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003%\u0003\"A\u0013\u0001\u000e\u0003\u0015\n\u0011b\u001d9be.\u001cuN\u001c4\u0016\u00035\u0003\"A\r(\n\u0005=K#!C*qCJ\\7i\u001c8g\u0003)\u0019\b/\u0019:l\u0007>tg\rI\u0001\bi\u0016\u001cH\u000fR5s+\u0005\u0019\u0006C\u0001+Z\u001b\u0005)&B\u0001,X\u0003\tIwNC\u0001Y\u0003\u0011Q\u0017M^1\n\u0005i+&\u0001\u0002$jY\u0016\f1\u0002^3ti\u0012K'o\u0018\u0013fcR\u0011Ql\u0019\t\u0003=\u0006l\u0011a\u0018\u0006\u0002A\u0006)1oY1mC&\u0011!m\u0018\u0002\u0005+:LG\u000fC\u0004e\u000b\u0005\u0005\t\u0019A*\u0002\u0007a$\u0013'\u0001\u0005uKN$H)\u001b:!\u00039Y\u0017MZ6b)\u0016\u001cH/\u0016;jYN,\u0012\u0001\u001b\t\u0003\u0015&L!A[\u0013\u0003\u001d-\u000bgm[1UKN$X\u000b^5mg\u0006\u00112.\u00194lCR+7\u000f^+uS2\u001cx\fJ3r)\tiV\u000eC\u0004e\u0011\u0005\u0005\t\u0019\u00015\u0002\u001f-\fgm[1UKN$X\u000b^5mg\u0002\n\u0011BY3g_J,\u0017\t\u001c7\u0015\u0003u\u000b\u0001\"\u00194uKJ\fE\u000e\\\u0001\nC\u001a$XM]#bG\"\fabZ3u\u0017\u000647.\u0019)be\u0006l7\u000fF\u0002v\u00033\u0001RA^=|\u0003\u001bi\u0011a\u001e\u0006\u0003q^\u000bA!\u001e;jY&\u0011!p\u001e\u0002\b\u0011\u0006\u001c\b.T1q!\ra\u0018q\u0001\b\u0004{\u0006\r\u0001C\u0001@`\u001b\u0005y(bAA\u0001_\u00051AH]8pizJ1!!\u0002`\u0003\u0019\u0001&/\u001a3fM&!\u0011\u0011BA\u0006\u0005\u0019\u0019FO]5oO*\u0019\u0011QA0\u0011\t\u0005=\u0011QC\u0007\u0003\u0003#Q1!a\u0005X\u0003\u0011a\u0017M\\4\n\t\u0005]\u0011\u0011\u0003\u0002\u0007\u001f\nTWm\u0019;\t\u000f\u0005mQ\u00021\u0001\u0002\u001e\u0005)Q\r\u001f;sCB)a,a\b\u0002$%\u0019\u0011\u0011E0\u0003\u0015q\u0012X\r]3bi\u0016$g\b\u0005\u0004_\u0003KY\u0018QB\u0005\u0004\u0003Oy&A\u0002+va2,''\u0001\bqe\u00164WM\u001d:fI\"{7\u000f^:\u0016\u0005\u00055\u0002c\u0001&\u00020%\u0019\u0011\u0011G\u0013\u0003!1{7-\u0019;j_:\u001cFO]1uK\u001eL\u0018a\
u00049sK\u001a,'O]3e\u0011>\u001cHo\u001d\u0011\u0002!\t\f7m\u001b9sKN\u001cXO]3UKN$HcB/\u0002:\u0005\r\u0013q\t\u0005\b\u0003w\u0001\u0002\u0019AA\u001f\u0003Mi\u0017\r\u001f*bi\u0016\u0004VM\u001d)beRLG/[8o!\rq\u0016qH\u0005\u0004\u0003\u0003z&aA%oi\"9\u0011Q\t\tA\u0002\u0005u\u0012aC5oSRL\u0017\r\u001c*bi\u0016Dq!!\u0013\u0011\u0001\u0004\ti$A\fnCblUm]:bO\u0016\u001c\b+\u001a:QCJ$\u0018\u000e^5p]\u0006yq-\u001a;PM\u001a\u001cX\r\u001e*b]\u001e,7/\u0006\u0004\u0002P\u0005}\u00151\u0017\u000b\u0005\u0003#\n9\b\u0005\u0004\u0002T\u0005u\u00131\r\b\u0005\u0003+\nIFD\u0002\u007f\u0003/J\u0011\u0001Y\u0005\u0004\u00037z\u0016a\u00029bG.\fw-Z\u0005\u0005\u0003?\n\tGA\u0002TKFT1!a\u0017`!\u001dq\u0016QEA3\u0003W\u00022ANA4\u0013\r\tIg\n\u0002\u0005)&lW\rE\u0003_\u0003[\n\t(C\u0002\u0002p}\u0013Q!\u0011:sCf\u00042ASA:\u0013\r\t)(\n\u0002\f\u001f\u001a47/\u001a;SC:<W\rC\u0004\u0002zE\u0001\r!a\u001f\u0002\u0017-\fgm[1TiJ,\u0017-\u001c\t\u0007\u0003{\n\u0019)a\"\u000e\u0005\u0005}$bAAAO\u00059Am\u001d;sK\u0006l\u0017\u0002BAC\u0003\u007f\u0012q\u0001R*ue\u0016\fW\u000e\u0005\u0005\u0002\n\u0006]\u00151TAY\u001b\t\tYI\u0003\u0003\u0002\u000e\u0006=\u0015\u0001C2p]N,X.\u001a:\u000b\t\u0005E\u00151S\u0001\bG2LWM\u001c;t\u0015\r\t)jK\u0001\u0006W\u000647.Y\u0005\u0005\u00033\u000bYI\u0001\bD_:\u001cX/\\3s%\u0016\u001cwN\u001d3\u0011\t\u0005u\u0015q\u0014\u0007\u0001\t\u001d\t\t+\u0005b\u0001\u0003G\u0013\u0011aS\t\u0005\u0003K\u000bY\u000bE\u0002_\u0003OK1!!+`\u0005\u001dqu\u000e\u001e5j]\u001e\u00042AXAW\u0013\r\tyk\u0018\u0002\u0004\u0003:L\b\u0003BAO\u0003g#q!!.\u0012\u0005\u0004\t\u0019KA\u0001W\u0003Q9W\r\u001e#je\u0016\u001cGoS1gW\u0006\u001cFO]3b[RA\u00111XAa\u0003\u000b\fY\u000eE\u0003K\u0003{[80C\u0002\u0002@\u0016\u0012q\u0003R5sK\u000e$8*\u00194lC&s\u0007/\u001e;E'R\u0014X-Y7\t\r\u0005\r'\u00031\u0001|\u0003\u0015!x\u000e]5d\u0011\u001d\t9M\u0005a\u0001\u0003\u0013\f!#\\8dWJ\u000bG/Z\"p]R\u0014x\u000e\u001c7feB)a,a3\u0002P&\u0019\u0011QZ0\u0003\r=\u0003H/[8o!\u0011\t
\t.a6\u000e\u0005\u0005M'bAAkO\u0005I1o\u00195fIVdWM]\u0005\u0005\u00033\f\u0019N\u0001\bSCR,7i\u001c8ue>dG.\u001a:\t\u000f\u0005u'\u00031\u0001\u0002`\u0006\u0019\u0001\u000f]2\u0011\u000by\u000bY-!9\u0011\u0007)\u000b\u0019/C\u0002\u0002f\u0016\u0012!\u0003U3s!\u0006\u0014H/\u001b;j_:\u001cuN\u001c4jO\u00061B)\u001b:fGR\\\u0015MZ6b'R\u0014X-Y7Tk&$X\r\u0005\u0002K)M)A#!<\u0002tB\u0019a,a<\n\u0007\u0005ExL\u0001\u0004B]f\u0014VM\u001a\t\u0004=\u0006U\u0018bAA|?\na1+\u001a:jC2L'0\u00192mKR\u0011\u0011\u0011^\u0001\u0006i>$\u0018\r\\\u000b\u0003\u0003\u007f\u0004BA!\u0001\u0003\n5\u0011!1\u0001\u0006\u0005\u0005\u000b\u00119!\u0001\u0004bi>l\u0017n\u0019\u0006\u0003y]LAAa\u0003\u0003\u0004\tQ\u0011\t^8nS\u000eduN\\4\u0002\rQ|G/\u00197!\u0005IIe\u000e];u\u0013:4wnQ8mY\u0016\u001cGo\u001c:\u0014\u000ba\tiOa\u0005\u0011\t\u0005E'QC\u0005\u0005\u0005/\t\u0019NA\tTiJ,\u0017-\\5oO2K7\u000f^3oKJ$\"Aa\u0007\u0011\u0007\tu\u0001$D\u0001\u0015\u0003MqW/\u001c*fG>\u0014Hm]*vE6LG\u000f^3e\u0003QqW/\u001c*fG>\u0014Hm]*vE6LG\u000f^3eA\u0005\tb.^7SK\u000e|'\u000fZ:Ti\u0006\u0014H/\u001a3\u0002%9,XNU3d_J$7o\u0015;beR,G\rI\u0001\u0014]Vl'+Z2pe\u0012\u001c8i\\7qY\u0016$X\rZ\u0001\u0015]Vl'+Z2pe\u0012\u001c8i\\7qY\u0016$X\r\u001a\u0011\u0002!=t')\u0019;dQN+(-\\5ui\u0016$GcA/\u00030!9!\u0011\u0007\u0011A\u0002\tM\u0012A\u00042bi\u000eD7+\u001e2nSR$X\r\u001a\t\u0005\u0003#\u0014)$\u0003\u0003\u00038\u0005M'aH*ue\u0016\fW.\u001b8h\u0019&\u001cH/\u001a8fe\n\u000bGo\u00195Tk\nl\u0017\u000e\u001e;fI\u0006qqN\u001c\"bi\u000eD7\u000b^1si\u0016$GcA/\u0003>!9!qH\u0011A\u0002\t\u0005\u0013\u0001\u00042bi\u000eD7\u000b^1si\u0016$\u0007\u0003BAi\u0005\u0007JAA!\u0012\u0002T\ni2\u000b\u001e:fC6Lgn\u001a'jgR,g.\u001a:CCR\u001c\u0007n\u0015;beR,G-\u0001\tp]\n\u000bGo\u00195D_6\u0004H.\u001a;fIR\u0019QLa\u0013\t\u000f\t5#\u00051\u0001\u0003P\u0005q!-\u0019;dQ\u000e{W\u000e\u001d7fi\u0016$\u0007\u0003BAi\u0005#JAAa\u0015\u0002T\ny2\u000b\u001e:fC6Lgn\u001a'jgR,g.\u001a:CCR\u001c\u0007nQ8na2,G/\u001a3\u00
02\u0017I,\u0017\r\u001a*fg>dg/\u001a\u000b\u0003\u0003\u001b\u0001")
public class DirectKafkaStreamSuite
extends SparkFunSuite
implements LocalStreamingContext,
Eventually {
    /** Spark configuration returned by {@code sparkConf()}. */
    private final SparkConf sparkConf;
    /** Scratch directory; {@code afterEach()} deletes it recursively when it is non-null. */
    private File testDir;
    /** Kafka test harness; set up in {@code beforeAll()} and torn down in {@code afterAll()}. */
    private KafkaTestUtils kafkaTestUtils;
    /** Location strategy returned by {@code preferredHosts()}. */
    private final LocationStrategy preferredHosts;
    // Deliberately not final: this field is written by the trait setter method
    // (..._setter_$..._$eq), and Java rejects assignment to a final field outside
    // a constructor or initializer.
    private AbstractPatienceConfiguration.PatienceConfig org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig;
    /** Lazily created companion instance; volatile because it is checked outside the lock in {@code PatienceConfig()}. */
    private volatile AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig$module;
    /** Streaming context for the current test, accessed through {@code ssc()} / {@code ssc_$eq}. */
    private transient StreamingContext ssc;
    /** Flag accessed through {@code stopSparkContext()} / {@code stopSparkContext_$eq}. */
    private transient boolean stopSparkContext;

    /**
     * Static forwarder to the companion object's {@code total} counter.
     *
     * @return the {@link AtomicLong} held by {@code DirectKafkaStreamSuite$.MODULE$}
     */
    public static AtomicLong total() {
        final AtomicLong counter = DirectKafkaStreamSuite$.MODULE$.total();
        return counter;
    }

    /**
     * Forwards to the {@code Eventually} trait implementation using an explicit timeout and interval.
     *
     * @param timeout  timeout setting passed through to the trait
     * @param interval interval setting passed through to the trait
     * @param fun      the block to evaluate
     * @param retrying retrying strategy passed through to the trait
     * @param pos      source position passed through to the trait
     * @return whatever the trait implementation returns
     */
    public <T> T eventually(PatienceConfiguration.Timeout timeout, PatienceConfiguration.Interval interval, Function0<T> fun, Retrying<T> retrying, Position pos) {
        final Object outcome = Eventually.eventually$(this, timeout, interval, fun, retrying, pos);
        return (T)outcome;
    }

    /**
     * Forwards to the {@code Eventually} trait implementation using an explicit timeout
     * and a patience configuration.
     *
     * @param timeout  timeout setting passed through to the trait
     * @param fun      the block to evaluate
     * @param config   patience configuration passed through to the trait
     * @param retrying retrying strategy passed through to the trait
     * @param pos      source position passed through to the trait
     * @return whatever the trait implementation returns
     */
    public <T> T eventually(PatienceConfiguration.Timeout timeout, Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config, Retrying<T> retrying, Position pos) {
        final Object outcome = Eventually.eventually$(this, timeout, fun, config, retrying, pos);
        return (T)outcome;
    }

    /**
     * Forwards to the {@code Eventually} trait implementation using an explicit interval
     * and a patience configuration.
     *
     * @param interval interval setting passed through to the trait
     * @param fun      the block to evaluate
     * @param config   patience configuration passed through to the trait
     * @param retrying retrying strategy passed through to the trait
     * @param pos      source position passed through to the trait
     * @return whatever the trait implementation returns
     */
    public <T> T eventually(PatienceConfiguration.Interval interval, Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config, Retrying<T> retrying, Position pos) {
        final Object outcome = Eventually.eventually$(this, interval, fun, config, retrying, pos);
        return (T)outcome;
    }

    /**
     * Forwards to the {@code Eventually} trait implementation using only a patience configuration.
     *
     * @param fun      the block to evaluate
     * @param config   patience configuration passed through to the trait
     * @param retrying retrying strategy passed through to the trait
     * @param pos      source position passed through to the trait
     * @return whatever the trait implementation returns
     */
    public <T> T eventually(Function0<T> fun, AbstractPatienceConfiguration.PatienceConfig config, Retrying<T> retrying, Position pos) {
        final Object outcome = Eventually.eventually$(this, fun, config, retrying, pos);
        return (T)outcome;
    }

    /**
     * Forwards to the {@code PatienceConfiguration} trait implementation.
     *
     * @return the patience configuration produced by the trait
     */
    public AbstractPatienceConfiguration.PatienceConfig patienceConfig() {
        final PatienceConfiguration self = this;
        return PatienceConfiguration.patienceConfig$(self);
    }

    /**
     * Forwards to the {@code PatienceConfiguration} trait to build a timeout setting.
     *
     * @param value the span handed to the trait implementation
     * @return the {@code Timeout} produced by the trait
     */
    public PatienceConfiguration.Timeout timeout(Span value) {
        final PatienceConfiguration self = this;
        return PatienceConfiguration.timeout$(self, value);
    }

    /**
     * Forwards to the {@code PatienceConfiguration} trait to build an interval setting.
     *
     * @param value the span handed to the trait implementation
     * @return the {@code Interval} produced by the trait
     */
    public PatienceConfiguration.Interval interval(Span value) {
        final PatienceConfiguration self = this;
        return PatienceConfiguration.interval$(self, value);
    }

    /**
     * Forwards to the {@code ScaledTimeSpans} trait implementation.
     *
     * @param span the span handed to the trait implementation
     * @return the span returned by the trait
     */
    public final Span scaled(Span span) {
        final ScaledTimeSpans self = this;
        return ScaledTimeSpans.scaled$(self, span);
    }

    /**
     * Forwards to the {@code ScaledTimeSpans} trait implementation.
     *
     * @return the scale factor reported by the trait
     */
    public double spanScaleFactor() {
        final ScaledTimeSpans self = this;
        return ScaledTimeSpans.spanScaleFactor$(self);
    }

    /**
     * Synthetic super-accessor: invokes the {@code BeforeAndAfterEach} trait's
     * {@code afterEach} implementation on this instance.
     */
    public /* synthetic */ void org$apache$spark$streaming$LocalStreamingContext$$super$afterEach() {
        final BeforeAndAfterEach self = this;
        BeforeAndAfterEach.afterEach$(self);
    }

    /** Forwards to the {@code LocalStreamingContext} trait's {@code resetStreamingContext} implementation. */
    public void resetStreamingContext() {
        final LocalStreamingContext self = this;
        LocalStreamingContext.resetStreamingContext$(self);
    }

    /**
     * Accessor for the trait-owned default patience configuration field.
     *
     * @return the stored default patience configuration
     */
    public AbstractPatienceConfiguration.PatienceConfig org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig() {
        final AbstractPatienceConfiguration.PatienceConfig stored = this.org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig;
        return stored;
    }

    /**
     * Trait setter that stores the default patience configuration in this instance.
     *
     * @param x$1 the configuration to store
     */
    public final void org$scalatest$concurrent$PatienceConfiguration$_setter_$org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig_$eq(AbstractPatienceConfiguration.PatienceConfig x$1) {
        org$scalatest$concurrent$PatienceConfiguration$$defaultPatienceConfig = x$1;
    }

    /**
     * Returns the {@code PatienceConfig} companion instance, creating it on first use
     * via {@code PatienceConfig$lzycompute$1()}.
     *
     * @return the companion instance held in {@code PatienceConfig$module}
     */
    public AbstractPatienceConfiguration$PatienceConfig$ PatienceConfig() {
        // Fast path: already initialised, no lock needed (field is volatile).
        if (this.PatienceConfig$module != null) {
            return this.PatienceConfig$module;
        }
        this.PatienceConfig$lzycompute$1();
        return this.PatienceConfig$module;
    }

    /**
     * @return the streaming context currently stored on this suite (may be {@code null})
     */
    public StreamingContext ssc() {
        return ssc;
    }

    /**
     * Stores the streaming context on this suite.
     *
     * @param x$1 the new streaming context
     */
    public void ssc_$eq(StreamingContext x$1) {
        ssc = x$1;
    }

    /**
     * @return the current value of the {@code stopSparkContext} flag
     */
    public boolean stopSparkContext() {
        return stopSparkContext;
    }

    /**
     * Updates the {@code stopSparkContext} flag.
     *
     * @param x$1 the new flag value
     */
    public void stopSparkContext_$eq(boolean x$1) {
        stopSparkContext = x$1;
    }

    /**
     * @return the Spark configuration held by this suite
     */
    public SparkConf sparkConf() {
        return sparkConf;
    }

    /**
     * @return the current test directory, or {@code null} if none has been set
     */
    private File testDir() {
        return testDir;
    }

    /**
     * Stores the test directory.
     *
     * @param x$1 the new test directory
     */
    private void testDir_$eq(File x$1) {
        testDir = x$1;
    }

    /**
     * @return the Kafka test harness, or {@code null} when it has not been created or was torn down
     */
    private KafkaTestUtils kafkaTestUtils() {
        return kafkaTestUtils;
    }

    /**
     * Stores the Kafka test harness.
     *
     * @param x$1 the new harness (or {@code null} to clear it)
     */
    private void kafkaTestUtils_$eq(KafkaTestUtils x$1) {
        kafkaTestUtils = x$1;
    }

    /** Runs the superclass setup, then creates, stores and sets up a fresh {@code KafkaTestUtils}. */
    public void beforeAll() {
        super.beforeAll();
        final KafkaTestUtils harness = new KafkaTestUtils();
        this.kafkaTestUtils_$eq(harness);
        // The getter returns the instance stored on the previous line.
        this.kafkaTestUtils().setup();
    }

    /**
     * Tears down the Kafka test harness if one exists and clears the reference;
     * the superclass {@code afterAll} always runs, even if teardown throws.
     */
    public void afterAll() {
        try {
            final boolean harnessPresent = this.kafkaTestUtils() != null;
            if (harnessPresent) {
                this.kafkaTestUtils().teardown();
                this.kafkaTestUtils_$eq(null);
            }
        } finally {
            super.afterAll();
        }
    }

    /**
     * Deletes the test directory recursively when one is set; the
     * {@code LocalStreamingContext} trait's {@code afterEach} always runs afterwards.
     */
    public void afterEach() {
        try {
            final boolean hasTestDir = this.testDir() != null;
            if (hasTestDir) {
                Utils$.MODULE$.deleteRecursively(this.testDir());
            }
        } finally {
            LocalStreamingContext.afterEach$((LocalStreamingContext)this);
        }
    }

    /*
     * WARNING - void declaration
     */
    public HashMap<String, Object> getKafkaParams(Seq<Tuple2<String, Object>> extra) {
        void var2_2;
        HashMap<String, Object> kp = new HashMap<String, Object>();
        kp.put("bootstrap.servers", this.kafkaTestUtils().brokerAddress());
        kp.put("key.deserializer", StringDeserializer.class);
        kp.put("value.deserializer", StringDeserializer.class);
        kp.put("group.id", new StringBuilder(15).append("test-consumer-").append(Random$.MODULE$.nextInt()).append("-").append(System.currentTimeMillis()).toString());
        extra.foreach((Function1 & Serializable & scala.Serializable)e -> kp.put((String)e._1(), e._2()));
        return var2_2;
    }

    /**
     * @return the location strategy held by this suite
     */
    public LocationStrategy preferredHosts() {
        return preferredHosts;
    }

    /**
     * Shared body for the backpressure tests: starts a direct Kafka stream with
     * backpressure enabled and asserts the per-partition message cap it computes.
     *
     * <p>The stream is always stopped once it has been started, even when the
     * assertion fails, so its resources are not left behind by a failing test.
     *
     * @param maxRatePerPartition     value written to {@code spark.streaming.kafka.maxRatePerPartition}
     * @param initialRate             value written to {@code spark.streaming.backpressure.initialRate}
     * @param maxMessagesPerPartition expected result of {@code maxMessagesPerPartition} for partition 0
     *                                of the topic, given an input of 1000 for that partition
     */
    private void backpressureTest(int maxRatePerPartition, int initialRate, int maxMessagesPerPartition) {
        String topic = UUID.randomUUID().toString();
        HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
        SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName(this.getClass().getSimpleName()).set("spark.streaming.backpressure.enabled", "true").set("spark.streaming.backpressure.initialRate", ((Object)BoxesRunTime.boxToInteger((int)initialRate)).toString()).set("spark.streaming.kafka.maxRatePerPartition", ((Object)BoxesRunTime.boxToInteger((int)maxRatePerPartition)).toString());
        scala.collection.immutable.Map messages = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"foo"), (Object)BoxesRunTime.boxToInteger((int)5000))}));
        this.kafkaTestUtils().sendMessages(topic, (scala.collection.immutable.Map<String, Object>)messages);
        this.ssc_$eq(new StreamingContext(sparkConf, Milliseconds$.MODULE$.apply(500L)));
        // The single-element topic list is Scala's cons cell (`topic :: Nil`); its JVM class
        // name is scala.collection.immutable.$colon$colon, spelled out in full here.
        DirectKafkaInputDStream kafkaStream = (DirectKafkaInputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> new DirectKafkaInputDStream(this.ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Subscribe((Iterable<String>)new scala.collection.immutable.$colon$colon((Object)topic, (List)Nil$.MODULE$), (Map<String, Object>)((Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala())), new DefaultPerPartitionConfig(sparkConf)));
        kafkaStream.start();
        try {
            scala.collection.immutable.Map input = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)1000L))}));
            scala.collection.immutable.Map actual = (scala.collection.immutable.Map)kafkaStream.maxMessagesPerPartition((scala.collection.immutable.Map<TopicPartition, Object>)input).get();
            scala.collection.immutable.Map expected = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToInteger((int)maxMessagesPerPartition))}));
            // Null-safe equality, equivalent to the expanded `==` of the original assertion.
            boolean sameMaps = actual != null ? actual.equals(expected) : expected == null;
            Bool assertionResult = Bool$.MODULE$.binaryMacroBool((Object)actual, "==", (Object)expected, sameMaps, Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert(assertionResult, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 668));
        } finally {
            // Stop the stream whether or not the assertion passed.
            kafkaStream.stop();
        }
    }

    /**
     * Collects the offset ranges of every RDD the stream has generated so far,
     * as (batch time, offset ranges) pairs sorted by batch time.
     *
     * @param kafkaStream the stream whose {@code generatedRDDs()} are inspected
     * @return the time-ordered sequence of (time, offset ranges) pairs
     */
    private <K, V> Seq<Tuple2<Time, OffsetRange[]>> getOffsetRanges(DStream<ConsumerRecord<K, V>> kafkaStream) {
        final Function1 rangesOf = (Function1 & Serializable & scala.Serializable)generated -> ((HasOffsetRanges)generated).offsetRanges();
        final Function1 batchTimeOf = (Function1 & Serializable & scala.Serializable)entry -> (Time)entry._1();
        return (Seq)kafkaStream.generatedRDDs().mapValues(rangesOf).toSeq().sortBy(batchTimeOf, Time$.MODULE$.ordering());
    }

    /**
     * Creates and starts a {@code DirectKafkaInputDStream} whose {@code rateController()}
     * returns the supplied controller.
     *
     * <p>As a side effect this replaces the suite's streaming context with a new one
     * (100 ms batches, {@code local[1]}, {@code spark.streaming.kafka.maxRatePerPartition=100}).
     *
     * @param topic              topic name handed to the stream construction
     * @param mockRateController value returned by the stream's {@code rateController()} override
     * @param ppc                optional per-partition configuration handed to the stream construction
     * @return the started stream
     */
    private DirectKafkaInputDStream<String, String> getDirectKafkaStream(String topic, Option<RateController> mockRateController, Option<PerPartitionConfig> ppc) {
        int batchIntervalMilliseconds = 100;
        SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName(this.getClass().getSimpleName()).set("spark.streaming.kafka.maxRatePerPartition", "100");
        this.ssc_$eq(new StreamingContext(sparkConf, Milliseconds$.MODULE$.apply((long)batchIntervalMilliseconds)));
        HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
        // Separate copy of the parameters, adjusted in place by fixKafkaParams.
        HashMap<String, Object> ekp = new HashMap<String, Object>(kafkaParams);
        KafkaUtils$.MODULE$.fixKafkaParams(ekp);
        // NOTE(review): the argument list below looks like the decompiler's rendering of the
        // anonymous class's captured variables rather than a real DirectKafkaInputDStream
        // constructor signature — confirm against the original Scala source before relying on it.
        DirectKafkaInputDStream<String, String> s = new DirectKafkaInputDStream<String, String>(this, ekp, kafkaParams, topic, ppc, sparkConf, mockRateController){
            private final Option<RateController> rateController;

            public Option<RateController> rateController() {
                return this.rateController;
            }
            {
                // NOTE(review): `mockRateController$1` is a decompiler-generated name for the
                // captured `mockRateController` parameter; it is not declared in this scope.
                this.rateController = mockRateController$1;
            }
        };
        s.start();
        return s;
    }

    /**
     * Creates the {@code PatienceConfig} companion instance under this object's monitor,
     * unless another caller has already created it.
     */
    private final void PatienceConfig$lzycompute$1() {
        synchronized (this) {
            // Re-check inside the lock: the field may have been set while we waited.
            if (this.PatienceConfig$module != null) {
                return;
            }
            this.PatienceConfig$module = new AbstractPatienceConfiguration$PatienceConfig$((AbstractPatienceConfiguration)this);
        }
    }

    /**
     * Synthetic lambda body: creates topic {@code t} and sends {@code data$1} to it.
     *
     * @param $this  the enclosing suite
     * @param data$1 messages handed to {@code sendMessages}
     * @param t      topic name
     */
    public static final /* synthetic */ void $anonfun$new$2(DirectKafkaStreamSuite $this, scala.collection.immutable.Map data$1, String t) {
        $this.kafkaTestUtils().createTopic(t);
        final scala.collection.immutable.Map<String, Object> payload = (scala.collection.immutable.Map<String, Object>)data$1;
        $this.kafkaTestUtils().sendMessages(t, payload);
    }

    /**
     * Synthetic lambda body: creates a direct stream subscribed to {@code topics$1},
     * using the suite's streaming context and location strategy.
     *
     * @param $this         the enclosing suite
     * @param topics$1      topics to subscribe to
     * @param kafkaParams$1 Java map of consumer parameters, converted to a Scala map
     * @param offsets$1     offsets map passed to {@code Subscribe}
     * @return the stream returned by {@code KafkaUtils.createDirectStream}
     */
    public static final /* synthetic */ InputDStream $anonfun$new$3(DirectKafkaStreamSuite $this, List topics$1, HashMap kafkaParams$1, scala.collection.immutable.Map offsets$1) {
        final StreamingContext context = $this.ssc();
        final LocationStrategy location = $this.preferredHosts();
        final Iterable<String> topicNames = (Iterable<String>)topics$1;
        final Map<String, Object> scalaParams = (Map<String, Object>)((Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams$1).asScala());
        final Map<TopicPartition, Object> startOffsets = (Map<TopicPartition, Object>)offsets$1;
        return KafkaUtils$.MODULE$.createDirectStream(context, location, ConsumerStrategies$.MODULE$.Subscribe(topicNames, scalaParams, startOffsets));
    }

    /**
     * Synthetic lambda body: for partition {@code i}, pairs the number of elements in
     * {@code iter} with the size of the matching offset range.
     *
     * @param offsetRanges$1 holder whose {@code elem} is the {@code OffsetRange[]} for the batch
     * @param i              partition index, used to index the offset-range array
     * @param iter           the partition's elements
     * @return a single-element iterator of (element count, untilOffset - fromOffset)
     */
    public static final /* synthetic */ Iterator $anonfun$new$9(ObjectRef offsetRanges$1, int i, Iterator iter) {
        final OffsetRange range = ((OffsetRange[])offsetRanges$1.elem)[i];
        final int recordCount = iter.toSeq().size();
        final long rangeLength = range.untilOffset() - range.fromOffset();
        final Tuple2[] single = new Tuple2[]{new Tuple2.mcIJ.sp(recordCount, rangeLength)};
        return scala.package$.MODULE$.Iterator().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])single));
    }

    /**
     * Synthetic lambda body: logs every offset range held in {@code offsetRanges$1}, then
     * checks that each partition of {@code rdd} contains exactly as many elements as its
     * offset range spans.
     *
     * @param $this          the enclosing suite (used for logging and assertions)
     * @param offsetRanges$1 holder whose {@code elem} is the {@code OffsetRange[]} for the batch
     * @param rdd            the RDD whose partitions are counted
     */
    public static final /* synthetic */ void $anonfun$new$6(DirectKafkaStreamSuite $this, ObjectRef offsetRanges$1, RDD rdd) {
        // Log "topic partition fromOffset untilOffset" for each range.
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])((OffsetRange[])offsetRanges$1.elem))).foreach((Function1 & Serializable & scala.Serializable)o -> {
            $this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(3).append(o.topic()).append(" ").append(o.partition()).append(" ").append(o.fromOffset()).append(" ").append(o.untilOffset()).toString());
            return BoxedUnit.UNIT;
        });
        // One (partition element count, offset range size) pair per partition.
        Tuple2[] collected = (Tuple2[])rdd.mapPartitionsWithIndex((Function2 & Serializable & scala.Serializable)(i, iter) -> DirectKafkaStreamSuite.$anonfun$new$9(offsetRanges$1, BoxesRunTime.unboxToInt((Object)i), iter), rdd.mapPartitionsWithIndex$default$2(), ClassTag$.MODULE$.apply(Tuple2.class)).collect();
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])collected)).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            // Expanded Scala pattern match: a null tuple cannot be destructured.
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            int partSize = tuple2._1$mcI$sp();
            long rangeSize = tuple2._2$mcJ$sp();
            // Expanded `assert(partSize === rangeSize, "offset ranges are wrong")`.
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(BoxesRunTime.boxToInteger((int)partSize));
            long $org_scalatest_assert_macro_right = rangeSize;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertion assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"offset ranges are wrong", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 146));
            return assertion;
        });
    }

    /**
     * Synthetic lambda body: creates topic {@code t} and sends {@code data$2} to it.
     *
     * @param $this  the enclosing suite
     * @param data$2 messages handed to {@code sendMessages}
     * @param t      topic name
     */
    public static final /* synthetic */ void $anonfun$new$15(DirectKafkaStreamSuite $this, scala.collection.immutable.Map data$2, String t) {
        $this.kafkaTestUtils().createTopic(t);
        final scala.collection.immutable.Map<String, Object> payload = (scala.collection.immutable.Map<String, Object>)data$2;
        $this.kafkaTestUtils().sendMessages(t, payload);
    }

    /**
     * Synthetic lambda body: for partition {@code i}, pairs the number of elements in
     * {@code iter} with the size of the matching offset range.
     *
     * @param offsetRanges$2 holder whose {@code elem} is the {@code OffsetRange[]} for the batch
     * @param i              partition index, used to index the offset-range array
     * @param iter           the partition's elements
     * @return a single-element iterator of (element count, untilOffset - fromOffset)
     */
    public static final /* synthetic */ Iterator $anonfun$new$22(ObjectRef offsetRanges$2, int i, Iterator iter) {
        final OffsetRange range = ((OffsetRange[])offsetRanges$2.elem)[i];
        final int recordCount = iter.toSeq().size();
        final long rangeLength = range.untilOffset() - range.fromOffset();
        final Tuple2[] single = new Tuple2[]{new Tuple2.mcIJ.sp(recordCount, rangeLength)};
        return scala.package$.MODULE$.Iterator().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])single));
    }

    /**
     * Synthetic lambda body: logs every offset range held in {@code offsetRanges$2}, then
     * checks that each partition of {@code rdd} contains exactly as many elements as its
     * offset range spans.
     *
     * @param $this          the enclosing suite (used for logging and assertions)
     * @param offsetRanges$2 holder whose {@code elem} is the {@code OffsetRange[]} for the batch
     * @param rdd            the RDD whose partitions are counted
     */
    public static final /* synthetic */ void $anonfun$new$19(DirectKafkaStreamSuite $this, ObjectRef offsetRanges$2, RDD rdd) {
        // Log "topic partition fromOffset untilOffset" for each range.
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])((OffsetRange[])offsetRanges$2.elem))).foreach((Function1 & Serializable & scala.Serializable)o -> {
            $this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(3).append(o.topic()).append(" ").append(o.partition()).append(" ").append(o.fromOffset()).append(" ").append(o.untilOffset()).toString());
            return BoxedUnit.UNIT;
        });
        // One (partition element count, offset range size) pair per partition.
        Tuple2[] collected = (Tuple2[])rdd.mapPartitionsWithIndex((Function2 & Serializable & scala.Serializable)(i, iter) -> DirectKafkaStreamSuite.$anonfun$new$22(offsetRanges$2, BoxesRunTime.unboxToInt((Object)i), iter), rdd.mapPartitionsWithIndex$default$2(), ClassTag$.MODULE$.apply(Tuple2.class)).collect();
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])collected)).foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Tuple2 tuple2 = x0$2;
            // Expanded Scala pattern match: a null tuple cannot be destructured.
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            int partSize = tuple2._1$mcI$sp();
            long rangeSize = tuple2._2$mcJ$sp();
            // Expanded `assert(partSize === rangeSize, "offset ranges are wrong")`.
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = $this.convertToEqualizer(BoxesRunTime.boxToInteger((int)partSize));
            long $org_scalatest_assert_macro_right = rangeSize;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertion assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"offset ranges are wrong", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 212));
            return assertion;
        });
    }

    /**
     * Seeks the consumer to the end of the given partition and reports the resulting position.
     *
     * @param kc$1             consumer to reposition
     * @param topicPartition$1 partition to query
     * @return the consumer's position for the partition after {@code seekToEnd}
     */
    private static final long getLatestOffset$1(KafkaConsumer kc$1, TopicPartition topicPartition$1) {
        final TopicPartition[] onlyPartition = new TopicPartition[]{topicPartition$1};
        kc$1.seekToEnd(Arrays.asList((Object[])onlyPartition));
        final long endPosition = kc$1.position(topicPartition$1);
        return endPosition;
    }

    /**
     * Seeks the consumer to the end of the given partition and reports the resulting position.
     *
     * @param kc$2             consumer to reposition
     * @param topicPartition$2 partition to query
     * @return the consumer's position for the partition after {@code seekToEnd}
     */
    private static final long getLatestOffset$2(KafkaConsumer kc$2, TopicPartition topicPartition$2) {
        final TopicPartition[] onlyPartition = new TopicPartition[]{topicPartition$2};
        kc$2.seekToEnd(Arrays.asList((Object[])onlyPartition));
        final long endPosition = kc$2.position(topicPartition$2);
        return endPosition;
    }

    /**
     * Synthetic lambda body: renders an int as its decimal string.
     *
     * <p>Uses {@link Integer#toString(int)} directly rather than boxing the value through
     * the Scala runtime first; the resulting string is the same.
     *
     * @param x$3 the value to render
     * @return the decimal string form of {@code x$3}
     */
    public static final /* synthetic */ String $anonfun$new$40(int x$3) {
        return Integer.toString(x$3);
    }

    /**
     * Converts every element of {@code data} to a string, pairs each string
     * with the value 1, and sends the resulting map to the given topic through
     * {@code kafkaTestUtils()}.
     *
     * @param data sequence of (boxed) ints to publish
     * @param topic$2 topic the messages are sent to
     */
    private final void sendData$1(Seq data, String topic$2) {
        // Step 1: int -> String for every element.
        Seq strings = (Seq)data.map((Function1 & Serializable & scala.Serializable)n -> DirectKafkaStreamSuite.$anonfun$new$40(BoxesRunTime.unboxToInt((Object)n)), Seq$.MODULE$.canBuildFrom());
        // Step 2: String -> (String, 1) pairs.
        TraversableOnce pairs = (TraversableOnce)strings.map((Function1 & Serializable & scala.Serializable)s -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(s), (Object)BoxesRunTime.boxToInteger((int)1)), Seq$.MODULE$.canBuildFrom());
        // Step 3: collapse the pairs into the immutable map sendMessages expects.
        scala.collection.immutable.Map<String, Object> messages = (scala.collection.immutable.Map<String, Object>)pairs.toMap(Predef$.MODULE$.$conforms());
        this.kafkaTestUtils().sendMessages(topic$2, messages);
    }

    /**
     * Lambda body: stores the int in the tuple's second slot into the
     * companion object's {@code total()} holder.
     *
     * @param x tuple whose second element is written to {@code total()}
     */
    public static final /* synthetic */ void $anonfun$new$47(Tuple2 x) {
        int second = x._2$mcI$sp();
        DirectKafkaStreamSuite$.MODULE$.total().set(second);
    }

    /**
     * Lambda body: collects the RDD and, if it produced at least one element,
     * hands the first element to {@code $anonfun$new$47}.
     *
     * @param rdd the RDD to collect
     */
    public static final /* synthetic */ void $anonfun$new$46(RDD rdd) {
        Object[] collected = (Object[])rdd.collect();
        ArrayOps.ofRef elements = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(collected));
        // headOption() is empty for an empty array, in which case nothing is forwarded.
        elements.headOption().foreach((Function1 & Serializable & scala.Serializable)first -> {
            DirectKafkaStreamSuite.$anonfun$new$47(first);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Lambda body: checks, via the suite's {@code ===} equalizer and the
     * default equality, whether the range's {@code fromOffset()} equals 0.
     *
     * @param $this the suite instance supplying {@code convertToEqualizer}
     * @param x$5 offset range under test
     * @return the result of comparing {@code x$5.fromOffset()} with 0
     */
    public static final /* synthetic */ boolean $anonfun$new$50(DirectKafkaStreamSuite $this, OffsetRange x$5) {
        long fromOffset = x$5.fromOffset();
        Object zero = (Object)BoxesRunTime.boxToInteger((int)0);
        return $this.convertToEqualizer(BoxesRunTime.boxToLong((long)fromOffset)).$eq$eq$eq(zero, Equality$.MODULE$.default());
    }

    /**
     * Lambda body: boxes the int and returns the boxed value's {@code toString()}.
     *
     * @param x$6 the number to render
     * @return the string form of {@code x$6}
     */
    public static final /* synthetic */ String $anonfun$new$58(int x$6) {
        Object boxedValue = (Object)BoxesRunTime.boxToInteger((int)x$6);
        return boxedValue.toString();
    }

    /**
     * Publishes {@code data} to the topic (each element converted to a string
     * and paired with the value 1) and then retries, with a 10 second timeout
     * and a 50 millisecond interval, an assertion that every sent string is
     * contained in {@code collectedData$3}.
     *
     * @param data sequence of (boxed) ints to publish
     * @param topic$3 topic the messages are sent to
     * @param collectedData$3 queue that is checked for the sent strings
     */
    private final void sendDataAndWaitForReceive$1(Seq data, String topic$3, ConcurrentLinkedQueue collectedData$3) {
        // Build the String -> 1 message map and publish it.
        Seq strings = (Seq)data.map((Function1 & Serializable & scala.Serializable)n -> DirectKafkaStreamSuite.$anonfun$new$58(BoxesRunTime.unboxToInt((Object)n)), Seq$.MODULE$.canBuildFrom());
        TraversableOnce pairs = (TraversableOnce)strings.map((Function1 & Serializable & scala.Serializable)s -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(s), (Object)BoxesRunTime.boxToInteger((int)1)), Seq$.MODULE$.canBuildFrom());
        scala.collection.immutable.Map<String, Object> messages = (scala.collection.immutable.Map<String, Object>)pairs.toMap(Predef$.MODULE$.$conforms());
        this.kafkaTestUtils().sendMessages(topic$3, messages);

        // Retry budget for the assertion below.
        Duration patience = (Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds();
        Duration pollEvery = (Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds();
        this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan(patience)), this.interval(Span$.MODULE$.convertDurationToSpan(pollEvery)), (Function0 & Serializable & scala.Serializable)() -> {
            // True only once every published string shows up in the collected queue.
            boolean everythingReceived = strings.forall((Function1 & Serializable & scala.Serializable)sent -> BoxesRunTime.boxToBoolean((boolean)collectedData$3.contains(sent)));
            Bool receivedCheck = Bool$.MODULE$.simpleMacroBool(everythingReceived, "strings.forall({\n  ((x$1: Any) => collectedData.contains(x$1))\n})", Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(receivedCheck, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 439));
        }, Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 438));
    }

    /**
     * Lambda body run per batch: reads the RDD's offset ranges, collects the
     * record values into {@code collectedData$3}, and asynchronously commits
     * the offsets through the stream. The commit callback logs an error on
     * failure; on success it copies the committed offsets into
     * {@code committed$1} and logs them at debug level.
     *
     * @param $this suite instance used for logging
     * @param collectedData$3 queue receiving the collected record values
     * @param kafkaStream$1 stream, cast to {@code CanCommitOffsets} for the commit
     * @param committed$1 map receiving the offsets reported by a successful commit
     * @param rdd batch RDD, cast to {@code HasOffsetRanges} to read its ranges
     * @param time batch time (not used by this body)
     */
    public static final /* synthetic */ void $anonfun$new$63(DirectKafkaStreamSuite $this, ConcurrentLinkedQueue collectedData$3, InputDStream kafkaStream$1, ConcurrentHashMap committed$1, RDD rdd, Time time) {
        OffsetRange[] ranges = ((HasOffsetRanges)rdd).offsetRanges();
        String[] values = (String[])rdd.map((Function1 & Serializable & scala.Serializable)record -> (String)record.value(), ClassTag$.MODULE$.apply(String.class)).collect();
        collectedData$3.addAll(Arrays.asList((Object[])values));
        CanCommitOffsets committer = (CanCommitOffsets)kafkaStream$1;
        committer.commitAsync(ranges, (m, e) -> {
            if (e == null) {
                // Success: remember what was committed, then log it.
                committed$1.putAll(m);
                $this.logDebug((Function0 & Serializable & scala.Serializable)() -> "commit succeeded: " + m);
            } else {
                $this.logError((Function0 & Serializable & scala.Serializable)() -> "commit failed", e);
            }
        });
    }

    /**
     * Renders the queue of arrays as text: each array becomes
     * {@code [a,b,...]} and the arrays are joined as {@code {..., ...}}.
     *
     * @param collectedData$4 queue whose elements are treated as object arrays
     * @return the formatted string
     */
    private static final String dataToString$1(ConcurrentLinkedQueue collectedData$4) {
        TraversableLike batches = (TraversableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)collectedData$4).asScala();
        // Inner level: one bracketed, comma-separated string per array.
        TraversableOnce rendered = (TraversableOnce)batches.map((Function1 & Serializable & scala.Serializable)batch -> new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])batch)).mkString("[", ",", "]"), Iterable$.MODULE$.canBuildFrom());
        // Outer level: wrap all rendered arrays in braces.
        return rendered.mkString("{", ", ", "}");
    }

    /**
     * Lambda body run per batch: maps every element of the RDD to its second
     * tuple component (as a String), collects the result, and appends the
     * whole array as one entry to {@code collectedData$4}.
     *
     * @param collectedData$4 queue receiving one String array per invocation
     * @param rdd batch RDD of tuples
     * @param time batch time (not used by this body)
     */
    public static final /* synthetic */ void $anonfun$new$83(ConcurrentLinkedQueue collectedData$4, RDD rdd, Time time) {
        RDD secondComponents = rdd.map((Function1 & Serializable & scala.Serializable)pair -> (String)pair._2(), ClassTag$.MODULE$.apply(String.class));
        String[] batch = (String[])secondComponents.collect();
        collectedData$4.add(batch);
    }

    /**
     * Lambda body: reports whether the array's size equals the expected size.
     *
     * @param expectedSize$1 size to compare against
     * @param x$11 array whose size is checked
     * @return {@code true} when the sizes match
     */
    public static final /* synthetic */ boolean $anonfun$new$87(long expectedSize$1, String[] x$11) {
        int actualSize = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x$11)).size();
        return expectedSize$1 == (long)actualSize;
    }

    public static final /* synthetic */ Assertion $anonfun$new$85(DirectKafkaStreamSuite $this, ConcurrentLinkedQueue collectedData$4, ConstantEstimator estimator$1, int batchIntervalMilliseconds$1, int rate) {
        collectedData$4.clear();
        estimator$1.updateRate(rate);
        long expectedSize = Math.round((double)(rate * batchIntervalMilliseconds$1) * 0.001);
        return (Assertion)$this.eventually($this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds())), $this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.simpleMacroBool(((IterableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)collectedData$4).asScala()).exists((Function1 & Serializable & scala.Serializable)x$11 -> BoxesRunTime.boxToBoolean((boolean)DirectKafkaStreamSuite.$anonfun$new$87(expectedSize, x$11))), "scala.collection.JavaConverters.collectionAsScalaIterableConverter[Array[String]](collectedData).asScala.exists(((x$11: Array[String]) => scala.Predef.refArrayOps[String](x$11).size.==(expectedSize)))", Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder(41).append(" - No arrays of size ").append(expectedSize).append(" for rate ").append(rate).append(" found in ").append(DirectKafkaStreamSuite.dataToString$1(collectedData$4)).toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 619));
        }, Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 616));
    }

    public DirectKafkaStreamSuite() {
        LocalStreamingContext.$init$((LocalStreamingContext)this);
        ScaledTimeSpans.$init$((ScaledTimeSpans)this);
        AbstractPatienceConfiguration.$init$((AbstractPatienceConfiguration)this);
        PatienceConfiguration.$init$((PatienceConfiguration)this);
        Eventually.$init$((Eventually)this);
        this.sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass().getSimpleName()).set("spark.streaming.kafka.consumer.poll.ms", "10000");
        this.preferredHosts = LocationStrategies$.MODULE$.PreferConsistent();
        this.test("basic stream receiving with multiple topics and smallest starting offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            .colon.colon topics = new .colon.colon((Object)"basic1", (List)new .colon.colon((Object)"basic2", (List)new .colon.colon((Object)"basic3", (List)Nil$.MODULE$)));
            scala.collection.immutable.Map data = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)7)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)9))}));
            topics.foreach((Function1 & Serializable & scala.Serializable)t -> {
                DirectKafkaStreamSuite.$anonfun$new$2(this, data, t);
                return BoxedUnit.UNIT;
            });
            scala.collection.immutable.Map offsets = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition("basic3", 0)), (Object)BoxesRunTime.boxToLong((long)2L))}));
            int expectedTotal = BoxesRunTime.unboxToInt((Object)data.values().sum((Numeric)Numeric.IntIsIntegral$.MODULE$)) * topics.size() - 2;
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
            this.ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(1000L)));
            InputDStream stream = (InputDStream)this.withClue("Error creating direct stream", () -> DirectKafkaStreamSuite.$anonfun$new$3(this, (List)topics, kafkaParams, offsets));
            ConcurrentLinkedQueue allReceived = new ConcurrentLinkedQueue();
            ObjectRef offsetRanges = ObjectRef.create((Object)((OffsetRange[])Array$.MODULE$.apply((Seq)Nil$.MODULE$, ClassTag$.MODULE$.apply(OffsetRange.class))));
            DStream tf = stream.transform((Function1 & Serializable & scala.Serializable)rdd -> {
                offsetRanges$1.elem = ((HasOffsetRanges)rdd).offsetRanges();
                return rdd.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class));
            }, ClassTag$.MODULE$.apply(Tuple2.class));
            tf.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                DirectKafkaStreamSuite.$anonfun$new$6(this, offsetRanges, rdd);
                return BoxedUnit.UNIT;
            });
            stream.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                allReceived.addAll(Arrays.asList((Object[])rdd.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class)).collect()));
                return BoxedUnit.UNIT;
            });
            this.ssc().start();
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(100)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(1)).second())), (Function0 & Serializable & scala.Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)allReceived.size()));
                int $org_scalatest_assert_macro_right = expectedTotal;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder(50).append("didn't get expected number of messages, messages:\n").append(((TraversableOnce)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)allReceived).asScala()).mkString("\n")).toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 155));
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 154));
            StreamingContext qual$1 = this.ssc();
            boolean x$1 = qual$1.stop$default$1();
            qual$1.stop(x$1);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 100));
        this.test("pattern based subscription", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            .colon.colon topics = new .colon.colon((Object)"pat1", (List)new .colon.colon((Object)"pat2", (List)new .colon.colon((Object)"pat3", (List)new .colon.colon((Object)"advanced3", (List)Nil$.MODULE$))));
            Pattern pat = new StringOps(Predef$.MODULE$.augmentString("pat\\d")).r().pattern();
            scala.collection.immutable.Map data = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)7)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)9))}));
            topics.foreach((Function1 & Serializable & scala.Serializable)t -> {
                DirectKafkaStreamSuite.$anonfun$new$15(this, data, t);
                return BoxedUnit.UNIT;
            });
            scala.collection.immutable.Map offsets = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition("pat2", 0)), (Object)BoxesRunTime.boxToLong((long)3L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition("pat3", 0)), (Object)BoxesRunTime.boxToLong((long)4L))}));
            int expectedTotal = BoxesRunTime.unboxToInt((Object)data.values().sum((Numeric)Numeric.IntIsIntegral$.MODULE$)) * 3 - 7;
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
            this.ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(1000L)));
            InputDStream stream = (InputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> KafkaUtils$.MODULE$.createDirectStream(this.ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.SubscribePattern(pat, (Map<String, Object>)((Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala()), (Map<TopicPartition, Object>)offsets)));
            ConcurrentLinkedQueue allReceived = new ConcurrentLinkedQueue();
            ObjectRef offsetRanges = ObjectRef.create((Object)((OffsetRange[])Array$.MODULE$.apply((Seq)Nil$.MODULE$, ClassTag$.MODULE$.apply(OffsetRange.class))));
            DStream tf = stream.transform((Function1 & Serializable & scala.Serializable)rdd -> {
                offsetRanges$2.elem = ((HasOffsetRanges)rdd).offsetRanges();
                return rdd.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class));
            }, ClassTag$.MODULE$.apply(Tuple2.class));
            tf.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                DirectKafkaStreamSuite.$anonfun$new$19(this, offsetRanges, rdd);
                return BoxedUnit.UNIT;
            });
            stream.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                allReceived.addAll(Arrays.asList((Object[])rdd.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class)).collect()));
                return BoxedUnit.UNIT;
            });
            this.ssc().start();
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(100)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(1)).second())), (Function0 & Serializable & scala.Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)allReceived.size()));
                int $org_scalatest_assert_macro_right = expectedTotal;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder(50).append("didn't get expected number of messages, messages:\n").append(((TraversableOnce)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)allReceived).asScala()).mkString("\n")).toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 221));
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 220));
            StreamingContext qual$2 = this.ssc();
            boolean x$2 = qual$2.stop$default$1();
            qual$2.stop(x$2);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 162));
        this.test("receiving from largest starting offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "latest";
            TopicPartition topicPartition = new TopicPartition(topic, 0);
            scala.collection.immutable.Map data = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)10))}));
            this.kafkaTestUtils().createTopic(topic);
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"latest")}));
            KafkaConsumer kc = new KafkaConsumer(kafkaParams);
            kc.assign(Arrays.asList((Object[])new TopicPartition[]{topicPartition}));
            this.kafkaTestUtils().sendMessages(topic, (scala.collection.immutable.Map<String, Object>)data);
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                long $org_scalatest_assert_macro_left = DirectKafkaStreamSuite.getLatestOffset$1(kc, topicPartition);
                int $org_scalatest_assert_macro_right = 3;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > (long)$org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 245));
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 244));
            long offsetBeforeStart = DirectKafkaStreamSuite.getLatestOffset$1(kc, topicPartition);
            kc.close();
            this.ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(200L)));
            DirectKafkaInputDStream stream = (DirectKafkaInputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> {
                DirectKafkaInputDStream s = new DirectKafkaInputDStream(this.ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Subscribe((Iterable<String>)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Map<String, Object>)((Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala())), new DefaultPerPartitionConfig(this.sparkConf()));
                s.consumer().poll(0L);
                long $org_scalatest_assert_macro_left = s.consumer().position(topicPartition);
                long $org_scalatest_assert_macro_right = offsetBeforeStart;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"Start offset not from latest", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 259));
                return s;
            });
            ConcurrentLinkedQueue collectedData = new ConcurrentLinkedQueue();
            stream.map((Function1 & Serializable & scala.Serializable)x$1 -> (String)x$1.value(), ClassTag$.MODULE$.apply(String.class)).foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                collectedData.addAll(Arrays.asList((Object[])rdd.collect()));
                return BoxedUnit.UNIT;
            });
            this.ssc().start();
            scala.collection.immutable.Map newData = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)10))}));
            this.kafkaTestUtils().sendMessages(topic, (scala.collection.immutable.Map<String, Object>)newData);
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), (Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> collectedData.contains("b"), (Retrying)Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 273));
            ConcurrentLinkedQueue $org_scalatest_assert_macro_left = collectedData;
            String $org_scalatest_assert_macro_right = "a";
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.notBool(Bool$.MODULE$.binaryMacroBool($org_scalatest_assert_macro_left, "contains", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.contains($org_scalatest_assert_macro_right), Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 276));
            StreamingContext qual$3 = this.ssc();
            boolean x$3 = qual$3.stop$default$1();
            qual$3.stop(x$3);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 229));
        this.test("creating stream by offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "offset";
            TopicPartition topicPartition = new TopicPartition(topic, 0);
            scala.collection.immutable.Map data = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)10))}));
            this.kafkaTestUtils().createTopic(topic);
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"latest")}));
            KafkaConsumer kc = new KafkaConsumer(kafkaParams);
            kc.assign(Arrays.asList((Object[])new TopicPartition[]{topicPartition}));
            this.kafkaTestUtils().sendMessages(topic, (scala.collection.immutable.Map<String, Object>)data);
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                long $org_scalatest_assert_macro_left = DirectKafkaStreamSuite.getLatestOffset$2(kc, topicPartition);
                int $org_scalatest_assert_macro_right = 10;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= (long)$org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 297));
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 296));
            long offsetBeforeStart = DirectKafkaStreamSuite.getLatestOffset$2(kc, topicPartition);
            kc.close();
            kafkaParams.put("auto.offset.reset", "none");
            this.ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(200L)));
            DirectKafkaInputDStream stream = (DirectKafkaInputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> {
                DirectKafkaInputDStream s = new DirectKafkaInputDStream(this.ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Assign((Iterable<TopicPartition>)new .colon.colon((Object)topicPartition, (List)Nil$.MODULE$), (Map<String, Object>)((Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala()), (Map<TopicPartition, Object>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition), (Object)BoxesRunTime.boxToLong((long)11L))})))), new DefaultPerPartitionConfig(this.sparkConf()));
                s.consumer().poll(0L);
                long $org_scalatest_assert_macro_left = s.consumer().position(topicPartition);
                long $org_scalatest_assert_macro_right = offsetBeforeStart;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"Start offset not from latest", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 315));
                return s;
            });
            ConcurrentLinkedQueue collectedData = new ConcurrentLinkedQueue();
            stream.map((Function1 & Serializable & scala.Serializable)x$2 -> (String)x$2.value(), ClassTag$.MODULE$.apply(String.class)).foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                collectedData.addAll(Arrays.asList((Object[])rdd.collect()));
                return BoxedUnit.UNIT;
            });
            this.ssc().start();
            scala.collection.immutable.Map newData = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)10))}));
            this.kafkaTestUtils().sendMessages(topic, (scala.collection.immutable.Map<String, Object>)newData);
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), (Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> collectedData.contains("b"), (Retrying)Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 327));
            ConcurrentLinkedQueue $org_scalatest_assert_macro_left = collectedData;
            String $org_scalatest_assert_macro_right = "a";
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.notBool(Bool$.MODULE$.binaryMacroBool($org_scalatest_assert_macro_left, "contains", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.contains($org_scalatest_assert_macro_right), Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 330));
            StreamingContext qual$4 = this.ssc();
            boolean x$4 = qual$4.stop$default$1();
            qual$4.stop(x$4);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 281));
        // Test "offset recovery": runs a checkpointed direct Kafka stream, stops it, rebuilds the
        // StreamingContext from the checkpoint directory, verifies that every recovered offset range
        // was among those generated before the stop, then restarts and checks the running total.
        this.test("offset recovery", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "recovery";
            this.kafkaTestUtils().createTopic(topic);
            // Fresh temp directory; its absolute path is used as the checkpoint location below.
            this.testDir_$eq(Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2()));
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
            // First streaming context, created with a 100 ms batch duration.
            this.ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(100L)));
            InputDStream kafkaStream = (InputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> KafkaUtils$.MODULE$.createDirectStream(this.ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Subscribe((Iterable<String>)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Map<String, Object>)((Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala()))));
            // Every record value is parsed as an int and mapped under the single key "key".
            DStream keyedStream = kafkaStream.map((Function1 & Serializable & scala.Serializable)r -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"key"), (Object)BoxesRunTime.boxToInteger((int)new StringOps(Predef$.MODULE$.augmentString((String)r.value())).toInt())), ClassTag$.MODULE$.apply(Tuple2.class));
            // Running sum per key: new state = sum of the batch's values + previous state (0 if absent).
            DStream stateStream = DStream$.MODULE$.toPairDStreamFunctions(keyedStream, ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.Int(), (Ordering)Ordering.String$.MODULE$).updateStateByKey((Function2 & Serializable & scala.Serializable)(values2, state) -> new Some((Object)BoxesRunTime.boxToInteger((int)(BoxesRunTime.unboxToInt((Object)values2.sum((Numeric)Numeric.IntIsIntegral$.MODULE$)) + BoxesRunTime.unboxToInt((Object)state.getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> 0))))), ClassTag$.MODULE$.Int());
            this.ssc().checkpoint(this.testDir().getAbsolutePath());
            // NOTE(review): $anonfun$new$46 is defined outside this view; presumably it publishes the
            // accumulated state into DirectKafkaStreamSuite$.total, which is asserted on below — confirm.
            stateStream.foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                DirectKafkaStreamSuite.$anonfun$new$46(rdd);
                return BoxedUnit.UNIT;
            });
            this.ssc().start();
            // Feed the integers 1..10 in groups of 4 through sendData$1 (helper defined outside this view).
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).grouped(4).foreach((Function1 & Serializable & scala.Serializable)i -> {
                this.sendData$1((Seq)i, topic);
                return BoxedUnit.UNIT;
            });
            // Retry for up to 20 s (every 50 ms) until the shared total equals sum(1..10).
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToLong((long)DirectKafkaStreamSuite$.MODULE$.total().get()));
                int $org_scalatest_assert_macro_right = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).sum((Numeric)Numeric.IntIsIntegral$.MODULE$);
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 377));
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 376));
            // Stop the first context, passing stop()'s own default argument value.
            StreamingContext qual$5 = this.ssc();
            boolean x$52 = qual$5.stop$default$1();
            qual$5.stop(x$52);
            // At least one offset range must have been generated before the stop ...
            Seq<Tuple2<Time, OffsetRange[]>> offsetRangesBeforeStop = this.getOffsetRanges((DStream)kafkaStream);
            int $org_scalatest_assert_macro_left = offsetRangesBeforeStop.size();
            int $org_scalatest_assert_macro_right = 1;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left), ">=", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left >= $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"No offset ranges generated", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 384));
            // ... and every range in the first entry must have fromOffset === 0 (per the macro's
            // recorded expression text; the predicate body $anonfun$new$50 is outside this view).
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.simpleMacroBool(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])((Tuple2)offsetRangesBeforeStop.head())._2())).forall((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)DirectKafkaStreamSuite.$anonfun$new$50(this, x$5))), "scala.Predef.refArrayOps[org.apache.spark.streaming.kafka010.OffsetRange](offsetRangesBeforeStop.head._2).forall(((x$5: org.apache.spark.streaming.kafka010.OffsetRange) => DirectKafkaStreamSuite.this.convertToEqualizer[Long](x$5.fromOffset).===(0)(scalactic.this.Equality.default[Long])))", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"starting offset not zero", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 385));
            this.logInfo((Function0 & Serializable & scala.Serializable)() -> "====== RESTARTING ========");
            // Second context is constructed from the checkpoint directory path alone.
            this.ssc_$eq(new StreamingContext(this.testDir().getAbsolutePath()));
            // The first input stream registered in the restored graph is taken as the recovered stream.
            DStream recoveredStream = (DStream)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])this.ssc().graph().getInputStreams())).head();
            // Offset-range arrays are converted to sets so the comparison ignores ordering within a batch.
            Seq recoveredOffsetRanges = (Seq)this.getOffsetRanges(recoveredStream).map((Function1 & Serializable & scala.Serializable)x -> new Tuple2(x._1(), (Object)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x._2())).toSet()), Seq$.MODULE$.canBuildFrom());
            int $org_scalatest_assert_macro_left2 = recoveredOffsetRanges.size();
            int $org_scalatest_assert_macro_right2 = 0;
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left2), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2 > $org_scalatest_assert_macro_right2, Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"No offset ranges recovered", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 399));
            Seq earlierOffsetRanges = (Seq)offsetRangesBeforeStop.map((Function1 & Serializable & scala.Serializable)x -> new Tuple2(x._1(), (Object)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])x._2())).toSet()), Seq$.MODULE$.canBuildFrom());
            // Each recovered (time, ranges) pair must also appear among the pre-stop pairs.
            Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.simpleMacroBool(recoveredOffsetRanges.forall((Function1 & Serializable & scala.Serializable)or -> BoxesRunTime.boxToBoolean((boolean)earlierOffsetRanges.contains((Object)new Tuple2(or._1(), or._2())))), "recoveredOffsetRanges.forall(((or: (org.apache.spark.streaming.Time, scala.collection.immutable.Set[org.apache.spark.streaming.kafka010.OffsetRange])) => earlierOffsetRanges.contains[(org.apache.spark.streaming.Time, scala.collection.immutable.Set[org.apache.spark.streaming.kafka010.OffsetRange])](scala.Tuple2.apply[org.apache.spark.streaming.Time, scala.collection.immutable.Set[org.apache.spark.streaming.kafka010.OffsetRange]](or._1, or._2))))", Prettifier$.MODULE$.default());
            Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)new StringBuilder(57).append("Recovered ranges are not the same as the ones generated\n").append(earlierOffsetRanges).append("\n").append(recoveredOffsetRanges).toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 401));
            // Restart from the checkpoint and feed 11..20; the total must then equal sum(1..20).
            this.ssc().start();
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(11), 20).grouped(4).foreach((Function1 & Serializable & scala.Serializable)i -> {
                this.sendData$1((Seq)i, topic);
                return BoxedUnit.UNIT;
            });
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToLong((long)DirectKafkaStreamSuite$.MODULE$.total().get()));
                int $org_scalatest_assert_macro_right = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 20).sum((Numeric)Numeric.IntIsIntegral$.MODULE$);
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 416));
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 415));
            StreamingContext qual$6 = this.ssc();
            boolean x$6 = qual$6.stop$default$1();
            qual$6.stop(x$6);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 335));
        // Test "offset recovery from kafka": runs a direct stream with auto-commit disabled, waits
        // until at least one offset has been recorded in `committed`, stops the context, and then
        // checks with a separate KafkaConsumer that each recorded offset is positive and not ahead
        // of that consumer's position for the partition.
        this.test("offset recovery from kafka", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "recoveryfromkafka";
            this.kafkaTestUtils().createTopic(topic);
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest"), new Tuple2((Object)"enable.auto.commit", (Object)Predef$.MODULE$.boolean2Boolean(false))}));
            ConcurrentLinkedQueue collectedData = new ConcurrentLinkedQueue();
            ConcurrentHashMap committed = new ConcurrentHashMap();
            this.ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(100L)));
            this.withClue("Error creating direct stream", (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                InputDStream kafkaStream = KafkaUtils$.MODULE$.createDirectStream(this.ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Subscribe((Iterable<String>)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Map<String, Object>)((Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala())));
                // NOTE(review): $anonfun$new$63 is defined outside this view; presumably it collects the
                // batch's data and commits its offsets, filling `committed` from the commit callback — confirm.
                kafkaStream.foreachRDD((Function2 & Serializable & scala.Serializable)(rdd, time) -> {
                    DirectKafkaStreamSuite.$anonfun$new$63(this, collectedData, kafkaStream, committed, rdd, time);
                    return BoxedUnit.UNIT;
                });
            });
            this.ssc().start();
            // Feed 1..10 in groups of 4 via sendDataAndWaitForReceive$1 (helper defined outside this view).
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10).grouped(4).foreach((Function1 & Serializable & scala.Serializable)i -> {
                this.sendDataAndWaitForReceive$1((Seq)i, topic, collectedData);
                return BoxedUnit.UNIT;
            });
            // Retry for up to 10 s (every 50 ms) until `committed` is non-empty.
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(50)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                ConcurrentHashMap $org_scalatest_assert_macro_left = committed;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.notBool(Bool$.MODULE$.unaryMacroBool((Object)$org_scalatest_assert_macro_left, "isEmpty", $org_scalatest_assert_macro_left.isEmpty(), Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 471));
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 470));
            StreamingContext qual$7 = this.ssc();
            boolean x$7 = qual$7.stop$default$1();
            qual$7.stop(x$7);
            // Independent consumer built from the same parameters, used only for verification.
            KafkaConsumer consumer = new KafkaConsumer(kafkaParams);
            try {
                consumer.subscribe(Arrays.asList((Object[])new String[]{topic}));
                consumer.poll(0L);
                ((IterableLike)JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(committed).asScala()).foreach((Function1 & Serializable & scala.Serializable)x0$3 -> {
                    Tuple2 tuple2 = x0$3;
                    if (tuple2 == null) {
                        throw new MatchError((Object)tuple2);
                    }
                    TopicPartition k = (TopicPartition)tuple2._1();
                    OffsetAndMetadata v = (OffsetAndMetadata)tuple2._2();
                    // The recorded offset must be strictly positive ...
                    long $org_scalatest_assert_macro_left = v.offset();
                    int $org_scalatest_assert_macro_right = 0;
                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > (long)$org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                    Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 480));
                    // ... and the consumer's position must be at least that offset (>=, not ==).
                    long $org_scalatest_assert_macro_left2 = consumer.position(k);
                    long $org_scalatest_assert_macro_right2 = v.offset();
                    Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left2), ">=", (Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2 >= $org_scalatest_assert_macro_right2, Prettifier$.MODULE$.default());
                    Assertion assertion = Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 481));
                    return assertion;
                });
            } finally {
                // Release the verification consumer even when an assertion above fails;
                // previously it was never closed.
                consumer.close();
            }
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 422));
        // Test "Direct Kafka stream report input information": sends a fixed number of messages,
        // consumes them through a direct stream, and checks that the received count and the three
        // counters exposed by the InputInfoCollector listener all equal the number sent.
        this.test("Direct Kafka stream report input information", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "report-test";
            // Message -> count map passed to sendMessages: "a" -> 7 and "b" -> 9.
            scala.collection.immutable.Map data = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"a"), (Object)BoxesRunTime.boxToInteger((int)7)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"b"), (Object)BoxesRunTime.boxToInteger((int)9))}));
            this.kafkaTestUtils().createTopic(topic);
            this.kafkaTestUtils().sendMessages(topic, (scala.collection.immutable.Map<String, Object>)data);
            // Expected record count is the sum of the map's values (7 + 9).
            int totalSent = BoxesRunTime.unboxToInt((Object)data.values().sum((Numeric)Numeric.IntIsIntegral$.MODULE$));
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
            this.ssc_$eq(new StreamingContext(this.sparkConf(), Milliseconds$.MODULE$.apply(200L)));
            // The collector is registered as a streaming listener before the stream is created and started.
            InputInfoCollector collector = new InputInfoCollector();
            this.ssc().addStreamingListener((StreamingListener)collector);
            InputDStream stream = (InputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> KafkaUtils$.MODULE$.createDirectStream(this.ssc(), this.preferredHosts(), ConsumerStrategies$.MODULE$.Subscribe((Iterable<String>)new .colon.colon((Object)topic, (List)Nil$.MODULE$), (Map<String, Object>)((Map)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)kafkaParams).asScala()))));
            // Every batch's (key, value) pairs are collected and appended to this queue.
            ConcurrentLinkedQueue allReceived = new ConcurrentLinkedQueue();
            stream.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class)).foreachRDD((Function1 & Serializable & scala.Serializable)rdd -> {
                allReceived.addAll(Arrays.asList((Object[])rdd.collect()));
                return BoxedUnit.UNIT;
            });
            this.ssc().start();
            // Retry for up to 20000 ms (every 200 ms) until all four assertions below hold together.
            this.eventually(this.timeout(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(20000)).milliseconds())), this.interval(Span$.MODULE$.convertDurationToSpan((Duration)new package.DurationInt(package$.MODULE$.DurationInt(200)).milliseconds())), (Function0 & Serializable & scala.Serializable)() -> {
                // 1) Number of received records equals totalSent; the failure message lists what was received.
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)allReceived.size()));
                int $org_scalatest_assert_macro_right = totalSent;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder(50).append("didn't get expected number of messages, messages:\n").append(((TraversableOnce)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)allReceived).asScala()).mkString("\n")).toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 513));
                // 2) collector.numRecordsSubmitted equals totalSent.
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)collector.numRecordsSubmitted().get()));
                int $org_scalatest_assert_macro_right2 = totalSent;
                Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 518));
                // 3) collector.numRecordsStarted equals totalSent.
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left3 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)collector.numRecordsStarted().get()));
                int $org_scalatest_assert_macro_right3 = totalSent;
                Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left3, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), $org_scalatest_assert_macro_left3.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 519));
                // 4) collector.numRecordsCompleted equals totalSent.
                TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left4 = this.convertToEqualizer(BoxesRunTime.boxToLong((long)collector.numRecordsCompleted().get()));
                int $org_scalatest_assert_macro_right4 = totalSent;
                Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left4, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), $org_scalatest_assert_macro_left4.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 520));
            }, Retrying$.MODULE$.retryingNatureOfT(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 512));
            // Stop the context, passing stop()'s own default argument value.
            StreamingContext qual$8 = this.ssc();
            boolean x$8 = qual$8.stop$default$1();
            qual$8.stop(x$8);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 486));
        // Test "maxMessagesPerPartition with backpressure disabled": with neither a rate controller
        // nor a per-partition config supplied, an input of 50 for each of two partitions is expected
        // to yield 10 for each partition.
        // NOTE(review): the value 10 presumably follows from the suite's SparkConf rate limit and
        // batch interval set up in getDirectKafkaStream — confirm there.
        this.test("maxMessagesPerPartition with backpressure disabled", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            String topicName = "maxMessagesPerPartition";
            DirectKafkaInputDStream<String, String> directStream = this.getDirectKafkaStream(topicName, (Option<RateController>)None$.MODULE$, (Option<PerPartitionConfig>)None$.MODULE$);

            // Values handed to maxMessagesPerPartition: 50 for partitions 0 and 1.
            Tuple2[] offeredEntries = new Tuple2[]{
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topicName, 0)), (Object)BoxesRunTime.boxToLong((long)50L)),
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topicName, 1)), (Object)BoxesRunTime.boxToLong((long)50L))
            };
            scala.collection.immutable.Map offered = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])offeredEntries));
            scala.collection.immutable.Map actual = (scala.collection.immutable.Map)directStream.maxMessagesPerPartition((scala.collection.immutable.Map<TopicPartition, Object>)offered).get();

            // Expected result: 10 for both partitions.
            Tuple2[] expectedEntries = new Tuple2[]{
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topicName, 0)), (Object)BoxesRunTime.boxToLong((long)10L)),
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topicName, 1)), (Object)BoxesRunTime.boxToLong((long)10L))
            };
            scala.collection.immutable.Map expected = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedEntries));

            // Null-safe equality, same truth table as the compiler-generated `==` expansion.
            boolean sameLimits = actual == null ? expected == null : actual.equals(expected);
            Bool verdict = Bool$.MODULE$.binaryMacroBool((Object)actual, "==", (Object)expected, sameLimits, Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(verdict, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 530));
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 525));
        // Test "maxMessagesPerPartition with no lag": with a constant rate controller attached and
        // an input of 0 for both partitions, maxMessagesPerPartition is expected to return an
        // empty Option.
        this.test("maxMessagesPerPartition with no lag", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            String topicName = "maxMessagesPerPartition";
            ConstantRateController controller = new ConstantRateController(0, new ConstantEstimator(100L), 100L);
            Some controllerOpt = new Some((Object)controller);
            DirectKafkaInputDStream<String, String> directStream = this.getDirectKafkaStream(topicName, (Option<RateController>)controllerOpt, (Option<PerPartitionConfig>)None$.MODULE$);

            // Values handed to maxMessagesPerPartition: 0 for partitions 0 and 1.
            Tuple2[] noLagEntries = new Tuple2[]{
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topicName, 0)), (Object)BoxesRunTime.boxToLong((long)0L)),
                Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topicName, 1)), (Object)BoxesRunTime.boxToLong((long)0L))
            };
            scala.collection.immutable.Map noLag = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])noLagEntries));

            Option<scala.collection.immutable.Map<TopicPartition, Object>> limits = directStream.maxMessagesPerPartition((scala.collection.immutable.Map<TopicPartition, Object>)noLag);
            boolean nothingReturned = limits.isEmpty();
            Bool verdict = Bool$.MODULE$.unaryMacroBool(limits, "isEmpty", nothingReturned, Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(verdict, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 540));
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 534));
        // Test "maxMessagesPerPartition respects max rate": supplies a custom PerPartitionConfig
        // whose max rate differs by partition and asserts that the result of
        // maxMessagesPerPartition differs per partition accordingly (5 vs 10).
        this.test("maxMessagesPerPartition respects max rate", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            String topic = "maxMessagesPerPartition";
            Some rateController = new Some((Object)new ConstantRateController(0, new ConstantEstimator(100L), 1000L));
            // Anonymous PerPartitionConfig in decompiled form; topic$5 is the captured `topic` local.
            Some ppc = new Some((Object)new PerPartitionConfig(null, topic){
                private final String topic$5;

                // Returns 50 for partition 0 of the captured topic and 100 for any other partition.
                public long maxRatePerPartition(TopicPartition tp) {
                    String string = tp.topic();
                    String string2 = this.topic$5;
                    // Null-safe string equality (compiler expansion of Scala `==`) AND partition == 0.
                    return !(string != null ? !string.equals(string2) : string2 != null) && tp.partition() == 0 ? 50L : 100L;
                }
                {
                    this.topic$5 = topic$5;
                }
            });
            DirectKafkaInputDStream<String, String> kafkaStream = this.getDirectKafkaStream(topic, (Option<RateController>)rateController, (Option<PerPartitionConfig>)ppc);
            // Values handed to maxMessagesPerPartition: 1000 for partitions 0 and 1.
            scala.collection.immutable.Map input = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)1000L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)1000L))}));
            scala.collection.immutable.Map $org_scalatest_assert_macro_left = (scala.collection.immutable.Map)kafkaStream.maxMessagesPerPartition((scala.collection.immutable.Map<TopicPartition, Object>)input).get();
            // Expected result: 5 for partition 0 and 10 for partition 1.
            scala.collection.immutable.Map $org_scalatest_assert_macro_right = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)5L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)10L))}));
            scala.collection.immutable.Map map = $org_scalatest_assert_macro_left;
            scala.collection.immutable.Map map2 = $org_scalatest_assert_macro_right;
            // Null-safe map equality feeding the ScalaTest assertion helper.
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "==", (Object)$org_scalatest_assert_macro_right, !(map != null ? !map.equals(map2) : map2 != null), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 557));
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 543));
        this.test("using rate controller", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            String topic = "backpressure";
            this.kafkaTestUtils().createTopic(topic, 1);
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"auto.offset.reset"), (Object)"earliest")}));
            HashMap<String, Object> executorKafkaParams = new HashMap<String, Object>(kafkaParams);
            KafkaUtils$.MODULE$.fixKafkaParams(executorKafkaParams);
            int batchIntervalMilliseconds = 500;
            ConstantEstimator estimator = new ConstantEstimator(100L);
            scala.collection.immutable.Map messages = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"foo"), (Object)BoxesRunTime.boxToInteger((int)5000))}));
            this.kafkaTestUtils().sendMessages(topic, (scala.collection.immutable.Map<String, Object>)messages);
            SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName(this.getClass().getSimpleName()).set("spark.streaming.kafka.maxRatePerPartition", "100");
            this.ssc_$eq(new StreamingContext(sparkConf, Milliseconds$.MODULE$.apply((long)batchIntervalMilliseconds)));
            DStream kafkaStream = (DStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> new DirectKafkaInputDStream<String, String>(this, topic, kafkaParams, sparkConf, estimator){
                private final Some<DirectKafkaInputDStream.DirectKafkaRateController> rateController;

                public Some<DirectKafkaInputDStream.DirectKafkaRateController> rateController() {
                    return this.rateController;
                }
                {
                    this.rateController = new Some((Object)((Object)new DirectKafkaInputDStream.DirectKafkaRateController(this.id(), estimator$1)));
                }
            }.map((Function1 & Serializable & scala.Serializable)r -> new Tuple2(r.key(), r.value()), ClassTag$.MODULE$.apply(Tuple2.class)));
            ConcurrentLinkedQueue collectedData = new ConcurrentLinkedQueue();
            kafkaStream.foreachRDD((Function2 & Serializable & scala.Serializable)(rdd, time) -> {
                DirectKafkaStreamSuite.$anonfun$new$83(collectedData, rdd, time);
                return BoxedUnit.UNIT;
            });
            this.ssc().start();
            ((IterableLike)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 50, 20}))).foreach((Function1 & Serializable & scala.Serializable)rate -> DirectKafkaStreamSuite.$anonfun$new$85(this, collectedData, estimator, batchIntervalMilliseconds, BoxesRunTime.unboxToInt((Object)rate)));
            StreamingContext qual$9 = this.ssc();
            boolean x$9 = qual$9.stop$default$1();
            qual$9.stop(x$9);
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 561));
        this.test("backpressure.initialRate should honor maxRatePerPartition", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.backpressureTest(1000, 500, 250), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 627));
        this.test("use backpressure.initialRate with backpressure", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.backpressureTest(300, 1000, 150), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 631));
        this.test("maxMessagesPerPartition with zero offset and rate equal to the specified minimum with default 1", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> {
            String topic = "backpressure";
            HashMap<String, Object> kafkaParams = this.getKafkaParams((Seq<Tuple2<String, Object>>)Nil$.MODULE$);
            int batchIntervalMilliseconds = 60000;
            SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName(this.getClass().getSimpleName()).set("spark.streaming.kafka.maxRatePerPartition", "100").set("spark.streaming.kafka.minRatePerPartition", "5");
            this.ssc_$eq(new StreamingContext(sparkConf, Milliseconds$.MODULE$.apply((long)batchIntervalMilliseconds)));
            long estimateRate = 1L;
            scala.collection.immutable.Map fromOffsets = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 2)), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 3)), (Object)BoxesRunTime.boxToLong((long)0L))}));
            DirectKafkaInputDStream kafkaStream = (DirectKafkaInputDStream)this.withClue("Error creating direct stream", (Function0 & Serializable & scala.Serializable)() -> new DirectKafkaInputDStream<String, String>(this, topic, kafkaParams, sparkConf, fromOffsets, estimateRate){
                private final Some<ConstantRateController> rateController;

                public Some<ConstantRateController> rateController() {
                    return this.rateController;
                }
                {
                    this.currentOffsets_$eq((scala.collection.immutable.Map<TopicPartition, Object>)fromOffsets$1);
                    this.rateController = new Some((Object)((Object)new ConstantRateController(this.id(), null, estimateRate$1)));
                }
            });
            scala.collection.immutable.Map offsets = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)0L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)100L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 2)), (Object)BoxesRunTime.boxToLong((long)200L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 3)), (Object)BoxesRunTime.boxToLong((long)300L))}));
            Option<scala.collection.immutable.Map<TopicPartition, Object>> result = kafkaStream.maxMessagesPerPartition((scala.collection.immutable.Map<TopicPartition, Object>)offsets);
            scala.collection.immutable.Map expected = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)BoxesRunTime.boxToLong((long)5L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 1)), (Object)BoxesRunTime.boxToLong((long)10L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 2)), (Object)BoxesRunTime.boxToLong((long)20L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 3)), (Object)BoxesRunTime.boxToLong((long)30L))}));
            Option<scala.collection.immutable.Map<TopicPartition, Object>> $org_scalatest_assert_macro_left = result;
            scala.collection.immutable.Map $org_scalatest_assert_macro_right = expected;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool($org_scalatest_assert_macro_left, "contains", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.contains((Object)$org_scalatest_assert_macro_right), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)new StringBuilder(80).append("Number of messages per partition must be at least equal").append(" to the specified minimum").toString(), Prettifier$.MODULE$.default(), new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 722));
        }, new Position("DirectKafkaStreamSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 675));
    }

    /**
     * Streaming listener that keeps three running totals of the record counts
     * reported by batch events: one for submitted batches, one for started
     * batches and one for completed batches.
     *
     * <p>Each total is held in an {@link AtomicLong} and is exposed through an
     * accessor of the same name. Every other listener callback is forwarded to
     * the corresponding static {@code StreamingListener.<callback>$} helper.
     */
    public static class InputInfoCollector
    implements StreamingListener {
        /** Sum of {@code batchInfo().numRecords()} over all batch-submitted events seen. */
        private final AtomicLong numRecordsSubmitted;
        /** Sum of {@code batchInfo().numRecords()} over all batch-started events seen. */
        private final AtomicLong numRecordsStarted;
        /** Sum of {@code batchInfo().numRecords()} over all batch-completed events seen. */
        private final AtomicLong numRecordsCompleted;

        /**
         * Runs the {@code StreamingListener} initializer first, then creates the
         * three counters, each starting at zero.
         */
        public InputInfoCollector() {
            StreamingListener.$init$(this);
            this.numRecordsSubmitted = new AtomicLong(0L);
            this.numRecordsStarted = new AtomicLong(0L);
            this.numRecordsCompleted = new AtomicLong(0L);
        }

        // ---- counter accessors ----

        /** @return the counter updated by {@link #onBatchSubmitted}. */
        public AtomicLong numRecordsSubmitted() {
            return this.numRecordsSubmitted;
        }

        /** @return the counter updated by {@link #onBatchStarted}. */
        public AtomicLong numRecordsStarted() {
            return this.numRecordsStarted;
        }

        /** @return the counter updated by {@link #onBatchCompleted}. */
        public AtomicLong numRecordsCompleted() {
            return this.numRecordsCompleted;
        }

        // ---- batch callbacks: accumulate record counts ----

        /** Adds the submitted batch's record count to the "submitted" total. */
        public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
            AtomicLong submittedTotal = this.numRecordsSubmitted();
            submittedTotal.addAndGet(batchSubmitted.batchInfo().numRecords());
        }

        /** Adds the started batch's record count to the "started" total. */
        public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
            AtomicLong startedTotal = this.numRecordsStarted();
            startedTotal.addAndGet(batchStarted.batchInfo().numRecords());
        }

        /** Adds the completed batch's record count to the "completed" total. */
        public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
            AtomicLong completedTotal = this.numRecordsCompleted();
            completedTotal.addAndGet(batchCompleted.batchInfo().numRecords());
        }

        // ---- remaining callbacks: forwarded unchanged to the StreamingListener helpers ----

        public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {
            StreamingListener.onStreamingStarted$(this, streamingStarted);
        }

        public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {
            StreamingListener.onReceiverStarted$(this, receiverStarted);
        }

        public void onReceiverError(StreamingListenerReceiverError receiverError) {
            StreamingListener.onReceiverError$(this, receiverError);
        }

        public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
            StreamingListener.onReceiverStopped$(this, receiverStopped);
        }

        public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {
            StreamingListener.onOutputOperationStarted$(this, outputOperationStarted);
        }

        public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {
            StreamingListener.onOutputOperationCompleted$(this, outputOperationCompleted);
        }
    }
}

