package org.apache.hudi.utilities;

import com.beust.jcommander.IValueValidator;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.ResourceBundle;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieJsonPayload;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/hudi/utilities/HDFSParquetImporter.class */
/**
 * Command-line utility that reads Parquet files from a source path and writes them into a Hudi
 * table using one of the {@code insert}, {@code upsert} or {@code bulkinsert} write operations.
 *
 * <p>The instance is {@link Serializable} because the record-building function passed to Spark
 * in {@link #buildHoodieRecordsForImport(JavaSparkContext, String)} reads {@link #cfg} and
 * therefore captures {@code this}. The {@link FileSystem} handle is {@code transient} and is
 * (re)initialised at the start of {@link #dataImport(JavaSparkContext, int)}.
 */
public class HDFSParquetImporter implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HDFSParquetImporter.class);

    /**
     * Formats numeric partition values as {@code yyyy/MM/dd}. Note that it uses the JVM's default
     * time zone, so the resulting partition path depends on where the code runs.
     */
    private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneId.systemDefault());

    private final Config cfg;
    private transient FileSystem fs;
    /** Properties handed to the Hudi write client; built from {@code --props} and {@code --hoodie-conf}. */
    private TypedProperties props;

    /** Validates the {@code --command} value against the supported write commands (case-insensitive). */
    public static class CommandValidator implements IValueValidator<String> {
        List<String> validCommands = Arrays.asList("insert", "upsert", "bulkinsert");

        /**
         * @param name  name of the parameter being validated (unused)
         * @param value value supplied on the command line
         * @throws ParameterException if {@code value} is {@code null} or not a supported command
         */
        @Override
        public void validate(String name, String value) {
            // equalsIgnoreCase is locale-independent, unlike toLowerCase() with the default locale
            // (e.g. "BULKINSERT" lower-cases to "bulkınsert" under a Turkish locale).
            if (value == null || this.validCommands.stream().noneMatch(value::equalsIgnoreCase)) {
                throw new ParameterException(String.format("Invalid command: value:%s: supported commands:%s", value, this.validCommands));
            }
        }
    }

    /** Command-line configuration, populated by JCommander. */
    public static class Config implements Serializable {

        @Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert/bulkinsert", validateValueWith = {CommandValidator.class})
        public String command = "INSERT";

        @Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input table", required = true)
        public String srcPath = null;

        @Parameter(names = {"--target-path", "-tp"}, description = "Base path for the target hoodie table", required = true)
        public String targetPath = null;

        @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
        public String tableName = null;

        @Parameter(names = {"--table-type", "-tt"}, description = "Table type", required = true)
        public String tableType = null;

        @Parameter(names = {"--row-key-field", "-rk"}, description = "Row key field name", required = true)
        public String rowKey = null;

        @Parameter(names = {"--partition-key-field", "-pk"}, description = "Partition key field name", required = true)
        public String partitionKey = null;

        @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert(default)/upsert/bulkinsert", required = true)
        public int parallelism = 1;

        @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
        public String schemaFile = null;

        @Parameter(names = {"--format", "-f"}, description = "Format for the input data.", validateValueWith = {FormatValidator.class})
        public String format = null;

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
        public String sparkMaster = null;

        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
        public String sparkMemory = null;

        @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
        public int retry = 0;

        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client for importing")
        public String propsFilePath = null;

        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList<>();

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;
    }

    /** Validates the {@code --format} value; only {@code parquet} (exact match) is accepted. */
    public static class FormatValidator implements IValueValidator<String> {
        List<String> validFormats = Collections.singletonList("parquet");

        /**
         * @param name  name of the parameter being validated (unused)
         * @param value value supplied on the command line
         * @throws ParameterException if {@code value} is {@code null} or not a supported format
         */
        @Override
        public void validate(String name, String value) {
            if (value == null || !this.validFormats.contains(value)) {
                throw new ParameterException(String.format("Invalid format type: value:%s: supported formats:%s", value, this.validFormats));
            }
        }
    }

    /**
     * @param cfg parsed command-line configuration driving the import
     */
    public HDFSParquetImporter(Config cfg) {
        this.cfg = cfg;
    }

    /**
     * Entry point: parses arguments, builds a Spark context and runs the import.
     * Prints usage and exits with status 1 when {@code --help} is given or no arguments are passed.
     *
     * @param args command-line arguments, see {@link Config}
     */
    public static void main(String[] args) {
        Config cfg = new Config();
        JCommander cmd = new JCommander(cfg, (ResourceBundle) null, args);
        if (cfg.help || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
        JavaSparkContext jsc = UtilHelpers.buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
        try {
            dataImporter.dataImport(jsc, cfg.retry);
        } finally {
            // Always release the Spark context, whether the import succeeded or threw.
            jsc.stop();
        }
    }

    /** @return {@code true} when the configured command is {@code upsert} (case-insensitive) */
    private boolean isUpsert() {
        return "upsert".equalsIgnoreCase(this.cfg.command);
    }

    /**
     * Runs the import, retrying failed attempts.
     *
     * <p>For non-upsert commands the target path must not exist beforehand. Any failure,
     * including that precondition check, is logged rather than propagated.
     *
     * @param jsc   Spark context to run the import on
     * @param retry number of additional attempts after the first one returns non-zero
     * @return the result of the last attempt ({@code 0} on success), or {@code -1} if no attempt
     *         completed because an exception was thrown
     */
    public int dataImport(JavaSparkContext jsc, int retry) {
        this.fs = FSUtils.getFs(this.cfg.targetPath, jsc.hadoopConfiguration());
        this.props = this.cfg.propsFilePath == null
            ? UtilHelpers.buildProperties(this.cfg.configs)
            : UtilHelpers.readConfig(this.fs.getConf(), new Path(this.cfg.propsFilePath), this.cfg.configs).getProps(true);
        LOG.info("Starting data import with configs : {}", this.props);
        int ret = -1;
        try {
            // Verify that targetPath is not present (an upsert is allowed to target an existing table).
            if (this.fs.exists(new Path(this.cfg.targetPath)) && !isUpsert()) {
                throw new HoodieIOException(String.format("Make sure %s is not present.", this.cfg.targetPath));
            }
            do {
                ret = dataImport(jsc);
            } while (ret != 0 && retry-- > 0);
        } catch (Throwable t) {
            LOG.error("Import data error", t);
        }
        return ret;
    }

    /**
     * Performs a single import attempt: prepares the target table, reads the source records and
     * writes them with the configured command.
     *
     * @param jsc Spark context to run the import on
     * @return the value returned by {@code UtilHelpers.handleErrors} for the write, or {@code -1}
     *         if anything was thrown during the attempt (the error is logged)
     * @throws IOException declared for overriding implementations; this implementation catches
     *                     and logs every {@link Throwable}
     */
    protected int dataImport(JavaSparkContext jsc) throws IOException {
        try {
            Path targetPath = new Path(this.cfg.targetPath);
            if (this.fs.exists(targetPath) && !isUpsert()) {
                // Clean up the target directory, e.g. leftovers of an earlier failed attempt.
                this.fs.delete(targetPath, true);
            }
            if (!this.fs.exists(targetPath)) {
                // Initialize the target hoodie table.
                HoodieTableMetaClient.initTableAndGetMetaClient(
                    jsc.hadoopConfiguration(),
                    this.cfg.targetPath,
                    HoodieTableMetaClient.withPropertyBuilder()
                        .setTableName(this.cfg.tableName)
                        .setTableType(this.cfg.tableType)
                        .build());
            }
            String schemaStr = UtilHelpers.parseSchema(this.fs, this.cfg.schemaFile);
            SparkRDDWriteClient<HoodieRecordPayload> client =
                UtilHelpers.createHoodieClient(jsc, this.cfg.targetPath, schemaStr, this.cfg.parallelism, Option.empty(), this.props);
            JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
            String instantTime = client.startCommit();
            JavaRDD<WriteStatus> writeResponse = load(client, instantTime, hoodieRecords);
            return UtilHelpers.handleErrors(jsc, instantTime, writeResponse);
        } catch (Throwable t) {
            LOG.error("Error occurred.", t);
            return -1;
        }
    }

    /**
     * Reads the Parquet files under the configured source path and converts every row into a
     * Hudi record keyed by the configured row-key and partition-key fields.
     *
     * <p>A numeric partition value is interpreted as epoch seconds and rendered as
     * {@code yyyy/MM/dd}; any other value is used verbatim via {@code toString()}.
     *
     * @param jsc       Spark context used to read the source files
     * @param schemaStr Avro schema (JSON) used to read the Parquet data
     * @return RDD of records ready to be written; rows lacking the partition or row key make the
     *         job fail with a {@link HoodieIOException} when the RDD is evaluated
     * @throws IOException if the Hadoop job configuration cannot be created
     */
    protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(JavaSparkContext jsc, String schemaStr) throws IOException {
        Job job = Job.getInstance(jsc.hadoopConfiguration());
        // Allow recursive directories to be found.
        job.getConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
        // Parallelize the file-status listing.
        job.getConfiguration().set("mapreduce.input.fileinputformat.list-status.num-threads", "1024");
        AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), new Schema.Parser().parse(schemaStr));
        ParquetInputFormat.setReadSupportClass(job, AvroReadSupport.class);

        HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
        context.setJobStatus(getClass().getSimpleName(), "Build records for import: " + this.cfg.tableName);
        return jsc.newAPIHadoopFile(this.cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration())
            // Reduce the number of tasks produced by reading many input files.
            .coalesce(16 * this.cfg.parallelism)
            .map(entry -> {
                GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
                Object partitionField = genericRecord.get(this.cfg.partitionKey);
                if (partitionField == null) {
                    throw new HoodieIOException("partition key is missing. :" + this.cfg.partitionKey);
                }
                Object rowField = genericRecord.get(this.cfg.rowKey);
                if (rowField == null) {
                    throw new HoodieIOException("row field is missing. :" + this.cfg.rowKey);
                }
                String partitionPath = partitionField.toString();
                LOG.debug("Row Key : {}, Partition Path is ({})", rowField, partitionPath);
                if (partitionField instanceof Number) {
                    try {
                        // Seconds (possibly fractional) -> milliseconds since the epoch.
                        long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000.0d);
                        partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
                    } catch (NumberFormatException nfe) {
                        LOG.warn("Unable to parse date from partition field. Assuming partition as ({})", partitionField);
                    }
                }
                return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath), new HoodieJsonPayload(genericRecord.toString()));
            });
    }

    /**
     * Writes the records with the write operation selected by the configured command.
     * Anything other than {@code upsert} or {@code bulkinsert} (case-insensitive) is an insert.
     *
     * @param client        Hudi write client
     * @param instantTime   commit instant the write belongs to
     * @param hoodieRecords records to write
     * @param <T>           payload type of the records
     * @return write statuses produced by the client
     */
    protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient<T> client, String instantTime, JavaRDD<HoodieRecord<T>> hoodieRecords) {
        if (isUpsert()) {
            return client.upsert(hoodieRecords, instantTime);
        }
        if ("bulkinsert".equalsIgnoreCase(this.cfg.command)) {
            return client.bulkInsert(hoodieRecords, instantTime);
        }
        return client.insert(hoodieRecords, instantTime);
    }
}
