/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.spark.engine.hudi;

import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureGroupCommit;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.constructor.FeatureGroupAlias;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.spark.FeatureGroup;
import com.logicalclocks.hsfs.spark.FeatureStore;
import com.logicalclocks.hsfs.spark.StreamFeatureGroup;
import com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerConfig;
import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.parquet.Strings;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.json.JSONArray;
import scala.collection.Seq;

/**
 * Spark-side engine that writes feature group data to Hudi tables, builds Hudi read options,
 * and runs the delta streamer for stream feature groups.
 *
 * <p>A single instance is shared through {@link #getInstance()}. Commit metadata objects are
 * created per call and never cached on the instance, so values set on one commit (for example a
 * validation id) cannot leak into a later, unrelated commit.
 */
public class HudiEngine {
    // Spark data source format name used for all Hudi reads and writes.
    public static final String HUDI_SPARK_FORMAT = "org.apache.hudi";

    // Hudi write option keys and values.
    protected static final String HUDI_BASE_PATH = "hoodie.base.path";
    protected static final String HUDI_TABLE_NAME = "hoodie.table.name";
    protected static final String HUDI_TABLE_STORAGE_TYPE = "hoodie.datasource.write.storage.type";
    protected static final String HUDI_TABLE_OPERATION = "hoodie.datasource.write.operation";
    protected static final String HUDI_KEY_GENERATOR_OPT_KEY = "hoodie.datasource.write.keygenerator.class";
    protected static final String HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL = "org.apache.hudi.keygen.CustomKeyGenerator";
    protected static final String HUDI_RECORD_KEY = "hoodie.datasource.write.recordkey.field";
    protected static final String HUDI_PARTITION_FIELD = "hoodie.datasource.write.partitionpath.field";
    protected static final String HUDI_PRECOMBINE_FIELD = "hoodie.datasource.write.precombine.field";

    // Hive sync option keys and values.
    protected static final String HUDI_HIVE_SYNC_ENABLE = "hoodie.datasource.hive_sync.enable";
    protected static final String HUDI_HIVE_SYNC_TABLE = "hoodie.datasource.hive_sync.table";
    protected static final String HUDI_HIVE_SYNC_DB = "hoodie.datasource.hive_sync.database";
    protected static final String HUDI_HIVE_SYNC_MODE = "hoodie.datasource.hive_sync.mode";
    protected static final String HUDI_HIVE_SYNC_MODE_VAL = "hms";
    protected static final String HUDI_HIVE_SYNC_PARTITION_FIELDS = "hoodie.datasource.hive_sync.partition_fields";
    protected static final String HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.hive_sync.partition_extractor_class";
    private static final String HUDI_HIVE_SYNC_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp";
    protected static final String DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL = "org.apache.hudi.hive.MultiPartKeysValueExtractor";
    protected static final String HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL = "org.apache.hudi.hive.NonPartitionedExtractor";
    protected static final String HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database";
    protected static final String HIVE_AUTO_CREATE_DATABASE_OPT_VAL = "false";
    protected static final String HUDI_COPY_ON_WRITE = "COPY_ON_WRITE";

    // Hudi read (query) option keys and values.
    protected static final String HUDI_QUERY_TYPE_OPT_KEY = "hoodie.datasource.query.type";
    protected static final String HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental";
    protected static final String HUDI_QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot";
    protected static final String HUDI_QUERY_TIME_TRAVEL_AS_OF_INSTANT = "as.of.instant";
    protected static final String HUDI_BEGIN_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.begin.instanttime";
    protected static final String HUDI_END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime";

    // Options used for deletes.
    protected static final String HUDI_WRITE_INSERT_DROP_DUPLICATES = "hoodie.datasource.write.insert.drop.duplicates";
    protected static final String PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class";
    protected static final String PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.common.model.EmptyHoodieRecordPayload";

    // Delta streamer related keys and class names.
    protected static final String HUDI_KAFKA_TOPIC = "hoodie.deltastreamer.source.kafka.topic";
    protected static final String COMMIT_METADATA_KEYPREFIX_OPT_KEY = "hoodie.datasource.write.commitmeta.key.prefix";
    protected static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    protected static final String INITIAL_CHECKPOINT_STRING = "initialCheckPointString";
    protected static final String FEATURE_GROUP_SCHEMA = "com.logicalclocks.hsfs.spark.StreamFeatureGroup.avroSchema";
    protected static final String FEATURE_GROUP_ENCODED_SCHEMA = "com.logicalclocks.hsfs.spark.StreamFeatureGroup.encodedAvroSchema";
    protected static final String FEATURE_GROUP_COMPLEX_FEATURES = "com.logicalclocks.hsfs.spark.StreamFeatureGroup.complexFeatures";
    protected static final String KAFKA_SOURCE = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerKafkaSource";
    protected static final String SCHEMA_PROVIDER = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerSchemaProvider";
    protected static final String DELTA_STREAMER_TRANSFORMER = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerTransformer";
    protected static final String DELTA_SOURCE_ORDERING_FIELD_OPT_KEY = "sourceOrderingField";
    protected static final String MIN_SYNC_INTERVAL_SECONDS = "minSyncIntervalSeconds";
    protected static final String SPARK_MASTER = "yarn";

    // Keys identifying the feature group in the options handed to the delta streamer.
    protected static final String PROJECT_ID = "projectId";
    protected static final String FEATURE_STORE_NAME = "featureStoreName";
    protected static final String SUBJECT_ID = "subjectId";
    protected static final String FEATURE_GROUP_ID = "featureGroupId";
    protected static final String FEATURE_GROUP_NAME = "featureGroupName";
    protected static final String FEATURE_GROUP_VERSION = "featureGroupVersion";
    protected static final String FUNCTION_TYPE = "functionType";
    protected static final String STREAMING_QUERY = "streamingQuery";

    // Default shuffle parallelism applied to every write; caller-supplied write options may override it.
    private static final Map<String, String> HUDI_DEFAULT_PARALLELISM = defaultParallelism();

    private static HudiEngine INSTANCE = null;

    private FeatureGroupUtils utils = new FeatureGroupUtils();
    private FeatureGroupApi featureGroupApi = new FeatureGroupApi();
    private DeltaStreamerConfig deltaStreamerConfig = new DeltaStreamerConfig();

    /** Builds the default shuffle parallelism settings without resorting to an anonymous map subclass. */
    private static Map<String, String> defaultParallelism() {
        Map<String, String> parallelism = new HashMap<String, String>();
        parallelism.put("hoodie.bulkinsert.shuffle.parallelism", "5");
        parallelism.put("hoodie.insert.shuffle.parallelism", "5");
        parallelism.put("hoodie.upsert.shuffle.parallelism", "5");
        return parallelism;
    }

    /**
     * Returns the shared engine instance, creating it on first use.
     *
     * @return the single {@code HudiEngine} instance
     */
    public static synchronized HudiEngine getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new HudiEngine();
        }
        return INSTANCE;
    }

    private HudiEngine() {
    }

    /**
     * Appends {@code dataset} to the feature group's Hudi table and, if the table has a completed
     * commit afterwards, registers that commit through the feature group API.
     *
     * @param sparkSession session whose Hadoop configuration is used to read the commit timeline
     * @param featureGroup feature group being written; supplies location and write options
     * @param dataset rows to write
     * @param operation Hudi write operation; when {@code null} no operation option is set
     * @param writeOptions extra options that override the generated ones; may be {@code null}
     * @param validationId validation id stored on the registered commit; may be {@code null}
     * @throws IOException if the commit timeline or metadata cannot be read
     * @throws FeatureStoreException if write options cannot be built or the commit cannot be registered
     * @throws ParseException if a commit timestamp cannot be parsed
     */
    public void saveHudiFeatureGroup(SparkSession sparkSession, FeatureGroupBase featureGroup, Dataset<Row> dataset,
            HudiOperationType operation, Map<String, String> writeOptions, Integer validationId)
            throws IOException, FeatureStoreException, ParseException {
        Map<String, String> hudiArgs = this.setupHudiWriteOpts(featureGroup, operation, writeOptions);
        dataset.write()
            .format(HUDI_SPARK_FORMAT)
            .options(hudiArgs)
            .mode(SaveMode.Append)
            .save(featureGroup.getLocation());

        FeatureGroupCommit fgCommit = this.getLastCommitMetadata(sparkSession, featureGroup.getLocation());
        if (fgCommit != null) {
            fgCommit.setValidationId(validationId);
            this.featureGroupApi.featureGroupCommit(featureGroup, fgCommit);
        }
    }

    /**
     * Deletes the records in {@code deleteDF} by upserting them with the empty-record payload
     * class, then registers the resulting commit through the feature group API.
     *
     * @param sparkSession session whose Hadoop configuration is used to read the commit timeline
     * @param featureGroup feature group to delete from
     * @param deleteDF rows identifying the records to delete
     * @param writeOptions extra options that override the generated ones; may be {@code null}
     * @return the commit as returned by the feature group API
     * @throws IOException if the commit timeline or metadata cannot be read
     * @throws FeatureStoreException if no completed commit exists after the write, or on API failure
     * @throws ParseException if a commit timestamp cannot be parsed
     */
    public FeatureGroupCommit deleteRecord(SparkSession sparkSession, FeatureGroupBase featureGroup,
            Dataset<Row> deleteDF, Map<String, String> writeOptions)
            throws IOException, FeatureStoreException, ParseException {
        Map<String, String> hudiArgs = this.setupHudiWriteOpts(featureGroup, HudiOperationType.UPSERT, writeOptions);
        hudiArgs.put(PAYLOAD_CLASS_OPT_KEY, PAYLOAD_CLASS_OPT_VAL);
        deleteDF.write()
            .format(HUDI_SPARK_FORMAT)
            .options(hudiArgs)
            .mode(SaveMode.Append)
            .save(featureGroup.getLocation());

        FeatureGroupCommit fgCommit = this.getLastCommitMetadata(sparkSession, featureGroup.getLocation());
        if (fgCommit == null) {
            throw new FeatureStoreException("No commit information was found for this feature group");
        }
        return this.featureGroupApi.featureGroupCommit(featureGroup, fgCommit);
    }

    /**
     * Reads the completed commit timeline under {@code basePath} and describes its last instant.
     *
     * <p>A new {@link FeatureGroupCommit} is created on every call so that callers can freely
     * mutate the result without affecting other callers of this shared engine.
     *
     * @return commit details for the last completed instant, or {@code null} if there is none
     */
    private FeatureGroupCommit getLastCommitMetadata(SparkSession sparkSession, String basePath)
            throws IOException, FeatureStoreException, ParseException {
        FileSystem fileSystem = FileSystem.get(sparkSession.sparkContext().hadoopConfiguration());
        HoodieTimeline commitTimeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fileSystem, basePath);
        Option<HoodieInstant> lastInstant = commitTimeline.lastInstant();
        if (!lastInstant.isPresent()) {
            return null;
        }

        HoodieInstant latest = lastInstant.get();
        FeatureGroupCommit commit = new FeatureGroupCommit();
        commit.setCommitDateString(latest.getTimestamp());
        commit.setCommitTime(FeatureGroupUtils.getTimeStampFromDateString(latest.getTimestamp()));
        // The first instant still present on the timeline is reported as the last active commit time.
        commit.setLastActiveCommitTime(
            FeatureGroupUtils.getTimeStampFromDateString(commitTimeline.firstInstant().get().getTimestamp()));

        byte[] instantDetails = commitTimeline.getInstantDetails(latest).get();
        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(instantDetails, HoodieCommitMetadata.class);
        commit.setRowsUpdated(Long.valueOf(commitMetadata.fetchTotalUpdateRecordsWritten()));
        commit.setRowsInserted(Long.valueOf(commitMetadata.fetchTotalInsertRecordsWritten()));
        commit.setRowsDeleted(Long.valueOf(commitMetadata.getTotalRecordsDeleted()));
        return commit;
    }

    /**
     * Builds the Hudi write options for a feature group.
     *
     * <p>Options are applied in this order, later entries overriding earlier ones: generated
     * table/key/partition/Hive-sync settings, the operation (if given), default parallelism, and
     * finally {@code writeOptions}.
     *
     * @return a new mutable map of options
     * @throws FeatureStoreException if no feature is flagged as the Hudi precombine key
     */
    private Map<String, String> setupHudiWriteOpts(FeatureGroupBase featureGroup, HudiOperationType operation,
            Map<String, String> writeOptions) throws FeatureStoreException {
        Map<String, String> hudiArgs = new HashMap<String, String>();
        hudiArgs.put(HUDI_TABLE_STORAGE_TYPE, HUDI_COPY_ON_WRITE);
        hudiArgs.put(HUDI_KEY_GENERATOR_OPT_KEY, HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL);

        // Record key: primary key columns, plus the event time column when one is configured.
        String recordKey = this.utils.getPrimaryColumns(featureGroup).mkString(",");
        if (!Strings.isNullOrEmpty((String) featureGroup.getEventTime())) {
            recordKey = recordKey + "," + featureGroup.getEventTime();
        }
        hudiArgs.put(HUDI_RECORD_KEY, recordKey);

        String tableName = this.utils.getFgName(featureGroup);
        hudiArgs.put(HUDI_TABLE_NAME, tableName);

        Seq<?> partitionColumns = this.utils.getPartitionColumns(featureGroup);
        if (partitionColumns.isEmpty()) {
            hudiArgs.put(HUDI_PARTITION_FIELD, "");
            hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL);
        } else {
            // Every partition column is suffixed with ":SIMPLE", e.g. "a:SIMPLE,b:SIMPLE".
            hudiArgs.put(HUDI_PARTITION_FIELD, partitionColumns.mkString(":SIMPLE,") + ":SIMPLE");
            hudiArgs.put(HUDI_HIVE_SYNC_PARTITION_FIELDS, partitionColumns.mkString(","));
            hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL);
        }

        // featureGroup is used as a raw type here, so the element type has to be restated.
        @SuppressWarnings("unchecked")
        List<Feature> features = featureGroup.getFeatures();
        String precombineKey = features.stream()
            .filter(Feature::getHudiPrecombineKey)
            .findFirst()
            .orElseThrow(() -> new FeatureStoreException("Can't find hudi precombine key"))
            .getName();
        hudiArgs.put(HUDI_PRECOMBINE_FIELD, precombineKey);

        hudiArgs.put(HUDI_HIVE_SYNC_ENABLE, "true");
        hudiArgs.put(HUDI_HIVE_SYNC_MODE, HUDI_HIVE_SYNC_MODE_VAL);
        hudiArgs.put(HUDI_HIVE_SYNC_TABLE, tableName);
        hudiArgs.put(HUDI_HIVE_SYNC_DB, featureGroup.getFeatureStore().getName());
        hudiArgs.put(HIVE_AUTO_CREATE_DATABASE_OPT_KEY, HIVE_AUTO_CREATE_DATABASE_OPT_VAL);
        hudiArgs.put(HUDI_HIVE_SYNC_SUPPORT_TIMESTAMP, "true");

        if (operation != null) {
            hudiArgs.put(HUDI_TABLE_OPERATION, operation.getValue());
        }
        hudiArgs.putAll(HUDI_DEFAULT_PARALLELISM);
        if (writeOptions != null && !writeOptions.isEmpty()) {
            hudiArgs.putAll(writeOptions);
        }
        return hudiArgs;
    }

    /**
     * Builds the Hudi read options for a time window.
     *
     * <ul>
     *   <li>No start (null or 0) and no end: snapshot query.</li>
     *   <li>No start (null or 0) and an end: snapshot query as of the end instant.</li>
     *   <li>A start and no end: incremental query beginning at the start instant.</li>
     *   <li>A start and an end: incremental query bounded by both instants.</li>
     * </ul>
     *
     * @param startTimestamp start of the window; {@code null} or {@code 0} means "from the beginning"
     * @param endTimestamp end of the window; may be {@code null}
     * @param readOptions extra options that override the generated ones; may be {@code null}
     * @return a new mutable map of options
     */
    public Map<String, String> setupHudiReadOpts(Long startTimestamp, Long endTimestamp, Map<String, String> readOptions) {
        Map<String, String> hudiArgs = new HashMap<String, String>();
        boolean fromBeginning = startTimestamp == null || startTimestamp == 0L;
        if (fromBeginning) {
            hudiArgs.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_SNAPSHOT_OPT_VAL);
            if (endTimestamp != null) {
                hudiArgs.put(HUDI_QUERY_TIME_TRAVEL_AS_OF_INSTANT, this.utils.timeStampToHudiFormat(endTimestamp));
            }
        } else {
            hudiArgs.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL);
            hudiArgs.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, this.utils.timeStampToHudiFormat(startTimestamp));
            if (endTimestamp != null) {
                hudiArgs.put(HUDI_END_INSTANTTIME_OPT_KEY, this.utils.timeStampToHudiFormat(endTimestamp));
            }
        }
        if (readOptions != null && !readOptions.isEmpty()) {
            hudiArgs.putAll(readOptions);
        }
        return hudiArgs;
    }

    /**
     * Initializes a Hudi table at the stream feature group's location, using the generated write
     * options (without an operation or caller overrides) as table properties.
     */
    private void createEmptyTable(SparkSession sparkSession, StreamFeatureGroup streamFeatureGroup)
            throws IOException, FeatureStoreException {
        Configuration configuration = sparkSession.sparkContext().hadoopConfiguration();
        Properties properties = new Properties();
        properties.putAll(this.setupHudiWriteOpts(streamFeatureGroup, null, null));
        HoodieTableMetaClient.initTableAndGetMetaClient(configuration, streamFeatureGroup.getLocation(), properties);
    }

    /**
     * Compares the column names of the registered alias view with those of the feature group
     * table. When they differ, it upserts an empty dataframe carrying the table's schema into the
     * Hudi table and re-registers the alias view from the Hudi location.
     *
     * @param sparkSession active Spark session
     * @param featureGroupAlias feature group together with the alias its view is registered under
     * @param hudiArgs read options used when re-registering the alias view
     * @throws FeatureStoreException if the reconciling write fails
     */
    public void reconcileHudiSchema(SparkSession sparkSession, FeatureGroupAlias featureGroupAlias,
            Map<String, String> hudiArgs) throws FeatureStoreException {
        String fgTableName = this.utils.getTableName(featureGroupAlias.getFeatureGroup());
        String[] hudiSchema = sparkSession.table(featureGroupAlias.getAlias()).columns();
        String[] hiveSchema = sparkSession.table(fgTableName).columns();
        if (this.sparkSchemasMatch(hudiSchema, hiveSchema)) {
            return;
        }

        // Zero rows: only the schema of the table is written.
        Dataset<Row> dataframe = sparkSession.table(fgTableName).limit(0);
        FeatureStore featureStore = (FeatureStore) featureGroupAlias.getFeatureGroup().getFeatureStore();
        try {
            FeatureGroup fullFG = featureStore.getFeatureGroup(
                featureGroupAlias.getFeatureGroup().getName(), featureGroupAlias.getFeatureGroup().getVersion());
            this.saveHudiFeatureGroup(sparkSession, fullFG, dataframe, HudiOperationType.UPSERT,
                new HashMap<String, String>(), null);
        } catch (IOException | ParseException e) {
            throw new FeatureStoreException("Error while reconciling HUDI schema.", e);
        }
        sparkSession.read()
            .format(HUDI_SPARK_FORMAT)
            .options(hudiArgs)
            .load(featureGroupAlias.getFeatureGroup().getLocation())
            .createOrReplaceTempView(featureGroupAlias.getAlias());
    }

    /**
     * Tells whether two column-name arrays contain the same names, ignoring order.
     *
     * <p>The comparison is case-sensitive and respects duplicates. The input arrays are not
     * modified; sorted copies are compared instead.
     *
     * @param schema1 first set of column names; may be {@code null}
     * @param schema2 second set of column names; may be {@code null}
     * @return {@code true} if both arrays are non-null and hold the same names; {@code false} otherwise
     */
    public boolean sparkSchemasMatch(String[] schema1, String[] schema2) {
        if (schema1 == null || schema2 == null || schema1.length != schema2.length) {
            return false;
        }
        String[] sorted1 = schema1.clone();
        String[] sorted2 = schema2.clone();
        Arrays.sort(sorted1);
        Arrays.sort(sorted2);
        return Arrays.equals(sorted1, sorted2);
    }

    /**
     * Runs the delta streamer for a stream feature group and registers the resulting commit.
     *
     * <p>The Hudi table is created first if its {@code .hoodie} directory does not exist. When
     * the table has no completed commit, {@code auto.offset.reset} is set to {@code earliest}.
     * After streaming, the last commit (if any) is registered and statistics are computed.
     *
     * @param sparkSession active Spark session
     * @param streamFeatureGroup feature group to stream into
     * @param writeOptions extra write options; may be {@code null}. Keys set by this method
     *     afterwards take precedence over entries with the same key
     * @throws Exception if table creation, streaming, or commit registration fails
     */
    public void streamToHoodieTable(SparkSession sparkSession, StreamFeatureGroup streamFeatureGroup,
            Map<String, String> writeOptions) throws Exception {
        Map<String, String> hudiWriteOpts = this.setupHudiWriteOpts(streamFeatureGroup, HudiOperationType.UPSERT, writeOptions);
        String location = streamFeatureGroup.getLocation();

        hudiWriteOpts.put(PROJECT_ID, String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()));
        hudiWriteOpts.put(FEATURE_STORE_NAME, streamFeatureGroup.getFeatureStore().getName());
        hudiWriteOpts.put(SUBJECT_ID, String.valueOf(streamFeatureGroup.getSubject().getId()));
        hudiWriteOpts.put(FEATURE_GROUP_ID, String.valueOf(streamFeatureGroup.getId()));
        hudiWriteOpts.put(FEATURE_GROUP_NAME, streamFeatureGroup.getName());
        hudiWriteOpts.put(FEATURE_GROUP_VERSION, String.valueOf(streamFeatureGroup.getVersion()));
        hudiWriteOpts.put(HUDI_TABLE_NAME, this.utils.getFgName((FeatureGroupBase) streamFeatureGroup));
        hudiWriteOpts.put(HUDI_BASE_PATH, location);
        hudiWriteOpts.put(HUDI_KAFKA_TOPIC, streamFeatureGroup.getOnlineTopicName());
        hudiWriteOpts.put(FEATURE_GROUP_SCHEMA, streamFeatureGroup.getAvroSchema());
        hudiWriteOpts.put(FEATURE_GROUP_ENCODED_SCHEMA, streamFeatureGroup.getEncodedAvroSchema());
        hudiWriteOpts.put(FEATURE_GROUP_COMPLEX_FEATURES,
            new JSONArray((Collection) streamFeatureGroup.getComplexFeatures()).toString());
        // The streamer orders source records by the same field Hudi uses as precombine key.
        hudiWriteOpts.put(DELTA_SOURCE_ORDERING_FIELD_OPT_KEY, hudiWriteOpts.get(HUDI_PRECOMBINE_FIELD));
        hudiWriteOpts.put("group.id", String.valueOf(streamFeatureGroup.getId()));

        Path basePath = new Path(location);
        FileSystem fs = basePath.getFileSystem(sparkSession.sparkContext().hadoopConfiguration());
        if (!fs.exists(new Path(basePath, ".hoodie"))) {
            this.createEmptyTable(sparkSession, streamFeatureGroup);
            hudiWriteOpts.put("auto.offset.reset", "earliest");
        }
        // Also covers an existing table that has no completed commit yet.
        if (this.getLastCommitMetadata(sparkSession, location) == null) {
            hudiWriteOpts.put("auto.offset.reset", "earliest");
        }

        this.deltaStreamerConfig.streamToHoodieTable(hudiWriteOpts, sparkSession);

        FeatureGroupCommit fgCommit = this.getLastCommitMetadata(sparkSession, location);
        if (fgCommit != null) {
            this.featureGroupApi.featureGroupCommit((FeatureGroupBase) streamFeatureGroup, fgCommit);
            streamFeatureGroup.computeStatistics();
        }
    }
}

