package org.apache.hudi.functional;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Array$;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: TestStructuredStreaming.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\u001dc\u0001B\r\u001b\u0001\rBQA\u000b\u0001\u0005\u0002-BqA\f\u0001C\u0002\u0013%q\u0006\u0003\u00047\u0001\u0001\u0006I\u0001\r\u0005\bo\u0001\u0001\r\u0011\"\u00019\u0011\u001d\u0001\u0005\u00011A\u0005\u0002\u0005CaA\u0013\u0001!B\u0013I\u0004bB&\u0001\u0005\u0004%\t\u0001\u0014\u0005\u0007;\u0002\u0001\u000b\u0011B'\t\u000by\u0003A\u0011I0\t\u000b-\u0004A\u0011I0\t\u000bA\u0004A\u0011A9\t\u000f\u0005\u0015\u0002\u0001\"\u0001\u0002(!1\u0011q\u0007\u0001\u0005\u0002}Cq!!\u0011\u0001\t\u0013\t\u0019\u0005C\u0004\u00028\u0002!\t!!/\t\r\u0005=\u0007\u0001\"\u0001`\u0011\u0019\t\u0019\u000e\u0001C\u0001?\"1\u0011q\u001b\u0001\u0005\u0002}Cq!a7\u0001\t\u0003\ti\u000eC\u0004\u0002~\u0002!I!a@\t\u000f\t-\u0001\u0001\"\u0003\u0003\u000e!a!\u0011\u0005\u0001\u0011\u0002\u0003\u0005\t\u0011\"\u0001\u0003$!a!q\u0005\u0001\u0011\u0002\u0003\u0005\t\u0011\"\u0001\u0003*!a!Q\u0006\u0001\u0011\u0002\u0003\u0005\t\u0011\"\u0001\u00030\t9B+Z:u'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6Lgn\u001a\u0006\u00037q\t!BZ;oGRLwN\\1m\u0015\tib$\u0001\u0003ik\u0012L'BA\u0010!\u0003\u0019\t\u0007/Y2iK*\t\u0011%A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001IA\u0011Q\u0005K\u0007\u0002M)\u0011q\u0005H\u0001\ni\u0016\u001cH/\u001e;jYNL!!\u000b\u0014\u0003)!{w\u000eZ5f\u00072LWM\u001c;UKN$()Y:f\u0003\u0019a\u0014N\\5u}Q\tA\u0006\u0005\u0002.\u00015\t!$A\u0002m_\u001e,\u0012\u0001\r\t\u0003cQj\u0011A\r\u0006\u0003gy\tQ\u0001\\8hi)L!!\u000e\u001a\u0003\r1{wmZ3s\u0003\u0011awn\u001a\u0011\u0002\u000bM\u0004\u0018M]6\u0016\u0003e\u0002\"A\u000f 
\u000e\u0003mR!\u0001P\u001f\u0002\u0007M\fHN\u0003\u00028=%\u0011qh\u000f\u0002\r'B\f'o[*fgNLwN\\\u0001\ngB\f'o[0%KF$\"A\u0011%\u0011\u0005\r3U\"\u0001#\u000b\u0003\u0015\u000bQa]2bY\u0006L!a\u0012#\u0003\tUs\u0017\u000e\u001e\u0005\b\u0013\u0016\t\t\u00111\u0001:\u0003\rAH%M\u0001\u0007gB\f'o\u001b\u0011\u0002\u0015\r|W.\\8o\u001fB$8/F\u0001N!\u0011q5+V+\u000e\u0003=S!\u0001U)\u0002\u0013%lW.\u001e;bE2,'B\u0001*E\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003)>\u00131!T1q!\t16,D\u0001X\u0015\tA\u0016,\u0001\u0003mC:<'\"\u0001.\u0002\t)\fg/Y\u0005\u00039^\u0013aa\u0015;sS:<\u0017aC2p[6|gn\u00149ug\u0002\nQa]3u+B$\u0012A\u0011\u0015\u0003\u0013\u0005\u0004\"AY5\u000e\u0003\rT!\u0001Z3\u0002\u0007\u0005\u0004\u0018N\u0003\u0002gO\u00069!.\u001e9ji\u0016\u0014(B\u00015!\u0003\u0015QWO\\5u\u0013\tQ7M\u0001\u0006CK\u001a|'/Z#bG\"\f\u0001\u0002^3be\u0012{wO\u001c\u0015\u0003\u00155\u0004\"A\u00198\n\u0005=\u001c'!C!gi\u0016\u0014X)Y2i\u0003aIg.\u001b;TiJ,\u0017-\\5oO^\u0013\u0018\u000e^3GkR,(/\u001a\u000b\teb\f\t!!\u0007\u0002\u001eA\u00191O\u001e\"\u000e\u0003QT!!\u001e#\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0002xi\n1a)\u001e;ve\u0016DQ!_\u0006A\u0002i\faa]2iK6\f\u0007CA>\u007f\u001b\u0005a(BA?<\u0003\u0015!\u0018\u0010]3t\u0013\tyHP\u0001\u0006TiJ,8\r\u001e+za\u0016Dq!a\u0001\f\u0001\u0004\t)!\u0001\u0006t_V\u00148-\u001a)bi\"\u0004B!a\u0002\u0002\u00169!\u0011\u0011BA\t!\r\tY\u0001R\u0007\u0003\u0003\u001bQ1!a\u0004#\u0003\u0019a$o\\8u}%\u0019\u00111\u0003#\u0002\rA\u0013X\rZ3g\u0013\ra\u0016q\u0003\u0006\u0004\u0003'!\u0005bBA\u000e\u0017\u0001\u0007\u0011QA\u0001\tI\u0016\u001cH\u000fU1uQ\"9\u0011qD\u0006A\u0002\u0005\u0005\u0012a\u00035vI&|\u0005\u000f^5p]N\u0004\u0002\"a\u0002\u0002$\u0005\u0015\u0011QA\u0005\u0004)\u0006]\u0011AH5oSR\u001cFO]3b[&twmU8ve\u000e,\u0017I\u001c3EKN$\b+\u0019;i)\u0019\tI#a\f\u00024A91)a\u000b\u0002\u0006\u0005\u0015\u0011bAA\u0017\t\n1A+\u001e9mKJBq!!\r\r\u0001\u0004\t)!A\u0007t_V\u00148-\u001a#je:\u000bW.\u001a\u0005\b\u0003ka\u0001\u0
019AA\u0003\u0003-!Wm\u001d;ESJt\u0015-\\3\u0002/Q,7\u000f^*ueV\u001cG/\u001e:fIN#(/Z1nS:<\u0007fA\u0007\u0002<A\u0019!-!\u0010\n\u0007\u0005}2M\u0001\u0003UKN$\u0018aF<bSR$\u0016\u000e\u001c7Bi2,\u0017m\u001d;O\u0007>lW.\u001b;t)1\t)%a\u0013\u0002^\u0005\u0005\u0014QMA5!\r\u0019\u0015qI\u0005\u0004\u0003\u0013\"%aA%oi\"9\u0011Q\n\bA\u0002\u0005=\u0013A\u00014t!\u0011\t\t&!\u0017\u000e\u0005\u0005M#\u0002BA'\u0003+R1!a\u0016\u001f\u0003\u0019A\u0017\rZ8pa&!\u00111LA*\u0005)1\u0015\u000e\\3TsN$X-\u001c\u0005\b\u0003?r\u0001\u0019AA\u0003\u0003%!\u0018M\u00197f!\u0006$\b\u000eC\u0004\u0002d9\u0001\r!!\u0012\u0002\u00159,XnQ8n[&$8\u000fC\u0004\u0002h9\u0001\r!!\u0012\u0002\u0017QLW.Z8viN+7m\u001d\u0005\b\u0003Wr\u0001\u0019AA#\u0003U\u0019H.Z3q'\u0016\u001c7/\u00114uKJ,\u0015m\u00195Sk:DSADA8\u0003\u000f\u0003RaQA9\u0003kJ1!a\u001dE\u0005\u0019!\bN]8xgB!\u0011qOAA\u001d\u0011\tI(! \u000f\t\u0005-\u00111P\u0005\u0002\u000b&\u0019\u0011q\u0010#\u0002\u000fA\f7m[1hK&!\u00111QAC\u0005QIe\u000e^3seV\u0004H/\u001a3Fq\u000e,\u0007\u000f^5p]*\u0019\u0011q\u0010#2\u000fy\t)!!#\u00026FJ1%a#\u0002\u0014\u0006-\u0016QS\u000b\u0005\u0003\u001b\u000by)\u0006\u0002\u0002\u0006\u00119\u0011\u0011\u0013\u0012C\u0002\u0005m%!\u0001+\n\t\u0005U\u0015qS\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000fJ\u0019\u000b\u0007\u0005eE)\u0001\u0004uQJ|wo]\t\u0005\u0003;\u000b\u0019\u000bE\u0002D\u0003?K1!!)E\u0005\u001dqu\u000e\u001e5j]\u001e\u0004B!!*\u0002(:\u00191)! 
\n\t\u0005%\u0016Q\u0011\u0002\n)\"\u0014xn^1cY\u0016\f\u0014bIAW\u0003_\u000b\t,!'\u000f\u0007\r\u000by+C\u0002\u0002\u001a\u0012\u000bTAI\"E\u0003g\u0013Qa]2bY\u0006\f4AJA;\u0003E9W\r^\"mkN$XM]5oO>\u0003Ho\u001d\u000b\r\u0003C\tY,a0\u0002D\u0006\u001d\u00171\u001a\u0005\b\u0003{{\u0001\u0019AA\u0003\u0003II7/\u00138mS:,7\t\\;ti\u0016\u0014\u0018N\\4\t\u000f\u0005\u0005w\u00021\u0001\u0002\u0006\u0005\t\u0012n]!ts:\u001c7\t\\;ti\u0016\u0014\u0018N\\4\t\u000f\u0005\u0015w\u00021\u0001\u0002\u0006\u0005\t\u0012n]!ts:\u001c7i\\7qC\u000e$\u0018n\u001c8\t\u000f\u0005%w\u00021\u0001\u0002\u0006\u0005\u00192\r\\;ti\u0016\u0014\u0018N\\4Ok6\u001cu.\\7ji\"9\u0011QZ\bA\u0002\u0005\u0015\u0013\u0001\u00054jY\u0016l\u0015\r\u001f*fG>\u0014HMT;n\u0003-\"Xm\u001d;TiJ,8\r^;sK\u0012\u001cFO]3b[&twmV5uQ&sG.\u001b8f\u00072,8\u000f^3sS:<\u0007f\u0001\t\u0002<\u0005QC/Z:u'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ,ji\"\f5/\u001f8d\u00072,8\u000f^3sS:<\u0007fA\t\u0002<\u00059D/Z:u'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ,ji\"\f5/\u001f8d\u00072,8\u000f^3sS:<\u0017I\u001c3D_6\u0004\u0018m\u0019;j_:D3AEA\u001e\u0003)\u001aHO];diV\u0014X\rZ*ue\u0016\fW.\u001b8h\r>\u0014H+Z:u\u00072,8\u000f^3sS:<'+\u001e8oKJ$rBQAp\u0003C\f\u0019/a;\u0002n\u0006=\u00181\u001f\u0005\b\u0003\u0007\u0019\u0002\u0019AA\u0003\u0011\u001d\tYb\u0005a\u0001\u0003\u000bAq!!0\u0014\u0001\u0004\t)\u000fE\u0002D\u0003OL1!!;E\u0005\u001d\u0011un\u001c7fC:Dq!!1\u0014\u0001\u0004\t)\u000fC\u0004\u0002FN\u0001\r!!:\t\u000f\u0005E8\u00031\u0001\u0002\u0006\u0005\u0011\u0002/\u0019:uSRLwN\\(g%\u0016\u001cwN\u001d3t\u0011\u001d\t)p\u0005a\u0001\u0003o\fQc\u00195fG.\u001cE.^:uKJLgn\u001a*fgVdG\u000f\u0005\u0004D\u0003s\f)AQ\u0005\u0004\u0003w$%!\u0003$v]\u000e$\u0018n\u001c82\u0003e9W\r\u001e'bi\u0016\u001cHOR5mK\u001e\u0013x.\u001e9t\r&dW-\u00133\u0015\t\t\u0005!q\u0001\t\u0006\u0007\n\r\u0011QA\u0005\u0004\u0005\u000b!%!B!se\u0006L\bb\u0002B\u0005)\u0001\u0007\u0011QA\u0001\na\u0006\u0014H/\u001b;j_:\f!e^1jiRKG\u00
0e\u001c%bg\u000e{W\u000e\u001d7fi\u0016$'+\u001a9mC\u000e,\u0017J\\:uC:$Hc\u0002\"\u0003\u0010\tE!1\u0003\u0005\b\u0003?*\u0002\u0019AA\u0003\u0011\u001d\t9'\u0006a\u0001\u0003\u000bBq!a\u001b\u0016\u0001\u0004\t)\u0005K\u0003\u0016\u0003_\u00129\"M\u0004\u001f\u0003\u000b\u0011IBa\b2\u0013\r\nY)a%\u0003\u001c\u0005U\u0015'C\u0012\u0002.\u0006=&QDAMc\u0015\u00113\tRAZc\r1\u0013QO\u0001\u0013aJ|G/Z2uK\u0012$#-Y:f!\u0006$\b\u000eF\u0002V\u0005KAq!\u0013\f\u0002\u0002\u0003\u0007A&\u0001\u0007qe>$Xm\u0019;fI\u001227\u000f\u0006\u0003\u0002P\t-\u0002bB%\u0018\u0003\u0003\u0005\r\u0001L\u0001\u0018aJ|G/Z2uK\u0012$3/\u001a;nKR\f7\t\\5f]R$RA\u0011B\u0019\u0005gAq!\u0013\r\u0002\u0002\u0003\u0007A\u0006C\u0005\u00036a\t\t\u00111\u0001\u00038\u0005\u0019\u0001\u0010\n\u001a\u0011\t\te\"1I\u0007\u0003\u0005wQAA!\u0010\u0003@\u0005)A/\u00192mK*\u0019!\u0011\t\u000f\u0002\r\r|W.\\8o\u0013\u0011\u0011)Ea\u000f\u0003+!{w\u000eZ5f)\u0006\u0014G.Z'fi\u0006\u001cE.[3oi\u0002")
/* loaded from: input_file:org/apache/hudi/functional/TestStructuredStreaming.class */
public class TestStructuredStreaming extends HoodieClientTestBase {
    /** Logger bound to the runtime class of this test instance. */
    private final Logger log = LogManager.getLogger(getClass());

    /** Spark session used by the tests; assigned in {@link #setUp()} via {@link #spark_$eq(SparkSession)}. */
    private SparkSession spark;

    /** Write options shared by every streaming write in this class (Scala immutable map). */
    private final Map<String, String> commonOpts = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
        new Tuple2<>("hoodie.insert.shuffle.parallelism", "4"),
        new Tuple2<>("hoodie.upsert.shuffle.parallelism", "4"),
        new Tuple2<>(DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key(), "_row_key"),
        new Tuple2<>(DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key(), "partition"),
        new Tuple2<>(DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key(), "timestamp"),
        new Tuple2<>(HoodieWriteConfig.TBL_NAME.key(), "hoodie_test")
    }));

    /**
     * Synthetic accessor that returns the {@code basePath} field of the given test instance.
     * Within this file it is called from lambda bodies (e.g. to build the checkpoint location).
     *
     * @param testStructuredStreaming instance whose {@code basePath} is read
     * @return the value of {@code basePath} on that instance
     */
    public /* synthetic */ String protected$basePath(TestStructuredStreaming testStructuredStreaming) {
        return testStructuredStreaming.basePath;
    }

    /**
     * Synthetic accessor that returns the {@code fs} field of the given test instance.
     * Within this file it is called from lambda bodies that need the file system.
     *
     * @param testStructuredStreaming instance whose {@code fs} is read
     * @return the value of {@code fs} on that instance
     */
    public /* synthetic */ FileSystem protected$fs(TestStructuredStreaming testStructuredStreaming) {
        return testStructuredStreaming.fs;
    }

    /**
     * Synthetic accessor that assigns the {@code metaClient} field of the given test instance.
     *
     * @param testStructuredStreaming instance whose {@code metaClient} is overwritten
     * @param hoodieTableMetaClient   the meta client to store
     */
    public /* synthetic */ void protected$setmetaClient(TestStructuredStreaming testStructuredStreaming, HoodieTableMetaClient hoodieTableMetaClient) {
        testStructuredStreaming.metaClient = hoodieTableMetaClient;
    }

    /** Returns the logger held in the {@code log} field. */
    private Logger log() {
        return this.log;
    }

    /**
     * Returns the current Spark session.
     *
     * @return the value of the {@code spark} field; {@code null} until {@link #spark_$eq(SparkSession)} has been called
     */
    public SparkSession spark() {
        return this.spark;
    }

    /**
     * Setter for the {@code spark} field (the {@code _$eq} suffix is the encoded form of a Scala {@code spark_=} setter).
     *
     * @param sparkSession session to store
     */
    public void spark_$eq(SparkSession sparkSession) {
        this.spark = sparkSession;
    }

    /**
     * Returns the write options shared by all streaming writes in this class.
     *
     * @return the Scala immutable map held in the {@code commonOpts} field
     */
    public Map<String, String> commonOpts() {
        return this.commonOpts;
    }

    /**
     * Per-test fixture setup: runs the inherited {@code init*} helpers in a fixed order and
     * stores the session obtained from {@code sqlContext} in the {@code spark} field.
     */
    @BeforeEach
    public void setUp() {
        initPath();
        initSparkContexts();
        // sqlContext is read right after initSparkContexts() — presumably that call creates it; keep this order.
        spark_$eq(this.sqlContext.sparkSession());
        initTestDataGenerator();
        initFileSystem();
        initTimelineService();
    }

    /**
     * Per-test fixture teardown: runs the inherited {@code cleanup*} helpers in a fixed order.
     * NOTE(review): the calls are not guarded individually, so if one of them throws the
     * remaining cleanups are skipped.
     */
    @AfterEach
    public void tearDown() {
        cleanupTimelineService();
        cleanupSparkContexts();
        cleanupTestDataGenerator();
        cleanupFileSystem();
    }

    /**
     * Starts a streaming write into a Hudi table on the global Scala execution context.
     *
     * <p>The stream reads JSON files from {@code str} using {@code structType}, writes them in
     * append mode with format {@code org.apache.hudi} to {@code str2}, triggers every 100 ms and
     * checkpoints under {@code <basePath>/checkpoint}. The future body blocks in
     * {@code awaitTermination} for at most 10000 ms and then returns; the query itself is not
     * stopped by this method.
     *
     * @param structType schema of the JSON source files
     * @param str        directory watched by the streaming reader
     * @param str2       destination path handed to {@code start}
     * @param map        writer options passed to {@code options(...)}
     * @return a future that completes once the bounded wait has returned
     */
    public Future<BoxedUnit> initStreamingWriteFuture(StructType structType, String str, String str2, Map<String, String> map) {
        final Dataset<Row> streamingInput = spark().readStream().schema(structType).json(str);
        return Future$.MODULE$.apply(() -> {
            Predef$.MODULE$.println("streaming starting");
            final String checkpointLocation = protected$basePath(this) + "/checkpoint";
            streamingInput.writeStream()
                .format("org.apache.hudi")
                .options(map)
                .trigger(Trigger.ProcessingTime(100L))
                .option("checkpointLocation", checkpointLocation)
                .outputMode(OutputMode.Append())
                .start(str2)
                .awaitTermination(10000L);
            Predef$.MODULE$.println("streaming ends");
        }, ExecutionContext$Implicits$.MODULE$.global());
    }

    /**
     * Resets the test base directory and prepares the streaming source directory.
     *
     * <p>Recursively deletes {@code basePath}, derives {@code <basePath>/<str>} and
     * {@code <basePath>/<str2>}, and creates only the first of the two.
     *
     * @param str  name of the source directory under {@code basePath}
     * @param str2 name of the destination directory under {@code basePath}
     * @return a pair of (source path, destination path)
     */
    public Tuple2<String, String> initStreamingSourceAndDestPath(String str, String str2) {
        fs.delete(new Path(basePath), true);
        final String sourcePath = basePath + "/" + str;
        final String destPath = basePath + "/" + str2;
        // Only the source directory is created here; the destination path is just returned.
        fs.mkdirs(new Path(sourcePath));
        return new Tuple2<>(sourcePath, destPath);
    }

    @Test
    public void testStructuredStreaming() {
        Tuple2<String, String> initStreamingSourceAndDestPath = initStreamingSourceAndDestPath("source", "dest");
        if (initStreamingSourceAndDestPath == null) {
            throw new MatchError(initStreamingSourceAndDestPath);
        }
        Tuple2 tuple2 = new Tuple2((String) initStreamingSourceAndDestPath._1(), (String) initStreamingSourceAndDestPath._2());
        String str = (String) tuple2._1();
        String str2 = (String) tuple2._2();
        Dataset json = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)));
        Dataset json2 = spark().read().json(spark().sparkContext().parallelize(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(RawTripTestPayload.recordsToStrings(this.dataGen.generateUpdates("001", Predef$.MODULE$.int2Integer(100)))).toList(), 2, ClassTag$.MODULE$.apply(String.class)));
        long count = json2.select("_row_key", Predef$.MODULE$.wrapRefArray(new String[0])).distinct().count();
        Await$.MODULE$.result(Future$.MODULE$.sequence(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Future[]{initStreamingWriteFuture(json.schema(), str, str2, commonOpts()), Future$.MODULE$.apply(() -> {
            json.coalesce(1).write().mode(SaveMode.Append).json(str);
            int waitTillAtleastNCommits = this.waitTillAtleastNCommits(this.protected$fs(this), str2, 1, 120, 5);
            Assertions.assertTrue(HoodieDataSourceHelpers.hasNewCommits(this.protected$fs(this), str2, "000"));
            String latestCommit = HoodieDataSourceHelpers.latestCommit(this.protected$fs(this), str2);
            Predef$.MODULE$.assert(this.spark().read().format("org.apache.hudi").load(new StringBuilder(8).append(str2).append("/*/*/*/*").toString()).count() == 100);
            json2.coalesce(1).write().mode(SaveMode.Append).json(str);
            this.waitTillAtleastNCommits(this.protected$fs(this), str2, waitTillAtleastNCommits + 1, 120, 5);
            String latestCommit2 = HoodieDataSourceHelpers.latestCommit(this.protected$fs(this), str2);
            Assertions.assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(this.protected$fs(this), str2, "000").size());
            Assertions.assertEquals(100L, this.spark().read().format("org.apache.hudi").load(new StringBuilder(8).append(str2).append("/*/*/*/*").toString()).count());
            String str3 = (String) HoodieDataSourceHelpers.listCommitsSince(this.protected$fs(this), str2, "000").get(0);
            Dataset load = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), "000").option(DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key(), str3).load(str2);
            Assertions.assertEquals(100L, load.count());
            Row[] rowArr = (Row[]) load.groupBy("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).count().collect();
            Assertions.assertEquals(1, rowArr.length);
            Assertions.assertEquals(str3, rowArr[0].get(0));
            Dataset load2 = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), latestCommit).load(str2);
            Assertions.assertEquals(count, load2.count());
            Row[] rowArr2 = (Row[]) load2.groupBy("_hoodie_commit_time", Predef$.MODULE$.wrapRefArray(new String[0])).count().collect();
            Assertions.assertEquals(1, rowArr2.length);
            Assertions.assertEquals(latestCommit2, rowArr2[0].get(0));
        }, ExecutionContext$Implicits$.MODULE$.global())})), Seq$.MODULE$.canBuildFrom(), ExecutionContext$Implicits$.MODULE$.global()), Duration$.MODULE$.Inf());
    }

    /**
     * Polls the table at {@code str} until its completed commit/compaction timeline holds at
     * least {@code i} instants, or the timeout elapses.
     *
     * @param fileSystem file system used to read the timeline
     * @param str        table path
     * @param i          minimum number of completed instants to wait for
     * @param i2         timeout in seconds
     * @param i3         pause between polls in seconds
     * @return the number of completed instants observed when the threshold was reached
     * @throws InterruptedException  if the thread is interrupted while sleeping between polls
     * @throws IllegalStateException if the threshold is not reached within the timeout
     */
    private int waitTillAtleastNCommits(FileSystem fileSystem, String str, int i, int i2, int i3) throws InterruptedException {
        final long startMillis = System.currentTimeMillis();
        // Long arithmetic so large second values cannot overflow int.
        final long timeoutMillis = i2 * 1000L;
        final long sleepMillis = i3 * 1000L;

        long nowMillis = startMillis;
        int completedInstants = 0;
        boolean reached = false;
        while (!reached && nowMillis - startMillis < timeoutMillis) {
            try {
                HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fileSystem, str);
                // Arrays.toString prints the instants; appending the array directly would log only its identity hash.
                log().info("Timeline :" + java.util.Arrays.toString(timeline.getInstants().toArray()));
                if (timeline.countInstants() >= i) {
                    completedInstants = timeline.countInstants();
                    reached = true;
                }
                // Result is unused. NOTE(review): kept because build() may throw TableNotFoundException — confirm whether it is still needed.
                HoodieTableMetaClient.builder().setConf(fileSystem.getConf()).setBasePath(str).setLoadActiveTimelineOnLoad(true).build();
            } catch (TableNotFoundException e) {
                // The table may not have been created by the streaming writer yet; poll again.
                log().info("Got table not found exception. Retrying");
            } finally {
                if (!reached) {
                    Thread.sleep(sleepMillis);
                    nowMillis = System.currentTimeMillis();
                }
            }
        }
        if (!reached) {
            throw new IllegalStateException("Timed-out waiting for " + i + " commits to appear in " + str);
        }
        return completedInstants;
    }

    /**
     * Builds the writer options for the clustering tests by extending {@link #commonOpts()}.
     *
     * @param str  value for {@code HoodieClusteringConfig.INLINE_CLUSTERING}
     * @param str2 value for {@code DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE}
     * @param str3 value for {@code DataSourceWriteOptions.ASYNC_COMPACT_ENABLE}
     * @param str4 value used for both {@code INLINE_CLUSTERING_MAX_COMMITS} and
     *             {@code ASYNC_CLUSTERING_MAX_COMMITS}
     * @param i    argument passed to {@code dataGen.getEstimatedFileSizeInBytes}; the result
     *             becomes the value of {@code HoodieStorageConfig.PARQUET_MAX_FILE_SIZE}
     * @return a new map containing the common options plus the six entries above
     */
    public Map<String, String> getClusteringOpts(String str, String str2, String str3, String str4, int i) {
        final Tuple2<String, String> inlineClustering =
            new Tuple2<>(HoodieClusteringConfig.INLINE_CLUSTERING.key(), str);
        final Tuple2<String, String> inlineMaxCommits =
            new Tuple2<>(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), str4);
        final Tuple2[] remainingOptions = {
            new Tuple2<>(DataSourceWriteOptions$.MODULE$.ASYNC_CLUSTERING_ENABLE().key(), str2),
            new Tuple2<>(DataSourceWriteOptions$.MODULE$.ASYNC_COMPACT_ENABLE().key(), str3),
            new Tuple2<>(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(), str4),
            new Tuple2<>(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(this.dataGen.getEstimatedFileSizeInBytes(i)))
        };
        return commonOpts().$plus(inlineClustering, inlineMaxCommits, Predef$.MODULE$.wrapRefArray(remainingOptions));
    }

    /**
     * Runs the clustering scenario with inline clustering on, async clustering off and async
     * compaction off, then checks the result via {@code checkClusteringResult$1}.
     */
    @Test
    public void testStructuredStreamingWithInlineClustering() {
        final Tuple2<String, String> paths = initStreamingSourceAndDestPath("source", "dest");
        if (paths == null) {
            throw new MatchError(paths);
        }
        final String sourcePath = paths._1();
        final String destPath = paths._2();
        final Function1<String, BoxedUnit> resultCheck = tablePath -> {
            checkClusteringResult$1(tablePath);
            return BoxedUnit.UNIT;
        };
        structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, false, false, "2016/03/15", resultCheck);
    }

    /**
     * Runs the clustering scenario with inline clustering off, async clustering on and async
     * compaction off, then checks the result via {@code checkClusteringResult$2}.
     */
    @Test
    public void testStructuredStreamingWithAsyncClustering() {
        final Tuple2<String, String> paths = initStreamingSourceAndDestPath("source", "dest");
        if (paths == null) {
            throw new MatchError(paths);
        }
        final String sourcePath = paths._1();
        final String destPath = paths._2();
        final Function1<String, BoxedUnit> resultCheck = tablePath -> {
            checkClusteringResult$2(tablePath);
            return BoxedUnit.UNIT;
        };
        structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, false, "2016/03/15", resultCheck);
    }

    /**
     * Runs the clustering scenario with inline clustering off and both async clustering and
     * async compaction on, then checks the result via {@code checkClusteringResult$3}.
     */
    @Test
    public void testStructuredStreamingWithAsyncClusteringAndCompaction() {
        final Tuple2<String, String> paths = initStreamingSourceAndDestPath("source", "dest");
        if (paths == null) {
            throw new MatchError(paths);
        }
        final String sourcePath = paths._1();
        final String destPath = paths._2();
        final Function1<String, BoxedUnit> resultCheck = tablePath -> {
            checkClusteringResult$3(tablePath);
            return BoxedUnit.UNIT;
        };
        structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, true, "2016/03/15", resultCheck);
    }

    /**
     * Shared driver for the clustering tests: streams two batches of 100 inserts for one
     * partition into a Hudi table and verifies commit counts, file groups and total row count.
     *
     * @param str       streaming source directory
     * @param str2      destination table path
     * @param z         inline-clustering flag forwarded to {@link #getClusteringOpts}
     * @param z2        async-clustering flag forwarded to {@link #getClusteringOpts}
     * @param z3        async-compaction flag forwarded to {@link #getClusteringOpts}
     * @param str3      partition path that all generated records are written to
     * @param function1 extra check invoked with the destination path before the final count assertion
     */
    public void structuredStreamingForTestClusteringRunner(String str, String str2, boolean z, boolean z2, boolean z3, String str3, Function1<String, BoxedUnit> function1) {
        final Dataset<Row> firstBatch = spark().read().json(spark().sparkContext().parallelize(
            JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(
                RawTripTestPayload.recordsToStrings(this.dataGen.generateInsertsForPartition("000", Predef$.MODULE$.int2Integer(100), str3))).toList(),
            2, ClassTag$.MODULE$.apply(String.class)));
        final Dataset<Row> secondBatch = spark().read().json(spark().sparkContext().parallelize(
            JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(
                RawTripTestPayload.recordsToStrings(this.dataGen.generateInsertsForPartition("001", Predef$.MODULE$.int2Integer(100), str3))).toList(),
            2, ClassTag$.MODULE$.apply(String.class)));

        final Map<String, String> writerOptions =
            getClusteringOpts(Boolean.toString(z), Boolean.toString(z2), Boolean.toString(z3), "2", 100);
        final Future<BoxedUnit> streamingWrite = initStreamingWriteFuture(firstBatch.schema(), str, str2, writerOptions);

        final Future<BoxedUnit> verification = Future$.MODULE$.apply(() -> {
            final FileSystem fileSystem = protected$fs(this);

            firstBatch.coalesce(1).write().mode(SaveMode.Append).json(str);
            final int commitsAfterFirst = waitTillAtleastNCommits(fileSystem, str2, 1, 120, 5);
            Assertions.assertTrue(HoodieDataSourceHelpers.hasNewCommits(fileSystem, str2, "000"));

            secondBatch.coalesce(1).write().mode(SaveMode.Append).json(str);
            final int commitsAfterSecond = waitTillAtleastNCommits(fileSystem, str2, commitsAfterFirst + 1, 120, 5);

            // If a replace commit has already completed, 3 commits and at least one file group are
            // expected; otherwise the polled commit count and more than one file group.
            final boolean replaceCompleted =
                HoodieDataSourceHelpers.allCompletedCommitsCompactions(fileSystem, str2).getCompletedReplaceTimeline().countInstants() > 0;
            final int expectedCommits = replaceCompleted ? 3 : commitsAfterSecond;
            final int fileGroupLowerBound = replaceCompleted ? 0 : 1;

            Assertions.assertEquals(expectedCommits, HoodieDataSourceHelpers.listCommitsSince(fileSystem, str2, "000").size());
            protected$setmetaClient(this, HoodieTableMetaClient.builder().setConf(fileSystem.getConf()).setBasePath(str2).setLoadActiveTimelineOnLoad(true).build());
            Assertions.assertTrue(getLatestFileGroupsFileId(str3).length > fileGroupLowerBound);

            function1.apply(str2);
            Assertions.assertEquals(200L, spark().read().format("org.apache.hudi").load(str2 + "/*/*/*/*").count());
        }, ExecutionContext$Implicits$.MODULE$.global());

        Await$.MODULE$.result(
            Future$.MODULE$.sequence(
                Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Future[]{streamingWrite, verification})),
                Seq$.MODULE$.canBuildFrom(),
                ExecutionContext$Implicits$.MODULE$.global()),
            Duration$.MODULE$.Inf());
    }

    /**
     * Collects the file ids of the latest file slices in the given partition.
     *
     * @param str partition path to inspect
     * @return one file id per latest file slice reported by {@code tableView}
     */
    private String[] getLatestFileGroupsFileId(String str) {
        // Return value is unused; presumably this call refreshes tableView — verify against the base class.
        getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), HoodieTestTable.of(metaClient).listAllBaseFiles());
        final Object[] latestSlices = tableView.getLatestFileSlices(str).toArray();
        final String[] fileIds = new String[latestSlices.length];
        for (int idx = 0; idx < latestSlices.length; idx++) {
            fileIds[idx] = ((FileSlice) latestSlices[idx]).getFileGroupId().getFileId();
        }
        return fileIds;
    }

    /**
     * Polls {@code metaClient}'s active timeline until it contains at least one completed
     * replace instant, or the timeout elapses.
     *
     * @param str table path; used only in the timeout error message
     * @param i   timeout in seconds
     * @param i2  pause between polls in seconds
     * @throws InterruptedException  if the thread is interrupted while sleeping between polls
     * @throws IllegalStateException if no completed replace instant shows up within the timeout
     */
    private void waitTillHasCompletedReplaceInstant(String str, int i, int i2) throws InterruptedException {
        final long startMillis = System.currentTimeMillis();
        // Long arithmetic so large second values cannot overflow int.
        final long timeoutMillis = i * 1000L;
        final long sleepMillis = i2 * 1000L;

        long nowMillis = startMillis;
        boolean found = false;
        while (!found && nowMillis - startMillis < timeoutMillis) {
            try {
                this.metaClient.reloadActiveTimeline();
                int completeReplaceSize = this.metaClient.getActiveTimeline().getCompletedReplaceTimeline().countInstants();
                Predef$.MODULE$.println("completeReplaceSize:" + completeReplaceSize);
                if (completeReplaceSize > 0) {
                    found = true;
                }
            } catch (TableNotFoundException e) {
                // The table may not exist yet; poll again.
                log().info("Got table not found exception. Retrying");
            } finally {
                // Sleep only while still waiting, so a successful poll returns immediately.
                if (!found) {
                    Thread.sleep(sleepMillis);
                    nowMillis = System.currentTimeMillis();
                }
            }
        }
        if (!found) {
            throw new IllegalStateException("Timed-out waiting for completing replace instant appear in " + str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Waits (up to 120 s, polling every 1 s) for a completed replace instant, reloads the
     * timeline and asserts that partition {@code 2016/03/15} has exactly one latest file slice.
     *
     * @param str destination table path, forwarded to the wait helper
     */
    public final void checkClusteringResult$1(String str) {
        waitTillHasCompletedReplaceInstant(str, 120, 1);
        metaClient.reloadActiveTimeline();
        final String[] latestFileIds = getLatestFileGroupsFileId("2016/03/15");
        Assertions.assertEquals(1, latestFileIds.length);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Waits (up to 120 s, polling every 1 s) for a completed replace instant, reloads the
     * timeline and asserts that partition {@code 2016/03/15} has exactly one latest file slice.
     *
     * @param str destination table path, forwarded to the wait helper
     */
    public final void checkClusteringResult$2(String str) {
        waitTillHasCompletedReplaceInstant(str, 120, 1);
        metaClient.reloadActiveTimeline();
        final String[] latestFileIds = getLatestFileGroupsFileId("2016/03/15");
        Assertions.assertEquals(1, latestFileIds.length);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Waits (up to 120 s, polling every 1 s) for a completed replace instant, reloads the
     * timeline and asserts that partition {@code 2016/03/15} has exactly one latest file slice.
     *
     * @param str destination table path, forwarded to the wait helper
     */
    public final void checkClusteringResult$3(String str) {
        waitTillHasCompletedReplaceInstant(str, 120, 1);
        metaClient.reloadActiveTimeline();
        final String[] latestFileIds = getLatestFileGroupsFileId("2016/03/15");
        Assertions.assertEquals(1, latestFileIds.length);
    }
}
