/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.spark.engine.hudi;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.lang3.EnumUtils;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class DeltaStreamerConfig
implements Serializable {
    private HoodieDeltaStreamer.Config deltaStreamerConfig(final Map<String, String> writeOptions) {
        HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
        cfg.targetBasePath = writeOptions.get("hoodie.base.path");
        cfg.targetTableName = writeOptions.get("hoodie.table.name");
        cfg.tableType = writeOptions.get("hoodie.datasource.write.storage.type");
        cfg.operation = writeOptions.containsKey("operation") && EnumUtils.isValidEnum(WriteOperationType.class, (String)writeOptions.get("operation")) ? WriteOperationType.valueOf((String)writeOptions.get("operation")) : WriteOperationType.UPSERT;
        if (writeOptions.containsKey("initialCheckPointString")) {
            cfg.checkpoint = writeOptions.get("initialCheckPointString");
        }
        cfg.enableHiveSync = true;
        cfg.sourceClassName = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerKafkaSource";
        cfg.schemaProviderClassName = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerSchemaProvider";
        if (writeOptions.get("minSyncIntervalSeconds") != null) {
            cfg.minSyncIntervalSeconds = Integer.parseInt(writeOptions.get("minSyncIntervalSeconds"));
            cfg.continuousMode = true;
        }
        cfg.sparkMaster = "yarn";
        cfg.transformerClassNames = new ArrayList<String>(){
            {
                this.add("com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerTransformer");
            }
        };
        cfg.sourceOrderingField = writeOptions.get("sourceOrderingField");
        cfg.configs = new ArrayList<String>(){
            {
                writeOptions.entrySet().stream().filter(e -> !((String)e.getKey()).startsWith("kafka.")).forEach((? super T e) -> this.add((String)e.getKey() + "=" + (String)e.getValue()));
                writeOptions.entrySet().stream().filter(e -> ((String)e.getKey()).startsWith("kafka.")).forEach((? super T e) -> this.add(((String)e.getKey()).replace("kafka.", "") + "=" + (String)e.getValue()));
            }
        };
        return cfg;
    }

    public void streamToHoodieTable(Map<String, String> writeOptions, SparkSession spark) throws Exception {
        HoodieDeltaStreamer deltaSync = new HoodieDeltaStreamer(this.deltaStreamerConfig(writeOptions), JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext()));
        deltaSync.sync();
    }
}

