/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.examples.spark;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.examples.common.HoodieExampleSparkUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Example driver that exercises the Hudi Spark write client against a single table.
 *
 * <p>Given a table path and a table name on the command line, {@link #main(String[])} creates the
 * table if the path does not exist yet and then runs, in order: an insert of generated records,
 * an upsert that re-sends those records together with generated updates, a delete of the keys of
 * the first half of the records produced so far, a partition-level delete, and (only when the
 * configured table type is {@code MERGE_ON_READ}) a compaction.
 */
public class HoodieWriteClientExample {
    private static final Logger LOG = LogManager.getLogger(HoodieWriteClientExample.class);

    /** Table type used when the table has to be created; also decides whether compaction runs. */
    private static final String TABLE_TYPE = HoodieTableType.COPY_ON_WRITE.name();

    /**
     * Runs the example end to end.
     *
     * @param args {@code args[0]} is the table path, {@code args[1]} is the table name
     * @throws Exception if table initialisation or any of the write operations fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: HoodieWriteClientExample <tablePath> <tableName>");
            System.exit(1);
        }
        String tablePath = args[0];
        String tableName = args[1];
        SparkConf sparkConf = HoodieExampleSparkUtils.defaultSparkConf("hoodie-client-example");

        try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
            HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();

            initTableIfAbsent(jsc.hadoopConfiguration(), tablePath, tableName);
            HoodieWriteConfig cfg = buildWriteConfig(tablePath, tableName);
            HoodieEngineContext engineContext = new HoodieSparkEngineContext(jsc);

            // The write client is closed before the Spark context it depends on.
            try (SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(engineContext, cfg)) {
                // Inserts.
                String newCommitTime = client.startCommit();
                LOG.info("Starting commit " + newCommitTime);
                List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
                List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
                JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1);
                client.insert(writeRecords, newCommitTime);

                // Updates: the original inserts are re-sent together with the generated updates.
                newCommitTime = client.startCommit();
                LOG.info("Starting commit " + newCommitTime);
                List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
                records.addAll(toBeUpdated);
                recordsSoFar.addAll(toBeUpdated);
                writeRecords = jsc.parallelize(records, 1);
                client.upsert(writeRecords, newCommitTime);

                // Record-level delete: keys of the first half of everything produced so far.
                newCommitTime = client.startCommit();
                LOG.info("Starting commit " + newCommitTime);
                int numToDelete = recordsSoFar.size() / 2;
                List<HoodieRecord<HoodieAvroPayload>> recordsToDelete = recordsSoFar.stream()
                        .limit(numToDelete)
                        .collect(Collectors.toList());
                client.delete(
                        jsc.parallelize(
                                recordsToDelete.stream().map(HoodieRecord::getKey).collect(Collectors.toList()),
                                1),
                        newCommitTime);

                // Partition-level delete.
                // NOTE(review): the instant is obtained via startCommit() and then started again as
                // a "replacecommit" with the same time -- confirm this double start is intended.
                newCommitTime = client.startCommit();
                client.startCommitWithTime(newCommitTime, "replacecommit");
                LOG.info("Starting commit " + newCommitTime);
                // Partitions touched by the record-level delete above are kept; every other
                // partition seen so far is dropped as a whole.
                List<String> partitionList = recordsToDelete.stream()
                        .map(r -> r.getKey().getPartitionPath())
                        .distinct()
                        .collect(Collectors.toList());
                List<String> deleteList = recordsSoFar.stream()
                        .filter(r -> !partitionList.contains(r.getPartitionPath()))
                        .map(r -> r.getKey().getPartitionPath())
                        .distinct()
                        .collect(Collectors.toList());
                client.deletePartitions(deleteList, newCommitTime);

                // Compaction is only attempted for merge-on-read tables.
                if (HoodieTableType.valueOf(TABLE_TYPE) == HoodieTableType.MERGE_ON_READ) {
                    Option<String> instant = client.scheduleCompaction(Option.empty());
                    // Guard against an empty Option instead of failing inside get().
                    if (instant.isPresent()) {
                        client.commitCompaction(instant.get(), client.compact(instant.get()), Option.empty());
                    } else {
                        LOG.info("No compaction instant was scheduled; skipping compaction");
                    }
                }
            }
        }
    }

    /**
     * Initialises the table metadata at {@code tablePath} when that path does not exist yet.
     *
     * @param hadoopConf Hadoop configuration used to resolve the file system and create the table
     * @param tablePath  base path of the table
     * @param tableName  name recorded for the table
     * @throws Exception if the existence check or the table initialisation fails
     */
    private static void initTableIfAbsent(Configuration hadoopConf, String tablePath, String tableName)
            throws Exception {
        FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
        if (fs.exists(new Path(tablePath))) {
            return;
        }
        HoodieTableMetaClient.withPropertyBuilder()
                .setTableType(TABLE_TYPE)
                .setTableName(tableName)
                .setPayloadClass(HoodieAvroPayload.class)
                .initTable(hadoopConf, tablePath);
    }

    /**
     * Builds the write configuration used by the example's write client.
     *
     * @param tablePath base path of the table
     * @param tableName name of the table
     * @return the write config with the example schema, parallelism, index and archival settings
     */
    private static HoodieWriteConfig buildWriteConfig(String tablePath, String tableName) {
        return HoodieWriteConfig.newBuilder()
                .withPath(tablePath)
                .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA)
                .withParallelism(2, 2)
                .withDeleteParallelism(2)
                .forTable(tableName)
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build())
                .build();
    }
}

