/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.functional;

import java.io.Serializable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.DataSourceWriteOptions$;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t\rf\u0001\u0002\u000e\u001c\u0001\u0011BQa\u000b\u0001\u0005\u00021Bqa\f\u0001C\u0002\u0013%\u0001\u0007\u0003\u00048\u0001\u0001\u0006I!\r\u0005\nq\u0001\u0001\r\u00111A\u0005\u0002eB\u0011\"\u0011\u0001A\u0002\u0003\u0007I\u0011\u0001\"\t\u0013-\u0003\u0001\u0019!A!B\u0013Q\u0004b\u0002'\u0001\u0005\u0004%\t!\u0014\u0005\u0007=\u0002\u0001\u000b\u0011\u0002(\t\u000b}\u0003A\u0011\t1\t\u000b1\u0004A\u0011A7\t\u000f\u0005u\u0001\u0001\"\u0001\u0002 !9\u0011q\u0006\u0001\u0005\u0002\u0005E\u0002bBA$\u0001\u0011\u0005\u0011\u0011\n\u0005\b\u0003G\u0002A\u0011AA3\u0011\u001d\t\u0019\b\u0001C\u0001\u0003kBq!a \u0001\t\u0003\t\t\tC\u0004\u0002&\u0002!I!a*\t\u000f\tU\u0001\u0001\"\u0001\u0003\u0018!9!Q\u0006\u0001\u0005\u0002\t=\u0002b\u0002B\u001d\u0001\u0011\u0005!1\b\u0005\b\u0005+\u0002A\u0011\u0002B,\u0011\u001d\u0011\u0019\u0007\u0001C\u0005\u0005KBqA!\u001f\u0001\t\u0013\u0011Y\b\u0003\u0007\u0003\b\u0002\u0001\n\u0011!A\u0001\n\u0003\u0011I\t\u0003\u0007\u0003\u000e\u0002\u0001\n\u0011!A\u0001\n\u0003\u0011yIA\fUKN$8\u000b\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oO*\u0011A$H\u0001\u000bMVt7\r^5p]\u0006d'B\u0001\u0010 
\u0003\u0011AW\u000fZ5\u000b\u0005\u0001\n\u0013AB1qC\u000eDWMC\u0001#\u0003\ry'oZ\u0002\u0001'\t\u0001Q\u0005\u0005\u0002'S5\tqE\u0003\u0002);\u0005IA/Z:ukRLGn]\u0005\u0003U\u001d\u0012A\u0003S8pI&,7\t\\5f]R$Vm\u001d;CCN,\u0017A\u0002\u001fj]&$h\bF\u0001.!\tq\u0003!D\u0001\u001c\u0003\rawnZ\u000b\u0002cA\u0011!'N\u0007\u0002g)\u0011AgH\u0001\u0006Y><GG[\u0005\u0003mM\u0012a\u0001T8hO\u0016\u0014\u0018\u0001\u00027pO\u0002\nQa\u001d9be.,\u0012A\u000f\t\u0003w}j\u0011\u0001\u0010\u0006\u0003{y\n1a]9m\u0015\tAt$\u0003\u0002Ay\ta1\u000b]1sWN+7o]5p]\u0006I1\u000f]1sW~#S-\u001d\u000b\u0003\u0007&\u0003\"\u0001R$\u000e\u0003\u0015S\u0011AR\u0001\u0006g\u000e\fG.Y\u0005\u0003\u0011\u0016\u0013A!\u00168ji\"9!*BA\u0001\u0002\u0004Q\u0014a\u0001=%c\u000511\u000f]1sW\u0002\n!bY8n[>tw\n\u001d;t+\u0005q\u0005\u0003B(U-Zk\u0011\u0001\u0015\u0006\u0003#J\u000b\u0011\"[7nkR\f'\r\\3\u000b\u0005M+\u0015AC2pY2,7\r^5p]&\u0011Q\u000b\u0015\u0002\u0004\u001b\u0006\u0004\bCA,]\u001b\u0005A&BA-[\u0003\u0011a\u0017M\\4\u000b\u0003m\u000bAA[1wC&\u0011Q\f\u0017\u0002\u0007'R\u0014\u0018N\\4\u0002\u0017\r|W.\\8o\u001fB$8\u000fI\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002\u0007\"\u0012\u0011B\u0019\t\u0003G*l\u0011\u0001\u001a\u0006\u0003K\u001a\f1!\u00199j\u0015\t9\u0007.A\u0004kkBLG/\u001a:\u000b\u0005%\f\u0013!\u00026v]&$\u0018BA6e\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\u001aS:LGo\u0016:ji&twm\u0015;sK\u0006l\u0017N\\4Rk\u0016\u0014\u0018\u0010F\u0004oir\f\t\"!\u0006\u0011\u0005=\u0014X\"\u00019\u000b\u0005Ed\u0014!C:ue\u0016\fW.\u001b8h\u0013\t\u0019\bO\u0001\bTiJ,\u0017-\\5oOF+XM]=\t\u000bUT\u0001\u0019\u0001<\u0002\rM\u001c\u0007.Z7b!\t9(0D\u0001y\u0015\tIH(A\u0003usB,7/\u0003\u0002|q\nQ1\u000b\u001e:vGR$\u0016\u0010]3\t\u000buT\u0001\u0019\u0001@\u0002\u0015M|WO]2f!\u0006$\b\u000eE\u0002\u0000\u0003\u001bqA!!\u0001\u0002\nA\u0019\u00111A#\u000e\u0005\u0005\u0015!bAA\u0004G\u00051AH]8pizJ1!a\u0003F\u0003\u0019\u0001&/\u001a3fM&\u0019Q,a\u0004\u000b\u0007\u0005-Q\t\u0003\u0004\u0002\u00
14)\u0001\rA`\u0001\tI\u0016\u001cH\u000fU1uQ\"9\u0011q\u0003\u0006A\u0002\u0005e\u0011a\u00035vI&|\u0005\u000f^5p]N\u0004Ra`A\u000e}zL1!VA\b\u0003yIg.\u001b;TiJ,\u0017-\\5oON{WO]2f\u0003:$G)Z:u!\u0006$\b\u000e\u0006\u0004\u0002\"\u0005\u001d\u00121\u0006\t\u0006\t\u0006\rbP`\u0005\u0004\u0003K)%A\u0002+va2,'\u0007\u0003\u0004\u0002*-\u0001\rA`\u0001\u000eg>,(oY3ESJt\u0015-\\3\t\r\u000552\u00021\u0001\u007f\u0003-!Wm\u001d;ESJt\u0015-\\3\u0002)\u001d,Go\u00149ug^KG\u000f\u001b+bE2,G+\u001f9f)\u0011\tI\"a\r\t\u000f\u0005UB\u00021\u0001\u00028\u0005IA/\u00192mKRK\b/\u001a\t\u0005\u0003s\t\u0019%\u0004\u0002\u0002<)!\u0011QHA \u0003\u0015iw\u000eZ3m\u0015\r\t\t%H\u0001\u0007G>lWn\u001c8\n\t\u0005\u0015\u00131\b\u0002\u0010\u0011>|G-[3UC\ndW\rV=qK\u0006\tr-\u001a;DYV\u001cH/\u001a:j]\u001e|\u0005\u000f^:\u0015\u0019\u0005e\u00111JA'\u0003#\n)&!\u0017\t\u000f\u0005UR\u00021\u0001\u00028!1\u0011qJ\u0007A\u0002y\f!#[:J]2Lg.Z\"mkN$XM]5oO\"1\u00111K\u0007A\u0002y\f\u0011#[:Bgft7m\u00117vgR,'/\u001b8h\u0011\u0019\t9&\u0004a\u0001}\u0006\u00192\r\\;ti\u0016\u0014\u0018N\\4Ok6\u001cu.\\7ji\"9\u00111L\u0007A\u0002\u0005u\u0013\u0001\u00054jY\u0016l\u0015\r\u001f*fG>\u0014HMT;n!\r!\u0015qL\u0005\u0004\u0003C*%aA%oi\u0006\tr-\u001a;D_6\u0004\u0018m\u0019;j_:|\u0005\u000f^:\u0015\r\u0005e\u0011qMA5\u0011\u001d\t)D\u0004a\u0001\u0003oAq!a\u001b\u000f\u0001\u0004\ti'A\tjg\u0006\u001b\u0018P\\2D_6\u0004\u0018m\u0019;j_:\u00042\u0001RA8\u0013\r\t\t(\u0012\u0002\b\u0005>|G.Z1o\u0003u\u0019HO];diV\u0014X\rZ*ue\u0016\fW.\u001b8h)\u0016\u001cHOU;o]\u0016\u0014HcB\"\u0002x\u0005e\u0014Q\u0010\u0005\b\u0003ky\u0001\u0019AA\u001c\u0011\u001d\tYh\u0004a\u0001\u0003[\nA#\u00193e\u0007>l\u0007/Y2uS>t7i\u001c8gS\u001e\u001c\bbBA6\u001f\u0001\u0007\u0011QN\u0001\u0018i\u0016\u001cHo\u0015;sk\u000e$XO]3e'R\u0014X-Y7j]\u001e$2aQAB\u0011\u001d\t)\u0004\u0005a\u0001\u0003oAs\u0001EAD\u0003/\u000bI\n\u0005\u0003\u0002\n\u0006MUBAAF\u0015\u0011\ti)a$\u0002\u0011A\u0014xN^5eKJT1!!%g\u0003\u0019\u0001\u00
18M]1ng&!\u0011QSAF\u0005))e.^7T_V\u00148-Z\u0001\u0006m\u0006dW/Z\u0012\u0003\u0003oA3\u0001EAO!\u0011\ty*!)\u000e\u0005\u0005=\u0015\u0002BAR\u0003\u001f\u0013\u0011\u0003U1sC6,G/\u001a:ju\u0016$G+Z:u\u0003]9\u0018-\u001b;US2d\u0017\t\u001e7fCN$hjQ8n[&$8\u000f\u0006\u0007\u0002^\u0005%\u00161XA`\u0003\u0007\f9\rC\u0004\u0002,F\u0001\r!!,\u0002\u0005\u0019\u001c\b\u0003BAX\u0003ok!!!-\u000b\t\u0005-\u00161\u0017\u0006\u0004\u0003k{\u0012A\u00025bI>|\u0007/\u0003\u0003\u0002:\u0006E&A\u0003$jY\u0016\u001c\u0016p\u001d;f[\"1\u0011QX\tA\u0002y\f\u0011\u0002^1cY\u0016\u0004\u0016\r\u001e5\t\u000f\u0005\u0005\u0017\u00031\u0001\u0002^\u0005Qa.^7D_6l\u0017\u000e^:\t\u000f\u0005\u0015\u0017\u00031\u0001\u0002^\u0005YA/[7f_V$8+Z2t\u0011\u001d\tI-\u0005a\u0001\u0003;\nQc\u001d7fKB\u001cVmY:BMR,'/R1dQJ+h\u000eK\u0003\u0012\u0003\u001b\f)\u000fE\u0003E\u0003\u001f\f\u0019.C\u0002\u0002R\u0016\u0013a\u0001\u001e5s_^\u001c\b\u0003BAk\u0003?tA!a6\u0002\\:!\u00111AAm\u0013\u00051\u0015bAAo\u000b\u00069\u0001/Y2lC\u001e,\u0017\u0002BAq\u0003G\u0014A#\u00138uKJ\u0014X\u000f\u001d;fI\u0016C8-\u001a9uS>t'bAAo\u000bF2aD`At\u0005'\t\u0014bIAu\u0003c\u0014I!a=\u0016\t\u0005-\u0018Q^\u000b\u0002}\u00129\u0011q^\u0012C\u0002\u0005e(!\u0001+\n\t\u0005M\u0018Q_\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000fJ\u0019\u000b\u0007\u0005]X)\u0001\u0004uQJ|wo]\t\u0005\u0003w\u0014\t\u0001E\u0002E\u0003{L1!a@F\u0005\u001dqu\u000e\u001e5j]\u001e\u0004BAa\u0001\u0003\u00069\u0019A)a7\n\t\t\u001d\u00111\u001d\u0002\n)\"\u0014xn^1cY\u0016\f\u0014b\tB\u0006\u0005\u001b\u0011y!a>\u000f\u0007\u0011\u0013i!C\u0002\u0002x\u0016\u000bTA\t#F\u0005#\u0011Qa]2bY\u0006\f4AJAj\u0003\u0015\"Xm\u001d;TiJ,8\r^;sK\u0012\u001cFO]3b[&twmV5uQ\u000ecWo\u001d;fe&tw\rF\u0002D\u00053Aq!a\u0015\u0013\u0001\u0004\ti\u0007K\u0004\u0013\u0005;\u0011\u0019C!\n\u0011\t\u0005%%qD\u0005\u0005\u0005C\tYIA\u0006WC2,XmU8ve\u000e,\u0017\u0001\u00032p_2,\u0017M\\:-\t\t\u001d\"\u0011F\r\u0002\u0003e\t\u0001\u0001K\u0002\u00
13\u0003;\u000bQ\u0005^3tiN#(/^2ukJ,Gm\u0015;sK\u0006l\u0017N\\4XSRD7i\\7qC\u000e$\u0018n\u001c8\u0015\u0007\r\u0013\t\u0004C\u0004\u0002lM\u0001\r!!\u001c)\u000fM\u0011iBa\t\u000361\"!q\u0005B\u0015Q\r\u0019\u0012QT\u0001+gR\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6Lgn\u001a$peR+7\u000f^\"mkN$XM]5oOJ+hN\\3s)=\u0019%Q\bB \u0005\u0003\u0012\u0019E!\u0012\u0003H\t-\u0003\"B?\u0015\u0001\u0004q\bBBA\n)\u0001\u0007a\u0010C\u0004\u00026Q\u0001\r!a\u000e\t\u000f\u0005=C\u00031\u0001\u0002n!9\u00111\u000b\u000bA\u0002\u00055\u0004B\u0002B%)\u0001\u0007a0\u0001\nqCJ$\u0018\u000e^5p]>3'+Z2pe\u0012\u001c\bb\u0002B')\u0001\u0007!qJ\u0001\u0016G\",7m[\"mkN$XM]5oOJ+7/\u001e7u!\u0015!%\u0011\u000b@D\u0013\r\u0011\u0019&\u0012\u0002\n\rVt7\r^5p]F\n\u0011dZ3u\u0019\u0006$Xm\u001d;GS2,wI]8vaN4\u0015\u000e\\3JIR!!\u0011\fB0!\u0011!%1\f@\n\u0007\tuSIA\u0003BeJ\f\u0017\u0010\u0003\u0004\u0003bU\u0001\rA`\u0001\na\u0006\u0014H/\u001b;j_:\f!e^1jiRKG\u000e\u001c%bg\u000e{W\u000e\u001d7fi\u0016$'+\u001a9mC\u000e,\u0017J\\:uC:$HcB\"\u0003h\t%$1\u000e\u0005\u0007\u0003{3\u0002\u0019\u0001@\t\u000f\u0005\u0015g\u00031\u0001\u0002^!9\u0011\u0011\u001a\fA\u0002\u0005u\u0003&\u0002\f\u0002N\n=\u0014G\u0002\u0010\u007f\u0005c\u00129(M\u0005$\u0003S\f\tPa\u001d\u0002tFJ1Ea\u0003\u0003\u000e\tU\u0014q_\u0019\u0006E\u0011+%\u0011C\u0019\u0004M\u0005M\u0017!\u00047bi\u0016\u001cH/\u00138ti\u0006tG\u000fF\u0004\u007f\u0005{\u0012yHa!\t\u000f\u0005-v\u00031\u0001\u0002.\"1!\u0011Q\fA\u0002y\f\u0001BY1tKB\u000bG\u000f\u001b\u0005\u0007\u0005\u000b;\u0002\u0019\u0001@\u0002\u001b%t7\u000f^1oi\u0006\u001bG/[8o\u00031\u0001(o\u001c;fGR,G\r\n4t)\u0011\tiKa#\t\u000f)C\u0012\u0011!a\u0001[\u00059\u0002O]8uK\u000e$X\r\u001a\u0013tKRlW\r^1DY&,g\u000e\u001e\u000b\u0006\u0007\nE%1\u0013\u0005\b\u0015f\t\t\u00111\u0001.\u0011%\u0011)*GA\u0001\u0002\u0004\u00119*A\u0002yII\u0002BA!'\u0003 
6\u0011!1\u0014\u0006\u0005\u0005;\u000by$A\u0003uC\ndW-\u0003\u0003\u0003\"\nm%!\u0006%p_\u0012LW\rV1cY\u0016lU\r^1DY&,g\u000e\u001e")
public class TestStructuredStreaming
extends HoodieClientTestBase {
    // log4j logger obtained for the runtime class of this test instance.
    private final Logger log = LogManager.getLogger((Class)this.getClass());
    // Spark session used by the tests; assigned in setUp() via spark_$eq.
    private SparkSession spark;
    // Immutable Scala map of write options shared by every test: insert/upsert shuffle
    // parallelism "4", record key "_row_key", partition path "partition", precombine field
    // "timestamp", and table name "hoodie_test".
    private final Map<String, String> commonOpts = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.insert.shuffle.parallelism"), (Object)"4"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"hoodie.upsert.shuffle.parallelism"), (Object)"4"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.RECORDKEY_FIELD().key()), (Object)"_row_key"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PARTITIONPATH_FIELD().key()), (Object)"partition"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.PRECOMBINE_FIELD().key()), (Object)"timestamp"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)HoodieWriteConfig.TBL_NAME.key()), (Object)"hoodie_test")}));

    /**
     * Synthetic accessor that reads the {@code fs} field of the given test instance.
     *
     * @param x$1 the test instance whose {@code fs} field is read
     * @return the value of {@code x$1.fs}
     */
    public /* synthetic */ FileSystem protected$fs(TestStructuredStreaming x$1) {
        FileSystem fileSystem = x$1.fs;
        return fileSystem;
    }

    /**
     * Synthetic mutator that overwrites the {@code metaClient} field of the given test instance.
     *
     * @param x$1 the test instance whose {@code metaClient} field is assigned
     * @param x$2 the meta client to store
     */
    public /* synthetic */ void protected$setmetaClient(TestStructuredStreaming x$1, HoodieTableMetaClient x$2) {
        HoodieTableMetaClient replacement = x$2;
        x$1.metaClient = replacement;
    }

    /**
     * @return the logger held in the {@code log} field
     */
    private Logger log() {
        Logger logger = this.log;
        return logger;
    }

    /**
     * @return the Spark session currently stored in the {@code spark} field
     *         ({@code null} until {@link #spark_$eq(SparkSession)} has been called)
     */
    public SparkSession spark() {
        SparkSession session = this.spark;
        return session;
    }

    /**
     * Stores the given Spark session in the {@code spark} field.
     *
     * @param x$1 the session to store
     */
    public void spark_$eq(SparkSession x$1) {
        SparkSession session = x$1;
        this.spark = session;
    }

    /**
     * @return the write options shared by all tests, as held in the {@code commonOpts} field
     */
    public Map<String, String> commonOpts() {
        Map<String, String> sharedOptions = this.commonOpts;
        return sharedOptions;
    }

    /**
     * Per-test initialisation: runs the superclass set-up, stores the Spark session taken from
     * {@code sqlContext}, and sets {@code spark.sql.streaming.stopTimeout} to {@code 30000}.
     */
    @BeforeEach
    public void setUp() {
        super.setUp();
        SparkSession session = this.sqlContext.sparkSession();
        this.spark_$eq(session);
        long stopTimeout = 30000L;
        this.spark().conf().set("spark.sql.streaming.stopTimeout", stopTimeout);
    }

    /**
     * Starts a streaming query that reads JSON files from {@code sourcePath} and appends them
     * to {@code destPath} using the {@code org.apache.hudi} format.
     *
     * <p>The query uses a processing-time trigger of {@code 1000} and keeps its checkpoint
     * under {@code <basePath>/checkpoint}.
     *
     * @param schema      schema applied to the streaming JSON reader
     * @param sourcePath  directory watched for incoming JSON files
     * @param destPath    path the query writes to
     * @param hudiOptions options passed to the writer
     * @return the started streaming query; the caller is responsible for stopping it
     */
    public StreamingQuery initWritingStreamingQuery(StructType schema, String sourcePath, String destPath, Map<String, String> hudiOptions) {
        Dataset jsonStream = this.spark().readStream().schema(schema).json(sourcePath);
        String checkpointDir = this.basePath + "/checkpoint";
        return jsonStream.writeStream()
            .format("org.apache.hudi")
            .options(hudiOptions)
            .trigger(Trigger.ProcessingTime((long)1000L))
            .option("checkpointLocation", checkpointDir)
            .outputMode(OutputMode.Append())
            .start(destPath);
    }

    /**
     * Recursively deletes {@code basePath}, then creates the streaming source directory under it.
     *
     * <p>Only the source directory is created; the destination path is merely computed.
     *
     * @param sourceDirName name of the source directory, relative to {@code basePath}
     * @param destDirName   name of the destination directory, relative to {@code basePath}
     * @return a pair of (source path, destination path)
     */
    public Tuple2<String, String> initStreamingSourceAndDestPath(String sourceDirName, String destDirName) {
        Path root = new Path(this.basePath);
        this.fs.delete(root, true);
        String sourcePath = this.basePath + "/" + sourceDirName;
        String destPath = this.basePath + "/" + destDirName;
        Path sourceDir = new Path(sourcePath);
        this.fs.mkdirs(sourceDir);
        return new Tuple2<String, String>(sourcePath, destPath);
    }

    /**
     * Returns the common write options extended with the table-type option.
     *
     * @param tableType table type whose {@code name()} becomes the option value
     * @return a new map; {@code commonOpts} itself is not modified
     */
    public Map<String, String> getOptsWithTableType(HoodieTableType tableType) {
        Map<String, String> baseOptions = this.commonOpts();
        Object tableTypeKey = DataSourceWriteOptions$.MODULE$.TABLE_TYPE().key();
        Object tableTypeName = tableType.name();
        Tuple2 tableTypeEntry = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tableTypeKey), tableTypeName);
        return baseOptions.$plus(tableTypeEntry);
    }

    /**
     * Builds the write options for the clustering tests on top of
     * {@link #getOptsWithTableType(HoodieTableType)}.
     *
     * <p>Adds the inline-clustering flag, the inline and async clustering max-commit settings
     * (both set to {@code clusteringNumCommit}), the async-clustering flag, and a Parquet max
     * file size derived from {@code dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum)}.
     *
     * @param tableType           table type to write
     * @param isInlineClustering  value for the inline-clustering option
     * @param isAsyncClustering   value for the async-clustering option
     * @param clusteringNumCommit value for both clustering max-commits options
     * @param fileMaxRecordNum    record count used to estimate the Parquet max file size
     * @return the combined option map
     */
    public Map<String, String> getClusteringOpts(HoodieTableType tableType, String isInlineClustering, String isAsyncClustering, String clusteringNumCommit, int fileMaxRecordNum) {
        Map<String, String> baseOptions = this.getOptsWithTableType(tableType);

        Tuple2 inlineEnabled = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc((Object)HoodieClusteringConfig.INLINE_CLUSTERING.key()), (Object)isInlineClustering);
        Tuple2 inlineMaxCommits = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc((Object)HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key()), (Object)clusteringNumCommit);
        Tuple2 asyncEnabled = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.ASYNC_CLUSTERING_ENABLE().key()), (Object)isAsyncClustering);
        Tuple2 asyncMaxCommits = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc((Object)HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key()), (Object)clusteringNumCommit);

        String maxFileSizeBytes = Integer.toString((int)this.dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum));
        Tuple2 parquetMaxFileSize = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc((Object)HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key()), (Object)maxFileSizeBytes);

        Seq remainingEntries = (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{asyncEnabled, asyncMaxCommits, parquetMaxFileSize});
        return baseOptions.$plus(inlineEnabled, inlineMaxCommits, remainingEntries);
    }

    /**
     * Builds the write options for the compaction tests on top of
     * {@link #getOptsWithTableType(HoodieTableType)}.
     *
     * <p>Adds the async-compaction flag and sets the inline-compaction delta-commit count to
     * {@code "1"}.
     *
     * @param tableType         table type to write
     * @param isAsyncCompaction value for the async-compaction option
     * @return the combined option map
     */
    public Map<String, String> getCompactionOpts(HoodieTableType tableType, boolean isAsyncCompaction) {
        Map<String, String> baseOptions = this.getOptsWithTableType(tableType);

        String asyncFlag = Boolean.toString(isAsyncCompaction);
        Tuple2 asyncCompactEntry = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc((Object)DataSourceWriteOptions$.MODULE$.ASYNC_COMPACT_ENABLE().key()), (Object)asyncFlag);
        Tuple2 deltaCommitsEntry = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(
            Predef$.MODULE$.ArrowAssoc((Object)HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key()), (Object)"1");

        Seq noFurtherEntries = (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[0]);
        return baseOptions.$plus(asyncCompactEntry, deltaCommitsEntry, noFurtherEntries);
    }

    public void structuredStreamingTestRunner(HoodieTableType tableType, boolean addCompactionConfigs, boolean isAsyncCompaction) {
        Tuple2<String, String> tuple2 = this.initStreamingSourceAndDestPath("source", "dest");
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String sourcePath = (String)tuple2._1();
        String destPath = (String)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)sourcePath, (Object)destPath);
        Tuple2 tuple23 = tuple22;
        String sourcePath2 = (String)tuple23._1();
        String destPath2 = (String)tuple23._2();
        List records1 = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInserts("000", Predef$.MODULE$.int2Integer(100)))).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        List records2 = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateUpdates("001", Predef$.MODULE$.int2Integer(100)))).toList();
        Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));
        long uniqueKeyCnt = inputDF2.select("_row_key", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).distinct().count();
        Map<String, String> hudiOptions = addCompactionConfigs ? this.getCompactionOpts(tableType, isAsyncCompaction) : this.getOptsWithTableType(tableType);
        StreamingQuery streamingQuery = this.initWritingStreamingQuery(inputDF1.schema(), sourcePath2, destPath2, hudiOptions);
        Future f2 = Future$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            inputDF1.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
            int currNumCommits = this.waitTillAtleastNCommits(this.protected$fs(this), destPath2, 1, 120, 5);
            Assertions.assertTrue((boolean)HoodieDataSourceHelpers.hasNewCommits((FileSystem)this.protected$fs(this), (String)destPath2, (String)"000"));
            String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit((FileSystem)this.protected$fs(this), (String)destPath2);
            Dataset hoodieROViewDF1 = this.spark().read().format("org.apache.hudi").load(new StringBuilder(8).append(destPath2).append("/*/*/*/*").toString());
            Predef$.MODULE$.assert(hoodieROViewDF1.count() == 100L);
            inputDF2.coalesce(1).write().mode(SaveMode.Append).json(sourcePath2);
            int numExpectedCommits = addCompactionConfigs ? currNumCommits + 2 : currNumCommits + 1;
            this.waitTillAtleastNCommits(this.protected$fs(this), destPath2, numExpectedCommits, 120, 5);
            HoodieTableType hoodieTableType = tableType;
            HoodieTableType hoodieTableType2 = HoodieTableType.MERGE_ON_READ;
            String commitInstantTime2 = !(hoodieTableType != null ? !hoodieTableType.equals(hoodieTableType2) : hoodieTableType2 != null) ? this.latestInstant(this.protected$fs(this), destPath2, "deltacommit") : HoodieDataSourceHelpers.latestCommit((FileSystem)this.protected$fs(this), (String)destPath2);
            Assertions.assertEquals((int)numExpectedCommits, (int)HoodieDataSourceHelpers.listCommitsSince((FileSystem)this.protected$fs(this), (String)destPath2, (String)"000").size());
            Dataset hoodieROViewDF2 = this.spark().read().format("org.apache.hudi").load(new StringBuilder(8).append(destPath2).append("/*/*/*/*").toString());
            Assertions.assertEquals((long)100L, (long)hoodieROViewDF2.count());
            String firstCommit = (String)HoodieDataSourceHelpers.listCommitsSince((FileSystem)this.protected$fs(this), (String)destPath2, (String)"000").get(0);
            Dataset hoodieIncViewDF1 = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), "000").option(DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key(), firstCommit).load(destPath2);
            Assertions.assertEquals((long)100L, (long)hoodieIncViewDF1.count());
            Row[] countsPerCommit = (Row[])hoodieIncViewDF1.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
            Assertions.assertEquals((int)1, (int)countsPerCommit.length);
            Assertions.assertEquals((Object)firstCommit, (Object)countsPerCommit[0].get(0));
            Dataset hoodieIncViewDF2 = this.spark().read().format("org.apache.hudi").option(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(), commitInstantTime1).load(destPath2);
            Assertions.assertEquals((long)uniqueKeyCnt, (long)hoodieIncViewDF2.count());
            countsPerCommit = (Row[])hoodieIncViewDF2.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
            Assertions.assertEquals((int)1, (int)countsPerCommit.length);
            Assertions.assertEquals((Object)commitInstantTime2, (Object)countsPerCommit[0].get(0));
            streamingQuery.stop();
        }, ExecutionContext.Implicits$.MODULE$.global());
        Await$.MODULE$.result((Awaitable)f2, Duration$.MODULE$.apply("120s"));
    }

    /**
     * Runs the basic streaming scenario once per {@link HoodieTableType}, without compaction
     * configs.
     *
     * @param tableType table type supplied by the enum source
     */
    @ParameterizedTest
    @EnumSource(value=HoodieTableType.class)
    public void testStructuredStreaming(HoodieTableType tableType) {
        boolean addCompactionConfigs = false;
        boolean isAsyncCompaction = false;
        this.structuredStreamingTestRunner(tableType, addCompactionConfigs, isAsyncCompaction);
    }

    /**
     * Polls the table's completed commit/compaction timeline until it holds at least
     * {@code numCommits} instants.
     *
     * <p>A {@link TableNotFoundException} is logged and treated as "not ready yet". After each
     * unsuccessful poll the thread sleeps for {@code sleepSecsAfterEachRun} seconds. Millisecond
     * values are computed in {@code long} arithmetic so large second counts cannot overflow.
     *
     * @param fs                    file system used to read the timeline
     * @param tablePath             base path of the table
     * @param numCommits            minimum number of completed instants to wait for
     * @param timeoutSecs           overall time budget, in seconds
     * @param sleepSecsAfterEachRun pause between polls, in seconds
     * @return the number of completed instants observed when the condition was met
     * @throws InterruptedException  if the thread is interrupted while sleeping
     * @throws IllegalStateException if the condition is not met within the time budget
     */
    private int waitTillAtleastNCommits(FileSystem fs, String tablePath, int numCommits, int timeoutSecs, int sleepSecsAfterEachRun) throws InterruptedException {
        long timeoutMsecs = timeoutSecs * 1000L;
        long sleepMsecs = sleepSecsAfterEachRun * 1000L;
        long beginTime = System.currentTimeMillis();
        long currTime = beginTime;
        while (currTime - beginTime < timeoutMsecs) {
            try {
                HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions((FileSystem)fs, (String)tablePath);
                this.log().info((Object)new StringBuilder(10).append("Timeline :").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(timeline.getInstants().toArray())).mkString("Array(", ", ", ")")).toString());
                int numInstants = timeline.countInstants();
                if (numInstants >= numCommits) {
                    return numInstants;
                }
            }
            catch (TableNotFoundException tableNotFoundException) {
                // The table may not have been created yet by the first micro-batch.
                this.log().info((Object)"Got table not found exception. Retrying");
            }
            Thread.sleep(sleepMsecs);
            currTime = System.currentTimeMillis();
        }
        throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath);
    }

    /**
     * Runs the clustering scenario on a COPY_ON_WRITE table, once with async clustering and once
     * with inline clustering (inline is enabled exactly when async is not).
     *
     * <p>All records go to partition {@code 2016/03/15}; the result is verified by
     * {@code checkClusteringResult$1}.
     *
     * @param isAsyncClustering whether async clustering is enabled for this run
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testStructuredStreamingWithClustering(boolean isAsyncClustering) {
        Tuple2<String, String> paths = this.initStreamingSourceAndDestPath("source", "dest");
        if (paths == null) {
            throw new MatchError(paths);
        }
        String streamSource = (String)paths._1();
        String streamDest = (String)paths._2();
        boolean isInlineClustering = !isAsyncClustering;
        Function1<String, BoxedUnit> verifyClustering = (Function1<String, BoxedUnit>)(Function1 & Serializable & scala.Serializable)destPath -> {
            this.checkClusteringResult$1(destPath);
            return BoxedUnit.UNIT;
        };
        this.structuredStreamingForTestClusteringRunner(streamSource, streamDest, HoodieTableType.COPY_ON_WRITE, isInlineClustering, isAsyncClustering, "2016/03/15", verifyClustering);
    }

    /**
     * Runs the basic streaming scenario on a MERGE_ON_READ table with compaction configs enabled.
     *
     * @param isAsyncCompaction whether async compaction is enabled for this run
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testStructuredStreamingWithCompaction(boolean isAsyncCompaction) {
        boolean addCompactionConfigs = true;
        HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
        this.structuredStreamingTestRunner(tableType, addCompactionConfigs, isAsyncCompaction);
    }

    /**
     * Runs the clustering streaming scenario and verifies the resulting table.
     *
     * <p>Steps: start a streaming query with clustering options (max commits {@code "2"}, file
     * size estimated for 100 records), write two batches of 100 inserts into one partition,
     * wait for each commit, rebuild {@code metaClient} for {@code destPath}, run the supplied
     * clustering check, then assert 3 commits since {@code "000"}, at least one latest file
     * group, 200 rows, and 2 distinct commit times whose maximum equals the latest completed
     * {@code "commit"} instant. The checks run in a {@code Future} awaited for at most
     * {@code 120s}.
     *
     * <p>The streaming query is stopped in a {@code finally} block, so it is released even when
     * an assertion fails or the await times out; previously it was stopped only on success.
     *
     * @param sourcePath            directory the streaming query reads JSON from
     * @param destPath              table base path the streaming query writes to
     * @param tableType             table type to write
     * @param isInlineClustering    whether inline clustering is enabled
     * @param isAsyncClustering     whether async clustering is enabled
     * @param partitionOfRecords    partition all generated records are written to
     * @param checkClusteringResult callback invoked with {@code destPath} after both batches
     */
    public void structuredStreamingForTestClusteringRunner(String sourcePath, String destPath, HoodieTableType tableType, boolean isInlineClustering, boolean isAsyncClustering, String partitionOfRecords, Function1<String, BoxedUnit> checkClusteringResult) {
        List records1 = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInsertsForPartition("000", Predef$.MODULE$.int2Integer(100), partitionOfRecords))).toList();
        Dataset inputDF1 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records1, 2, ClassTag$.MODULE$.apply(String.class)));
        List records2 = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(RawTripTestPayload.recordsToStrings((java.util.List)this.dataGen.generateInsertsForPartition("000", Predef$.MODULE$.int2Integer(100), partitionOfRecords))).toList();
        Dataset inputDF2 = this.spark().read().json(this.spark().sparkContext().parallelize((Seq)records2, 2, ClassTag$.MODULE$.apply(String.class)));

        Map<String, String> hudiOptions = this.getClusteringOpts(tableType, Boolean.toString(isInlineClustering), Boolean.toString(isAsyncClustering), "2", 100);
        StreamingQuery streamingQuery = this.initWritingStreamingQuery(inputDF1.schema(), sourcePath, destPath, hudiOptions);
        try {
            Future f2 = Future$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                inputDF1.coalesce(1).write().mode(SaveMode.Append).json(sourcePath);
                int currNumCommits = this.waitTillAtleastNCommits(this.protected$fs(this), destPath, 1, 120, 5);
                Assertions.assertTrue((boolean)HoodieDataSourceHelpers.hasNewCommits((FileSystem)this.protected$fs(this), (String)destPath, (String)"000"));

                inputDF2.coalesce(1).write().mode(SaveMode.Append).json(sourcePath);
                this.waitTillAtleastNCommits(this.protected$fs(this), destPath, currNumCommits + 1, 120, 5);

                // Point metaClient at the destination table before the clustering check uses it.
                this.protected$setmetaClient(this, HoodieTableMetaClient.builder().setConf(this.protected$fs(this).getConf()).setBasePath(destPath).setLoadActiveTimelineOnLoad(true).build());
                checkClusteringResult.apply((Object)destPath);

                Assertions.assertEquals((int)3, (int)HoodieDataSourceHelpers.listCommitsSince((FileSystem)this.protected$fs(this), (String)destPath, (String)"000").size());
                Assertions.assertTrue(this.getLatestFileGroupsFileId(partitionOfRecords).length > 0);

                Dataset hoodieROViewDF2 = this.spark().read().format("org.apache.hudi").load(new StringBuilder(8).append(destPath).append("/*/*/*/*").toString());
                Assertions.assertEquals((long)200L, (long)hoodieROViewDF2.count());
                Row[] countsPerCommit = (Row[])hoodieROViewDF2.groupBy("_hoodie_commit_time", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0])).count().collect();
                Assertions.assertEquals((int)2, (int)countsPerCommit.length);
                String commitInstantTime2 = this.latestInstant(this.protected$fs(this), destPath, "commit");
                Assertions.assertEquals((Object)commitInstantTime2, (Object)((Row)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])countsPerCommit)).maxBy((Function1 & Serializable & scala.Serializable)row -> (String)row.getAs(0), (Ordering)Ordering.String$.MODULE$)).get(0));
            }, ExecutionContext.Implicits$.MODULE$.global());
            Await$.MODULE$.result((Awaitable)f2, Duration$.MODULE$.apply("120s"));
        }
        finally {
            // Always release the streaming query, including on assertion failure or await timeout.
            streamingQuery.stop();
        }
    }

    /**
     * Collects the file IDs of the latest file slices in the given partition.
     *
     * <p>First calls {@code getHoodieTableFileSystemView} with the current {@code metaClient},
     * its active timeline and all base files listed by {@code HoodieTestTable}; the return value
     * is discarded and the slices are then read from the {@code tableView} field.
     *
     * @param partition partition path to inspect
     * @return one file ID per latest file slice, in the order returned by {@code tableView}
     */
    private String[] getLatestFileGroupsFileId(String partition) {
        this.getHoodieTableFileSystemView(this.metaClient, (HoodieTimeline)this.metaClient.getActiveTimeline(), HoodieTestTable.of((HoodieTableMetaClient)this.metaClient).listAllBaseFiles());
        Object[] latestSlices = this.tableView.getLatestFileSlices(partition).toArray();
        String[] fileIds = new String[latestSlices.length];
        for (int i = 0; i < latestSlices.length; i++) {
            FileSlice slice = (FileSlice)latestSlices[i];
            fileIds[i] = slice.getFileGroupId().getFileId();
        }
        return fileIds;
    }

    /**
     * Polls the active timeline held by {@code metaClient} until it contains at least one
     * completed replace instant.
     *
     * <p>A {@link TableNotFoundException} is logged and treated as "not ready yet". The method
     * returns as soon as the condition is met, without a trailing sleep, matching
     * {@code waitTillAtleastNCommits}. Progress is reported through the class logger, and
     * millisecond values are computed in {@code long} arithmetic to avoid overflow.
     *
     * @param tablePath             table path, used only in the timeout message
     * @param timeoutSecs           overall time budget, in seconds
     * @param sleepSecsAfterEachRun pause between polls, in seconds
     * @throws InterruptedException  if the thread is interrupted while sleeping
     * @throws IllegalStateException if no completed replace instant appears within the budget
     */
    private void waitTillHasCompletedReplaceInstant(String tablePath, int timeoutSecs, int sleepSecsAfterEachRun) throws InterruptedException {
        long timeoutMsecs = timeoutSecs * 1000L;
        long sleepMsecs = sleepSecsAfterEachRun * 1000L;
        long beginTime = System.currentTimeMillis();
        long currTime = beginTime;
        while (currTime - beginTime < timeoutMsecs) {
            try {
                this.metaClient.reloadActiveTimeline();
                int completeReplaceSize = this.metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
                this.log().info((Object)("completeReplaceSize:" + completeReplaceSize));
                if (completeReplaceSize > 0) {
                    return;
                }
            }
            catch (TableNotFoundException te) {
                this.log().info((Object)"Got table not found exception. Retrying");
            }
            Thread.sleep(sleepMsecs);
            currTime = System.currentTimeMillis();
        }
        throw new IllegalStateException("Timed-out waiting for completing replace instant appear in " + tablePath);
    }

    /**
     * Returns the timestamp of the last completed instant of the given action type.
     *
     * <p>Builds a fresh meta client for {@code basePath} with the active timeline loaded. The
     * last instant is unwrapped with {@code get()} without a presence check, so the behaviour
     * when no completed instant of that action exists depends on that call.
     *
     * @param fs            file system whose configuration is used to build the meta client
     * @param basePath      base path of the table
     * @param instantAction timeline action to filter on (callers pass {@code "commit"} or
     *                      {@code "deltacommit"})
     * @return the timestamp of the last completed instant of that action
     */
    private String latestInstant(FileSystem fs, String basePath, String instantAction) {
        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder()
            .setConf(fs.getConf())
            .setBasePath(basePath)
            .setLoadActiveTimelineOnLoad(true)
            .build();
        HoodieTimeline completedOfAction = tableMetaClient.getActiveTimeline()
            .getTimelineOfActions(CollectionUtils.createSet((Object[])new String[]{instantAction}))
            .filterCompletedInstants();
        HoodieInstant newest = (HoodieInstant)completedOfAction.lastInstant().get();
        return newest.getTimestamp();
    }

    /**
     * Clustering check used by {@code testStructuredStreamingWithClustering}: waits (up to 120
     * seconds, polling every second) for a completed replace instant, reloads the active
     * timeline, and asserts that partition {@code 2016/03/15} has exactly one latest file group.
     *
     * @param destPath table path passed to the wait helper
     */
    private final void checkClusteringResult$1(String destPath) {
        this.waitTillHasCompletedReplaceInstant(destPath, 120, 1);
        this.metaClient.reloadActiveTimeline();
        String[] latestFileIds = this.getLatestFileGroupsFileId("2016/03/15");
        Assertions.assertEquals((int)1, (int)latestFileIds.length);
    }
}

