/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.engine;

import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureGroupCommit;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.engine.Utils;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import java.io.IOException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import scala.collection.Seq;

/**
 * Writes, deletes and reads feature group data stored as Hudi tables through Spark, and
 * reports the resulting Hudi commit to the feature store backend via {@link FeatureGroupApi}.
 *
 * <p>Instances hold no per-call state: commit metadata and date formatters are created per
 * invocation.
 */
public class HudiEngine {
    private static final String HUDI_SPARK_FORMAT = "org.apache.hudi";
    private static final String HUDI_TABLE_NAME = "hoodie.table.name";
    private static final String HUDI_TABLE_STORAGE_TYPE = "hoodie.datasource.write.storage.type";
    private static final String HUDI_TABLE_OPERATION = "hoodie.datasource.write.operation";
    private static final String HUDI_KEY_GENERATOR_OPT_KEY = "hoodie.datasource.write.keygenerator.class";
    private static final String HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL = "org.apache.hudi.keygen.CustomKeyGenerator";
    private static final String HUDI_RECORD_KEY = "hoodie.datasource.write.recordkey.field";
    private static final String HUDI_PARTITION_FIELD = "hoodie.datasource.write.partitionpath.field";
    private static final String HUDI_PRECOMBINE_FIELD = "hoodie.datasource.write.precombine.field";
    private static final String HUDI_HIVE_SYNC_ENABLE = "hoodie.datasource.hive_sync.enable";
    private static final String HUDI_HIVE_SYNC_TABLE = "hoodie.datasource.hive_sync.table";
    private static final String HUDI_HIVE_SYNC_DB = "hoodie.datasource.hive_sync.database";
    private static final String HUDI_HIVE_SYNC_JDBC_URL = "hoodie.datasource.hive_sync.jdbcurl";
    private static final String HUDI_HIVE_SYNC_PARTITION_FIELDS = "hoodie.datasource.hive_sync.partition_fields";
    private static final String HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.hive_sync.partition_extractor_class";
    private static final String DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL = "org.apache.hudi.hive.MultiPartKeysValueExtractor";
    private static final String HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL = "org.apache.hudi.hive.NonPartitionedExtractor";
    private static final String HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database";
    private static final String HIVE_AUTO_CREATE_DATABASE_OPT_VAL = "false";
    private static final String HUDI_COPY_ON_WRITE = "COPY_ON_WRITE";
    private static final String HUDI_QUERY_TYPE_OPT_KEY = "hoodie.datasource.query.type";
    private static final String HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental";
    private static final String HUDI_BEGIN_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.begin.instanttime";
    private static final String HUDI_END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime";
    private static final String PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class";
    private static final String PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.common.model.EmptyHoodieRecordPayload";
    /** Pattern used to render epoch-millisecond timestamps as instant-time strings. */
    private static final String HUDI_INSTANT_TIME_PATTERN = "yyyyMMddHHmmss";
    private Utils utils = new Utils();
    private FeatureGroupApi featureGroupApi = new FeatureGroupApi();

    /**
     * Appends {@code dataset} to the Hudi table at the feature group's location and registers
     * the resulting commit, tagged with {@code validationId}, with the feature store backend.
     *
     * @param sparkSession  session whose Hadoop configuration is used to read the commit timeline
     * @param featureGroup  target feature group (supplies location, keys and Hive sync settings)
     * @param dataset       rows to write
     * @param operation     Hudi write operation to apply
     * @param writeOptions  extra writer options that override the defaults; may be {@code null}
     * @param validationId  validation identifier stored on the registered commit
     * @throws IOException           declared by the file system / API calls
     * @throws FeatureStoreException if no precombine key is defined or no completed commit is found
     * @throws ParseException        declared by the commit timestamp conversion
     */
    public void saveHudiFeatureGroup(SparkSession sparkSession, FeatureGroup featureGroup, Dataset<Row> dataset, HudiOperationType operation, Map<String, String> writeOptions, Integer validationId) throws IOException, FeatureStoreException, ParseException {
        Map<String, String> hudiArgs = this.setupHudiWriteOpts(featureGroup, operation, writeOptions);
        dataset.write().format(HUDI_SPARK_FORMAT).options(hudiArgs).mode(SaveMode.Append).save(featureGroup.getLocation());
        FeatureGroupCommit fgCommit = this.getLastCommitMetadata(sparkSession, featureGroup.getLocation());
        fgCommit.setValidationId(validationId);
        this.featureGroupApi.featureGroupCommit(featureGroup, fgCommit);
    }

    /**
     * Deletes the records contained in {@code deleteDF} by upserting them with the
     * {@code EmptyHoodieRecordPayload} payload class, then registers the resulting commit.
     *
     * @param sparkSession  session whose Hadoop configuration is used to read the commit timeline
     * @param featureGroup  feature group whose records are deleted
     * @param deleteDF      rows identifying the records to delete
     * @param writeOptions  extra writer options that override the defaults; may be {@code null}
     * @return the commit object returned by the feature store backend
     * @throws IOException           declared by the file system / API calls
     * @throws FeatureStoreException if no precombine key is defined or no completed commit is found
     * @throws ParseException        declared by the commit timestamp conversion
     */
    public FeatureGroupCommit deleteRecord(SparkSession sparkSession, FeatureGroup featureGroup, Dataset<Row> deleteDF, Map<String, String> writeOptions) throws IOException, FeatureStoreException, ParseException {
        Map<String, String> hudiArgs = this.setupHudiWriteOpts(featureGroup, HudiOperationType.UPSERT, writeOptions);
        hudiArgs.put(PAYLOAD_CLASS_OPT_KEY, PAYLOAD_CLASS_OPT_VAL);
        deleteDF.write().format(HUDI_SPARK_FORMAT).options(hudiArgs).mode(SaveMode.Append).save(featureGroup.getLocation());
        FeatureGroupCommit fgCommit = this.getLastCommitMetadata(sparkSession, featureGroup.getLocation());
        // The backend response is returned as-is; the former self-assignment of its commit id
        // (setCommitID(getCommitID()) on the same object) had no effect and was dropped.
        return this.featureGroupApi.featureGroupCommit(featureGroup, fgCommit);
    }

    /**
     * Loads the feature group's Hudi table with an incremental query and exposes it as a
     * temporary view named {@code alias}.
     *
     * @param sparkSession    session used to read the table and hold the view
     * @param featureGroup    feature group whose location is read
     * @param alias           name of the temporary view to create or replace
     * @param startTimestamp  begin of the window in epoch milliseconds; {@code null} means 0
     * @param endTimestamp    end of the window in epoch milliseconds; {@code null} leaves the
     *                        end instant unset
     * @param readOptions     extra reader options that override the defaults; may be {@code null}
     */
    public void registerTemporaryTable(SparkSession sparkSession, FeatureGroup featureGroup, String alias, Long startTimestamp, Long endTimestamp, Map<String, String> readOptions) {
        Map<String, String> hudiArgs = this.setupHudiReadOpts(startTimestamp, endTimestamp, readOptions);
        sparkSession.read().format(HUDI_SPARK_FORMAT).options(hudiArgs).load(featureGroup.getLocation()).createOrReplaceTempView(alias);
    }

    /**
     * Reads the last completed instant of the Hudi timeline at {@code basePath} and converts it
     * into a new {@link FeatureGroupCommit}.
     *
     * <p>A fresh object is returned on every call so that values set by one caller (for example
     * a validation id) cannot leak into a commit produced by a later call.
     *
     * @throws FeatureStoreException if the timeline has no completed instant
     */
    private FeatureGroupCommit getLastCommitMetadata(SparkSession sparkSession, String basePath) throws IOException, FeatureStoreException, ParseException {
        // Not closed here: FileSystem.get may hand out a cached instance shared with other users.
        FileSystem fileSystem = FileSystem.get((Configuration)sparkSession.sparkContext().hadoopConfiguration());
        HoodieTimeline commitTimeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fileSystem, basePath);
        if (!commitTimeline.lastInstant().isPresent()) {
            throw new FeatureStoreException("No completed Hudi commit found at " + basePath);
        }
        HoodieInstant lastInstant = (HoodieInstant)commitTimeline.lastInstant().get();
        String commitDateString = lastInstant.getTimestamp();
        byte[] commitDetails = (byte[])commitTimeline.getInstantDetails(lastInstant).get();
        HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromBytes(commitDetails, HoodieCommitMetadata.class);

        FeatureGroupCommit fgCommit = new FeatureGroupCommit();
        fgCommit.setCommitDateString(commitDateString);
        fgCommit.setCommitTime(this.utils.getTimeStampFromDateString(commitDateString));
        fgCommit.setRowsUpdated(commitMetadata.fetchTotalUpdateRecordsWritten());
        fgCommit.setRowsInserted(commitMetadata.fetchTotalInsertRecordsWritten());
        fgCommit.setRowsDeleted(commitMetadata.getTotalRecordsDeleted());
        return fgCommit;
    }

    /**
     * Builds the Hudi writer options for {@code featureGroup}: table name and storage type,
     * record/partition/precombine keys, Hive sync settings and the write operation.
     * Entries of {@code writeOptions} are applied last and therefore override the defaults.
     *
     * @throws FeatureStoreException if no feature is flagged as the Hudi precombine key
     */
    private Map<String, String> setupHudiWriteOpts(FeatureGroup featureGroup, HudiOperationType operation, Map<String, String> writeOptions) throws IOException, FeatureStoreException {
        Map<String, String> hudiArgs = new HashMap<String, String>();
        hudiArgs.put(HUDI_TABLE_STORAGE_TYPE, HUDI_COPY_ON_WRITE);
        hudiArgs.put(HUDI_KEY_GENERATOR_OPT_KEY, HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL);
        Seq<String> primaryColumns = this.utils.getPrimaryColumns(featureGroup);
        hudiArgs.put(HUDI_RECORD_KEY, primaryColumns.mkString(","));
        String tableName = this.utils.getFgName(featureGroup);
        hudiArgs.put(HUDI_TABLE_NAME, tableName);
        Seq<String> partitionColumns = this.utils.getPartitionColumns(featureGroup);
        if (!partitionColumns.isEmpty()) {
            // Every partition column gets the ":SIMPLE" suffix, e.g. "a:SIMPLE,b:SIMPLE".
            hudiArgs.put(HUDI_PARTITION_FIELD, partitionColumns.mkString(":SIMPLE,") + ":SIMPLE");
            hudiArgs.put(HUDI_HIVE_SYNC_PARTITION_FIELDS, partitionColumns.mkString(","));
            hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL);
        } else {
            hudiArgs.put(HUDI_PARTITION_FIELD, "");
            hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL);
        }
        String precombineKey = featureGroup.getFeatures().stream().filter(Feature::getHudiPrecombineKey).findFirst().orElseThrow(() -> new FeatureStoreException("Can't find hudi precombine key")).getName();
        hudiArgs.put(HUDI_PRECOMBINE_FIELD, precombineKey);
        hudiArgs.put(HUDI_HIVE_SYNC_ENABLE, "true");
        hudiArgs.put(HUDI_HIVE_SYNC_TABLE, tableName);
        hudiArgs.put(HUDI_HIVE_SYNC_JDBC_URL, this.utils.getHiveServerConnection(featureGroup));
        hudiArgs.put(HUDI_HIVE_SYNC_DB, featureGroup.getFeatureStore().getName());
        hudiArgs.put(HIVE_AUTO_CREATE_DATABASE_OPT_KEY, HIVE_AUTO_CREATE_DATABASE_OPT_VAL);
        hudiArgs.put(HUDI_TABLE_OPERATION, operation.getValue());
        if (writeOptions != null && !writeOptions.isEmpty()) {
            hudiArgs.putAll(writeOptions);
        }
        return hudiArgs;
    }

    /**
     * Builds the Hudi reader options for an incremental query.
     *
     * <p>A {@code null} {@code startTimestamp} is treated as epoch 0. A {@code null}
     * {@code endTimestamp} leaves the end-instant option unset instead of failing with a
     * {@link NullPointerException}, so the reader falls back to its own default.
     * Entries of {@code readOptions} are applied last and therefore override the defaults.
     */
    private Map<String, String> setupHudiReadOpts(Long startTimestamp, Long endTimestamp, Map<String, String> readOptions) {
        Map<String, String> hudiArgs = new HashMap<String, String>();
        long beginMillis = startTimestamp != null ? startTimestamp : 0L;
        hudiArgs.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, this.timeStampToHudiFormat(beginMillis));
        if (endTimestamp != null) {
            hudiArgs.put(HUDI_END_INSTANTTIME_OPT_KEY, this.timeStampToHudiFormat(endTimestamp));
        }
        hudiArgs.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL);
        if (readOptions != null && !readOptions.isEmpty()) {
            hudiArgs.putAll(readOptions);
        }
        return hudiArgs;
    }

    /**
     * Formats an epoch-millisecond timestamp with the pattern {@code yyyyMMddHHmmss}, using the
     * JVM's default time zone and locale.
     *
     * @param commitedOnTimeStamp epoch milliseconds; must not be {@code null}
     * @return the formatted instant-time string
     * @throws NullPointerException if {@code commitedOnTimeStamp} is {@code null}
     */
    public String timeStampToHudiFormat(Long commitedOnTimeStamp) {
        if (commitedOnTimeStamp == null) {
            throw new NullPointerException("commitedOnTimeStamp must not be null");
        }
        // SimpleDateFormat is not thread-safe, so a formatter is created per call instead of
        // being shared through an instance field.
        SimpleDateFormat instantTimeFormat = new SimpleDateFormat(HUDI_INSTANT_TIME_PATTERN);
        return instantTimeFormat.format(new Timestamp(commitedOnTimeStamp));
    }
}

