/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.spark.engine.hudi;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.EnumUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class DeltaStreamerConfig
implements Serializable {
    private HoodieDeltaStreamer.Config deltaStreamerConfig(final Map<String, String> writeOptions) {
        HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
        cfg.targetBasePath = writeOptions.get("hoodie.base.path");
        cfg.targetTableName = writeOptions.get("hoodie.table.name");
        cfg.tableType = writeOptions.get("hoodie.datasource.write.storage.type");
        cfg.operation = writeOptions.containsKey("operation") && EnumUtils.isValidEnum(WriteOperationType.class, (String)writeOptions.get("operation")) ? WriteOperationType.valueOf((String)writeOptions.get("operation")) : WriteOperationType.UPSERT;
        if (writeOptions.containsKey("initialCheckPointString")) {
            cfg.checkpoint = writeOptions.get("initialCheckPointString");
        }
        cfg.enableHiveSync = true;
        cfg.sourceClassName = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerKafkaSource";
        cfg.schemaProviderClassName = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerSchemaProvider";
        if (writeOptions.get("minSyncIntervalSeconds") != null) {
            cfg.minSyncIntervalSeconds = Integer.parseInt(writeOptions.get("minSyncIntervalSeconds"));
            cfg.continuousMode = true;
        }
        cfg.sparkMaster = "yarn";
        cfg.transformerClassNames = new ArrayList<String>(){
            {
                this.add("com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerTransformer");
            }
        };
        cfg.sourceOrderingField = writeOptions.get("sourceOrderingField");
        cfg.configs = new ArrayList<String>(){
            {
                writeOptions.entrySet().stream().filter(e -> !((String)e.getKey()).startsWith("kafka.")).forEach((? super T e) -> this.add((String)e.getKey() + "=" + (String)e.getValue()));
                writeOptions.entrySet().stream().filter(e -> ((String)e.getKey()).startsWith("kafka.")).forEach((? super T e) -> this.add(((String)e.getKey()).replace("kafka.", "") + "=" + (String)e.getValue()));
            }
        };
        return cfg;
    }

    public void streamToHoodieTable(Map<String, String> writeOptions, SparkSession spark) throws Exception {
        JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext());
        this.migrateTable(writeOptions, javaSparkContext);
        HoodieDeltaStreamer deltaSync = new HoodieDeltaStreamer(this.deltaStreamerConfig(writeOptions), javaSparkContext);
        deltaSync.sync();
    }

    private void migrateTable(Map<String, String> writeOptions, JavaSparkContext javaSparkContext) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(javaSparkContext.hadoopConfiguration()).setBasePath(writeOptions.get("hoodie.base.path")).setLoadActiveTimelineOnLoad(false).build();
        if (metaClient.getTableConfig().contains(HoodieTableConfig.VERSION) && metaClient.getTableConfig().getTableVersion() != HoodieTableVersion.FIVE) {
            metaClient.getTableConfig().setValue("hoodie.datasource.write.operation", WriteOperationType.UPSERT.value());
            HoodieTableConfig.update((FileSystem)metaClient.getFs(), (Path)new Path(metaClient.getMetaPath()), (Properties)metaClient.getTableConfig().getProps());
            HoodieWriteConfig updatedConfig = HoodieWriteConfig.newBuilder().forTable(metaClient.getTableConfig().getTableName()).withPath(writeOptions.get("hoodie.base.path")).withRollbackUsingMarkers(true).withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
            new UpgradeDowngrade(metaClient, updatedConfig, (HoodieEngineContext)new HoodieSparkEngineContext(javaSparkContext), (SupportsUpgradeDowngrade)SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.FIVE, null);
        }
    }
}

