/*
 * Decompiled with CFR 0.152.
 */
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.NonPartitionedExtractor;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HoodieJavaApp {
    @Parameter(names={"--table-path", "-p"}, description="path for Hoodie sample table")
    private String tablePath = "file:///tmp/hoodie/sample-table";
    @Parameter(names={"--table-name", "-n"}, description="table name for Hoodie sample table")
    private String tableName = "hoodie_test";
    @Parameter(names={"--table-type", "-t"}, description="One of COPY_ON_WRITE or MERGE_ON_READ")
    private String tableType = HoodieTableType.COPY_ON_WRITE.name();
    @Parameter(names={"--hive-sync", "-hv"}, description="Enable syncing to hive")
    private Boolean enableHiveSync = false;
    @Parameter(names={"--hive-db", "-hd"}, description="hive database")
    private String hiveDB = "default";
    @Parameter(names={"--hive-table", "-ht"}, description="hive table")
    private String hiveTable = "hoodie_sample_test";
    @Parameter(names={"--hive-user", "-hu"}, description="hive username")
    private String hiveUser = "hive";
    @Parameter(names={"--hive-password", "-hp"}, description="hive password")
    private String hivePass = "hive";
    @Parameter(names={"--hive-url", "-hl"}, description="hive JDBC URL")
    private String hiveJdbcUrl = "jdbc:hive2://localhost:10000";
    @Parameter(names={"--non-partitioned", "-np"}, description="Use non-partitioned Table")
    private Boolean nonPartitionedTable = false;
    @Parameter(names={"--use-multi-partition-keys", "-mp"}, description="Use Multiple Partition Keys")
    private Boolean useMultiPartitionKeys = false;
    @Parameter(names={"--help", "-h"}, help=true)
    public Boolean help = false;
    private static final Logger LOG = LogManager.getLogger(HoodieJavaApp.class);

    public static void main(String[] args) throws Exception {
        HoodieJavaApp cli = new HoodieJavaApp();
        JCommander cmd = new JCommander((Object)cli, null, args);
        if (cli.help.booleanValue()) {
            cmd.usage();
            System.exit(1);
        }
        cli.run();
    }

    public void run() throws Exception {
        SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
        JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());
        spark.sparkContext().setLogLevel("WARN");
        FileSystem fs = FileSystem.get((Configuration)jssc.hadoopConfiguration());
        HoodieTestDataGenerator dataGen = null;
        dataGen = this.nonPartitionedTable != false ? new HoodieTestDataGenerator(new String[]{""}) : new HoodieTestDataGenerator();
        fs.delete(new Path(this.tablePath), true);
        ArrayList recordsSoFar = new ArrayList(dataGen.generateInserts("001", Integer.valueOf(100)));
        List records1 = RawTripTestPayload.recordsToStrings(recordsSoFar);
        Dataset inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
        DataFrameWriter writer = inputDF1.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2").option("hoodie.upsert.shuffle.parallelism", "2").option(DataSourceWriteOptions.TABLE_TYPE().key(), this.tableType).option(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL()).option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key").option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition").option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp").option(HoodieWriteConfig.TBL_NAME.key(), this.tableName).option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), this.nonPartitionedTable != false ? NonpartitionedKeyGenerator.class.getCanonicalName() : SimpleKeyGenerator.class.getCanonicalName()).option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false").option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true").mode(SaveMode.Overwrite);
        this.updateHiveSyncConfig((DataFrameWriter<Row>)writer);
        writer.save(this.tablePath);
        String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit((FileSystem)fs, (String)this.tablePath);
        LOG.info((Object)("First commit at instant time :" + commitInstantTime1));
        List recordsToBeUpdated = dataGen.generateUpdates("002", Integer.valueOf(100));
        recordsSoFar.addAll(recordsToBeUpdated);
        List records2 = RawTripTestPayload.recordsToStrings((List)recordsToBeUpdated);
        Dataset inputDF2 = spark.read().json(jssc.parallelize(records2, 2));
        writer = inputDF2.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2").option("hoodie.upsert.shuffle.parallelism", "2").option(DataSourceWriteOptions.TABLE_TYPE().key(), this.tableType).option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key").option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition").option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp").option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), this.nonPartitionedTable != false ? NonpartitionedKeyGenerator.class.getCanonicalName() : SimpleKeyGenerator.class.getCanonicalName()).option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false").option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true").option(HoodieWriteConfig.TBL_NAME.key(), this.tableName).mode(SaveMode.Append);
        this.updateHiveSyncConfig((DataFrameWriter<Row>)writer);
        writer.save(this.tablePath);
        String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit((FileSystem)fs, (String)this.tablePath);
        LOG.info((Object)("Second commit at instant time :" + commitInstantTime2));
        List deletes = Transformations.randomSelectAsHoodieKeys(recordsSoFar, (int)20).stream().map(hr -> "{\"_row_key\":\"" + hr.getRecordKey() + "\",\"partition\":\"" + hr.getPartitionPath() + "\"}").collect(Collectors.toList());
        Dataset inputDF3 = spark.read().json(jssc.parallelize(deletes, 2));
        writer = inputDF3.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2").option("hoodie.upsert.shuffle.parallelism", "2").option("hoodie.delete.shuffle.parallelism", "2").option(DataSourceWriteOptions.TABLE_TYPE().key(), this.tableType).option(DataSourceWriteOptions.OPERATION().key(), "delete").option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key").option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition").option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp").option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), this.nonPartitionedTable != false ? NonpartitionedKeyGenerator.class.getCanonicalName() : SimpleKeyGenerator.class.getCanonicalName()).option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false").option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true").option(HoodieWriteConfig.TBL_NAME.key(), this.tableName).mode(SaveMode.Append);
        this.updateHiveSyncConfig((DataFrameWriter<Row>)writer);
        writer.save(this.tablePath);
        String commitInstantTime3 = HoodieDataSourceHelpers.latestCommit((FileSystem)fs, (String)this.tablePath);
        LOG.info((Object)("Third commit at instant time :" + commitInstantTime3));
        Dataset snapshotQueryDF = spark.read().format("org.apache.hudi").load(this.tablePath + (this.nonPartitionedTable != false ? "/*" : "/*/*/*/*"));
        snapshotQueryDF.registerTempTable("hoodie_ro");
        spark.sql("describe hoodie_ro").show();
        spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from hoodie_ro where fare.amount > 2.0").show();
        if (this.tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) {
            Dataset incQueryDF = spark.read().format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL()).option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), commitInstantTime1).load(this.tablePath);
            LOG.info((Object)("You will only see records from : " + commitInstantTime2));
            incQueryDF.groupBy(new Column[]{incQueryDF.col("_hoodie_commit_time")}).count().show();
        }
    }

    private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
        if (this.enableHiveSync.booleanValue()) {
            LOG.info((Object)("Enabling Hive sync to " + this.hiveJdbcUrl));
            writer = writer.option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), this.hiveTable).option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), this.hiveDB).option(HiveSyncConfigHolder.HIVE_URL.key(), this.hiveJdbcUrl).option(HiveSyncConfigHolder.HIVE_USER.key(), this.hiveUser).option(HiveSyncConfigHolder.HIVE_PASS.key(), this.hivePass).option(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key(), "true");
            writer = this.nonPartitionedTable != false ? writer.option(HiveSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), NonPartitionedExtractor.class.getCanonicalName()).option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "") : (this.useMultiPartitionKeys != false ? writer.option(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "year,month,day").option(HiveSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getCanonicalName()) : writer.option(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "dateStr").option(HiveSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), SlashEncodedDayPartitionValueExtractor.class.getCanonicalName()));
        }
        return writer;
    }
}

