/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.examples.quickstart;

import java.io.Serializable;
import java.util.List;
import org.apache.hudi.QuickstartUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.examples.common.HoodieExampleSparkUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

/**
 * Hudi-on-Spark quickstart: insert, query, update, incremental and point-in-time queries,
 * delete, insert-overwrite and delete-by-partition against one table, with {@code assert}
 * checks between the steps (only evaluated when the JVM runs with {@code -ea}).
 */
public final class HoodieSparkQuickstart {

    /**
     * Snapshot query over the {@code hudi_ro_table} view. Columns are listed alphabetically,
     * presumably to match the column order Spark infers when reading the generated JSON records:
     * the {@code except}/{@code intersect}/{@code union} checks compare columns by position.
     */
    private static final String SNAPSHOT_QUERY =
        "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table";

    /** Partition whose contents {@link #insertOverwriteData} replaces. */
    private static final String OVERWRITTEN_PARTITION = "2020/01/03";

    /** Partition that {@link #deleteByPartition} drops. */
    private static final String DELETED_PARTITION = "2020/01/01";

    private HoodieSparkQuickstart() {
    }

    /**
     * Command-line entry point.
     *
     * @param args {@code <tablePath> <tableName>}; the process exits with status 1 when fewer
     *             than two arguments are given
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: HoodieSparkQuickstart <tablePath> <tableName>");
            System.exit(1);
        }
        String tablePath = args[0];
        String tableName = args[1];
        SparkSession spark = HoodieExampleSparkUtils.defaultSparkSession("Hudi Spark basic example");
        // Reuse the session's SparkContext: building a second SparkContext from a fresh SparkConf
        // fails because Spark allows only one active SparkContext per JVM.
        try (JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
            runQuickstart(jsc, spark, tableName, tablePath);
        }
    }

    /**
     * Runs every quickstart step in order and asserts the table contents after each write.
     *
     * @param jsc       Java view of the Spark context, used to parallelize generated records
     * @param spark     active Spark session
     * @param tableName Hudi table name
     * @param tablePath base path of the Hudi table
     */
    public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
        HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();

        // Insert: the snapshot must contain every inserted row.
        Dataset<Row> insertDf = insertData(spark, jsc, tablePath, tableName, dataGen);
        queryData(spark, jsc, tablePath, tableName, dataGen);
        assert (insertDf.except(spark.sql(SNAPSHOT_QUERY)).count() == 0L);

        // NOTE(review): Spark Datasets are lazy, so the "before" snapshots below may re-read the
        // table when an assertion finally executes them instead of capturing pre-write state.
        // Confirm, and consider materializing them (e.g. cache() + count()) before each write.

        // Update: every updated row is visible, and nothing else changed.
        Dataset<Row> snapshotBeforeUpdate = spark.sql(SNAPSHOT_QUERY);
        Dataset<Row> updateDf = updateData(spark, jsc, tablePath, tableName, dataGen);
        queryData(spark, jsc, tablePath, tableName, dataGen);
        Dataset<Row> snapshotAfterUpdate = spark.sql(SNAPSHOT_QUERY);
        assert (snapshotAfterUpdate.intersect(updateDf).count() == updateDf.count());
        assert (snapshotAfterUpdate.except(updateDf).except(snapshotBeforeUpdate).count() == 0L);

        incrementalQuery(spark, tablePath, tableName);
        pointInTimeQuery(spark, tablePath, tableName);

        // Delete: deleted rows are gone and all remaining rows were already present.
        Dataset<Row> snapshotBeforeDelete = snapshotAfterUpdate;
        Dataset<Row> deleteDf = delete(spark, tablePath, tableName);
        queryData(spark, jsc, tablePath, tableName, dataGen);
        Dataset<Row> snapshotAfterDelete = spark.sql(SNAPSHOT_QUERY);
        assert (snapshotAfterDelete.intersect(deleteDf).count() == 0L);
        assert (snapshotBeforeDelete.except(deleteDf).except(snapshotAfterDelete).count() == 0L);

        // Insert-overwrite: the overwritten partition holds only the new rows, others are untouched.
        Dataset<Row> snapshotBeforeOverwrite = snapshotAfterDelete;
        Dataset<Row> overwriteDf = insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
        queryData(spark, jsc, tablePath, tableName, dataGen);
        Dataset<Row> untouchedPartitionsDf =
            snapshotBeforeOverwrite.filter("partitionpath != '" + OVERWRITTEN_PARTITION + "'");
        Dataset<Row> expectedDf = untouchedPartitionsDf.union(overwriteDf);
        Dataset<Row> snapshotAfterOverwrite = spark.sql(SNAPSHOT_QUERY);
        assert (snapshotAfterOverwrite.except(expectedDf).count() == 0L);

        // Delete-by-partition: the dropped partition is empty, other partitions keep their row count.
        Dataset<Row> snapshotBeforeDeleteByPartition = snapshotAfterOverwrite;
        deleteByPartition(spark, tablePath, tableName);
        queryData(spark, jsc, tablePath, tableName, dataGen);
        Dataset<Row> snapshotAfterDeleteByPartition = spark.sql(SNAPSHOT_QUERY);
        assert (snapshotAfterDeleteByPartition
            .intersect(snapshotBeforeDeleteByPartition.filter("partitionpath == '" + DELETED_PARTITION + "'"))
            .count() == 0L);
        assert (snapshotAfterDeleteByPartition.count()
            == snapshotBeforeDeleteByPartition.filter("partitionpath != '" + DELETED_PARTITION + "'").count());
    }

    /**
     * Generates 20 records and writes them with {@link SaveMode#Overwrite}, replacing any table
     * already at {@code tablePath}.
     *
     * @return the DataFrame that was written
     */
    public static Dataset<Row> insertData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
                                          HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
        String commitTime = Long.toString(System.currentTimeMillis());
        List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
        Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
        hudiWriter(df, tableName)
            .mode(SaveMode.Overwrite)
            .save(tablePath);
        return df;
    }

    /**
     * Generates 20 records in {@link #OVERWRITTEN_PARTITION} and writes them with the
     * {@code insert_overwrite} operation.
     *
     * @return the DataFrame that was written
     */
    public static Dataset<Row> insertOverwriteData(SparkSession spark, JavaSparkContext jsc, String tablePath,
                                                   String tableName,
                                                   HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
        String commitTime = Long.toString(System.currentTimeMillis());
        List<String> inserts = dataGen.convertToStringList(
            dataGen.generateInsertsOnPartition(commitTime, 20, OVERWRITTEN_PARTITION));
        Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
        hudiWriter(df, tableName)
            .option("hoodie.datasource.write.operation", WriteOperationType.INSERT_OVERWRITE.value())
            .mode(SaveMode.Append)
            .save(tablePath);
        return df;
    }

    /**
     * Re-registers {@code hudi_ro_table} over the current table snapshot and prints two sample
     * queries. {@code jsc}, {@code tableName} and {@code dataGen} are unused and are kept only for
     * signature compatibility.
     */
    public static void queryData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
                                 HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
        registerSnapshotView(spark, tablePath);
        spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_ro_table where fare > 20.0").show();
        spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_ro_table").show();
    }

    /**
     * Generates updates for previously generated records and writes them with
     * {@link SaveMode#Append}, without an explicit write operation.
     *
     * @return the DataFrame of updates that was written
     */
    public static Dataset<Row> updateData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
                                          HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
        String commitTime = Long.toString(System.currentTimeMillis());
        List<String> updates = dataGen.convertToStringList(dataGen.generateUniqueUpdates(commitTime));
        Dataset<Row> df = spark.read().json(jsc.parallelize(updates, 1));
        hudiWriter(df, tableName)
            .mode(SaveMode.Append)
            .save(tablePath);
        return df;
    }

    /**
     * Deletes two rows (whichever {@code limit 2} returns) from the table.
     *
     * @return the full rows selected for deletion, in {@link #SNAPSHOT_QUERY} column order
     */
    public static Dataset<Row> delete(SparkSession spark, String tablePath, String tableName) {
        registerSnapshotView(spark, tablePath);
        Dataset<Row> toBeDeletedDf = spark.sql(SNAPSHOT_QUERY + " limit 2");
        // The delete write carries only the record key, partition path and precombine columns.
        Dataset<Row> keysDf = toBeDeletedDf.select("uuid", "partitionpath", "ts");
        hudiWriter(keysDf, tableName)
            .option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
            .mode(SaveMode.Append)
            .save(tablePath);
        return toBeDeletedDf;
    }

    /**
     * Drops {@link #DELETED_PARTITION} with the {@code delete_partition} operation. The written
     * DataFrame is empty; the partition to drop is passed as a write option.
     */
    public static void deleteByPartition(SparkSession spark, String tablePath, String tableName) {
        Dataset<Row> df = spark.emptyDataFrame();
        hudiWriter(df, tableName)
            .option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value())
            .option("hoodie.datasource.write.partitions.to.delete", DELETED_PARTITION)
            .mode(SaveMode.Append)
            .save(tablePath);
    }

    /**
     * Runs an incremental query starting at the latest commit seen in {@code hudi_ro_table} and
     * prints rows with {@code fare > 20.0}. Requires {@code hudi_ro_table} to be registered
     * (e.g. by {@link #queryData}).
     */
    public static void incrementalQuery(SparkSession spark, String tablePath, String tableName) {
        // NOTE(review): if Hudi treats the begin instant as exclusive, starting at the latest
        // commit yields an empty incremental view; confirm whether an earlier commit was intended.
        String beginTime = latestCommitTime(spark);
        Dataset<Row> incViewDF = spark.read().format("org.apache.hudi")
            .option("hoodie.datasource.query.type", "incremental")
            .option("hoodie.datasource.read.begin.instanttime", beginTime)
            .load(tablePath);
        incViewDF.createOrReplaceTempView("hudi_incr_table");
        spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show();
    }

    /**
     * Runs an incremental query bounded by {@code "000"} and the latest commit seen in
     * {@code hudi_ro_table} and prints rows with {@code fare > 20.0}. Requires
     * {@code hudi_ro_table} to be registered (e.g. by {@link #queryData}).
     */
    public static void pointInTimeQuery(SparkSession spark, String tablePath, String tableName) {
        // "000" presumably sorts before every real instant time, i.e. "from the beginning".
        String beginTime = "000";
        String endTime = latestCommitTime(spark);
        Dataset<Row> incViewDF = spark.read().format("org.apache.hudi")
            .option("hoodie.datasource.query.type", "incremental")
            .option("hoodie.datasource.read.begin.instanttime", beginTime)
            .option("hoodie.datasource.read.end.instanttime", endTime)
            .load(tablePath);
        incViewDF.createOrReplaceTempView("hudi_incr_table");
        spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_incr_table where fare > 20.0").show();
    }

    /**
     * Starts a Hudi write of {@code df} with the options every write in this class shares:
     * {@code QuickstartUtils.getQuickstartWriteConfigs()}, {@code ts} as precombine field,
     * {@code uuid} as record key, {@code partitionpath} as partition path field and
     * {@code tableName} as table name. Callers add any explicit write operation and the save mode.
     */
    private static DataFrameWriter<Row> hudiWriter(Dataset<Row> df, String tableName) {
        return df.write()
            .format("org.apache.hudi")
            .options(QuickstartUtils.getQuickstartWriteConfigs())
            .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
            .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
            .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
            .option(HoodieWriteConfig.TBL_NAME.key(), tableName);
    }

    /** Loads the current table snapshot and (re)registers it as the {@code hudi_ro_table} view. */
    private static void registerSnapshotView(SparkSession spark, String tablePath) {
        // Three glob levels for yyyy/MM/dd-style partitions (e.g. 2020/01/01) plus one for files.
        Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*");
        roViewDF.createOrReplaceTempView("hudi_ro_table");
    }

    /**
     * Returns the greatest {@code _hoodie_commit_time} in {@code hudi_ro_table}.
     *
     * @throws IllegalStateException if the view contains no rows
     */
    private static String latestCommitTime(SparkSession spark) {
        List<Row> rows = spark.sql(
            "select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime desc limit 1")
            .collectAsList();
        if (rows.isEmpty()) {
            throw new IllegalStateException("No commits found in hudi_ro_table");
        }
        return rows.get(0).getString(0);
    }
}

