package com.logicalclocks.hsfs.spark.engine.hudi;

import com.damnhandy.uri.template.UriTemplate;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureGroupCommit;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.constructor.FeatureGroupAlias;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.metadata.KafkaApi;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import com.logicalclocks.hsfs.spark.FeatureStore;
import com.logicalclocks.hsfs.spark.StreamFeatureGroup;
import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.parquet.Strings;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.json.JSONArray;
import scala.collection.Seq;

/**
 * Spark engine helper that writes to and reads from feature groups stored as Apache Hudi tables.
 *
 * <p>Batch writes go through the Spark {@code org.apache.hudi} data source; streaming writes go through
 * {@code DeltaStreamerConfig}. After a write, the metadata of the latest completed Hudi commit is read from
 * the table timeline and sent to the backend through {@code FeatureGroupApi#featureGroupCommit}.
 */
public class HudiEngine {
    public static final String HUDI_SPARK_FORMAT = "org.apache.hudi";
    protected static final String HUDI_BASE_PATH = "hoodie.base.path";
    protected static final String HUDI_TABLE_NAME = "hoodie.table.name";
    protected static final String HUDI_TABLE_STORAGE_TYPE = "hoodie.datasource.write.storage.type";
    protected static final String HUDI_TABLE_OPERATION = "hoodie.datasource.write.operation";
    protected static final String HUDI_KEY_GENERATOR_OPT_KEY = "hoodie.datasource.write.keygenerator.class";
    protected static final String HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL = "org.apache.hudi.keygen.CustomKeyGenerator";
    protected static final String HUDI_RECORD_KEY = "hoodie.datasource.write.recordkey.field";
    protected static final String HUDI_PARTITION_FIELD = "hoodie.datasource.write.partitionpath.field";
    protected static final String HUDI_PRECOMBINE_FIELD = "hoodie.datasource.write.precombine.field";
    protected static final String HUDI_HIVE_SYNC_ENABLE = "hoodie.datasource.hive_sync.enable";
    protected static final String HUDI_HIVE_SYNC_TABLE = "hoodie.datasource.hive_sync.table";
    protected static final String HUDI_HIVE_SYNC_DB = "hoodie.datasource.hive_sync.database";
    protected static final String HUDI_HIVE_SYNC_MODE = "hoodie.datasource.hive_sync.mode";
    protected static final String HUDI_HIVE_SYNC_MODE_VAL = "hms";
    protected static final String HUDI_HIVE_SYNC_PARTITION_FIELDS = "hoodie.datasource.hive_sync.partition_fields";
    protected static final String HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.hive_sync.partition_extractor_class";
    private static final String HUDI_HIVE_SYNC_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp";
    protected static final String DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL = "org.apache.hudi.hive.MultiPartKeysValueExtractor";
    protected static final String HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL = "org.apache.hudi.hive.NonPartitionedExtractor";
    protected static final String HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database";
    protected static final String HIVE_AUTO_CREATE_DATABASE_OPT_VAL = "false";
    protected static final String HUDI_COPY_ON_WRITE = "COPY_ON_WRITE";
    protected static final String HUDI_QUERY_TYPE_OPT_KEY = "hoodie.datasource.query.type";
    protected static final String HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental";
    protected static final String HUDI_QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot";
    protected static final String HUDI_QUERY_TIME_TRAVEL_AS_OF_INSTANT = "as.of.instant";
    protected static final String HUDI_BEGIN_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.begin.instanttime";
    protected static final String HUDI_END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime";
    protected static final String HUDI_WRITE_INSERT_DROP_DUPLICATES = "hoodie.datasource.write.insert.drop.duplicates";
    protected static final String PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class";
    protected static final String PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.common.model.EmptyHoodieRecordPayload";
    protected static final String HUDI_KAFKA_TOPIC = "hoodie.deltastreamer.source.kafka.topic";
    protected static final String COMMIT_METADATA_KEYPREFIX_OPT_KEY = "hoodie.datasource.write.commitmeta.key.prefix";
    protected static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    protected static final String INITIAL_CHECKPOINT_STRING = "initialCheckPointString";
    protected static final String FEATURE_GROUP_SCHEMA = "com.logicalclocks.hsfs.spark.StreamFeatureGroup.avroSchema";
    protected static final String FEATURE_GROUP_ENCODED_SCHEMA = "com.logicalclocks.hsfs.spark.StreamFeatureGroup.encodedAvroSchema";
    protected static final String FEATURE_GROUP_COMPLEX_FEATURES = "com.logicalclocks.hsfs.spark.StreamFeatureGroup.complexFeatures";
    protected static final String KAFKA_SOURCE = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerKafkaSource";
    protected static final String SCHEMA_PROVIDER = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerSchemaProvider";
    protected static final String DELTA_STREAMER_TRANSFORMER = "com.logicalclocks.hsfs.spark.engine.hudi.DeltaStreamerTransformer";
    protected static final String DELTA_SOURCE_ORDERING_FIELD_OPT_KEY = "sourceOrderingField";
    protected static final String MIN_SYNC_INTERVAL_SECONDS = "minSyncIntervalSeconds";
    protected static final String SPARK_MASTER = "yarn";
    protected static final String PROJECT_ID = "projectId";
    protected static final String FEATURE_STORE_NAME = "featureStoreName";
    protected static final String SUBJECT_ID = "subjectId";
    protected static final String FEATURE_GROUP_NAME = "featureGroupName";
    protected static final String FEATURE_GROUP_VERSION = "featureGroupVersion";
    protected static final String FUNCTION_TYPE = "functionType";
    protected static final String STREAMING_QUERY = "streamingQuery";

    /**
     * Default shuffle parallelism for Hudi writes. It is applied before the user-supplied write options,
     * so callers can override any of these entries. Unmodifiable so it cannot be altered at runtime.
     */
    private static final Map<String, String> HUDI_DEFAULT_PARALLELISM;

    static {
        Map<String, String> parallelism = new HashMap<>();
        parallelism.put("hoodie.bulkinsert.shuffle.parallelism", "5");
        parallelism.put("hoodie.insert.shuffle.parallelism", "5");
        parallelism.put("hoodie.upsert.shuffle.parallelism", "5");
        HUDI_DEFAULT_PARALLELISM = Collections.unmodifiableMap(parallelism);
    }

    private FeatureGroupUtils utils = new FeatureGroupUtils();
    private FeatureGroupApi featureGroupApi = new FeatureGroupApi();
    private DeltaStreamerConfig deltaStreamerConfig = new DeltaStreamerConfig();
    // NOTE(review): kafkaApi and storageConnectorApi are not referenced anywhere in this class; confirm whether
    // they are still needed before removing them.
    private KafkaApi kafkaApi = new KafkaApi();
    private StorageConnectorApi storageConnectorApi = new StorageConnectorApi();

    /**
     * Appends {@code dataset} to the Hudi table at the feature group's location and registers the resulting
     * commit with the backend.
     *
     * @param sparkSession      active Spark session
     * @param featureGroupBase  target feature group; its location is used as the Hudi base path
     * @param dataset           rows to write
     * @param hudiOperationType Hudi write operation; when {@code null} no operation option is set
     * @param map               extra write options, applied last so they override the defaults; may be {@code null}
     * @param num               validation id attached to the registered commit; may be {@code null}
     * @throws IOException           if the Hudi timeline or commit metadata cannot be read
     * @throws FeatureStoreException if the feature group has no Hudi precombine key, or as propagated from the
     *                               backend API
     * @throws ParseException        as propagated from parsing the Hudi commit timestamps
     */
    public void saveHudiFeatureGroup(SparkSession sparkSession, FeatureGroupBase featureGroupBase, Dataset<Row> dataset,
                                     HudiOperationType hudiOperationType, Map<String, String> map, Integer num)
            throws IOException, FeatureStoreException, ParseException {
        Map<String, String> writeOpts = setupHudiWriteOpts(featureGroupBase, hudiOperationType, map);
        dataset.write()
                .format(HUDI_SPARK_FORMAT)
                .options(writeOpts)
                .mode(SaveMode.Append)
                .save(featureGroupBase.getLocation());

        FeatureGroupCommit commit = getLastCommitMetadata(sparkSession, featureGroupBase.getLocation());
        if (commit != null) {
            commit.setValidationId(num);
            this.featureGroupApi.featureGroupCommit(featureGroupBase, commit);
        }
    }

    /**
     * Deletes the records in {@code dataset} from the feature group's Hudi table. It does this by upserting them
     * with Hudi's {@code EmptyHoodieRecordPayload}, then registers the resulting commit with the backend.
     *
     * @param sparkSession     active Spark session
     * @param featureGroupBase target feature group
     * @param dataset          rows identifying the records to delete
     * @param map              extra write options, applied last; may be {@code null}
     * @return the commit as returned by the backend API
     * @throws FeatureStoreException if no completed commit is found after the write, if no precombine key is
     *                               defined, or as propagated from the backend API
     * @throws IOException           if the Hudi timeline or commit metadata cannot be read
     * @throws ParseException        as propagated from parsing the Hudi commit timestamps
     */
    public FeatureGroupCommit deleteRecord(SparkSession sparkSession, FeatureGroupBase featureGroupBase,
                                           Dataset<Row> dataset, Map<String, String> map)
            throws IOException, FeatureStoreException, ParseException {
        Map<String, String> writeOpts = setupHudiWriteOpts(featureGroupBase, HudiOperationType.UPSERT, map);
        writeOpts.put(PAYLOAD_CLASS_OPT_KEY, PAYLOAD_CLASS_OPT_VAL);
        dataset.write()
                .format(HUDI_SPARK_FORMAT)
                .options(writeOpts)
                .mode(SaveMode.Append)
                .save(featureGroupBase.getLocation());

        FeatureGroupCommit commit = getLastCommitMetadata(sparkSession, featureGroupBase.getLocation());
        if (commit == null) {
            throw new FeatureStoreException("No commit information was found for this feature group");
        }
        return this.featureGroupApi.featureGroupCommit(featureGroupBase, commit);
    }

    /**
     * Intended to register a temporary Spark view for a feature group alias.
     *
     * <p>NOTE(review): the body is empty, so no view is registered here. {@link #reconcileHudiSchema} reads
     * {@code sparkSession.table(alias)} and therefore relies on that view being registered elsewhere. Confirm
     * this no-op is intentional.
     */
    public void registerTemporaryTable(SparkSession sparkSession, FeatureGroupAlias featureGroupAlias,
                                       Map<String, String> map) {
    }

    /**
     * Reads the latest completed commit/compaction from the Hudi timeline at {@code basePath}.
     *
     * <p>A new {@link FeatureGroupCommit} is built on every call. Callers such as
     * {@link #saveHudiFeatureGroup} mutate the returned object (validation id). A shared instance would let
     * that state leak into later commits.
     *
     * @param sparkSession active Spark session, used for its Hadoop configuration
     * @param basePath     Hudi table base path
     * @return commit metadata for the last completed instant, or {@code null} if the timeline is empty
     */
    private FeatureGroupCommit getLastCommitMetadata(SparkSession sparkSession, String basePath)
            throws IOException, FeatureStoreException, ParseException {
        // Resolve the filesystem from the table path itself (consistent with streamToHoodieTable), so
        // locations on a non-default filesystem are read from the right place.
        FileSystem fileSystem = new Path(basePath).getFileSystem(sparkSession.sparkContext().hadoopConfiguration());
        HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fileSystem, basePath);

        Option<HoodieInstant> lastInstantOpt = timeline.lastInstant();
        if (!lastInstantOpt.isPresent()) {
            return null;
        }
        HoodieInstant lastInstant = lastInstantOpt.get();
        // Non-empty timeline (lastInstant is present), so firstInstant is present as well.
        HoodieInstant firstInstant = timeline.firstInstant().get();

        FeatureGroupCommit commit = new FeatureGroupCommit();
        commit.setCommitDateString(lastInstant.getTimestamp());
        commit.setCommitTime(FeatureGroupUtils.getTimeStampFromDateString(lastInstant.getTimestamp()));
        commit.setLastActiveCommitTime(FeatureGroupUtils.getTimeStampFromDateString(firstInstant.getTimestamp()));

        HoodieCommitMetadata metadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes(
                timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
        commit.setRowsUpdated(Long.valueOf(metadata.fetchTotalUpdateRecordsWritten()));
        commit.setRowsInserted(Long.valueOf(metadata.fetchTotalInsertRecordsWritten()));
        commit.setRowsDeleted(Long.valueOf(metadata.getTotalRecordsDeleted()));
        return commit;
    }

    /**
     * Builds the Hudi write and Hive-sync options for a feature group.
     *
     * <p>The record key is the primary key columns plus the event-time column (if set). Partition columns use
     * the {@code SIMPLE} key-generator type. The defaults are applied first and {@code map} is applied last,
     * so user options win.
     *
     * @param featureGroupBase  feature group to write
     * @param hudiOperationType write operation; when {@code null} the operation key is not set
     * @param map               user write options; may be {@code null}
     * @return a new, mutable option map
     * @throws FeatureStoreException if no feature is flagged as the Hudi precombine key
     */
    private Map<String, String> setupHudiWriteOpts(FeatureGroupBase featureGroupBase,
                                                   HudiOperationType hudiOperationType, Map<String, String> map)
            throws FeatureStoreException {
        Map<String, String> opts = new HashMap<>();
        opts.put(HUDI_TABLE_STORAGE_TYPE, HUDI_COPY_ON_WRITE);
        opts.put(HUDI_KEY_GENERATOR_OPT_KEY, HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL);

        String recordKey = this.utils.getPrimaryColumns(featureGroupBase).mkString(UriTemplate.DEFAULT_SEPARATOR);
        if (!Strings.isNullOrEmpty(featureGroupBase.getEventTime())) {
            recordKey = recordKey + UriTemplate.DEFAULT_SEPARATOR + featureGroupBase.getEventTime();
        }
        opts.put(HUDI_RECORD_KEY, recordKey);

        String fgName = this.utils.getFgName(featureGroupBase);
        opts.put(HUDI_TABLE_NAME, fgName);

        Seq<String> partitionColumns = this.utils.getPartitionColumns(featureGroupBase);
        if (partitionColumns.isEmpty()) {
            opts.put(HUDI_PARTITION_FIELD, "");
            opts.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL);
        } else {
            // CustomKeyGenerator expects "col:TYPE" pairs, e.g. "a:SIMPLE,b:SIMPLE".
            opts.put(HUDI_PARTITION_FIELD, partitionColumns.mkString(":SIMPLE,") + ":SIMPLE");
            opts.put(HUDI_HIVE_SYNC_PARTITION_FIELDS, partitionColumns.mkString(UriTemplate.DEFAULT_SEPARATOR));
            opts.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL);
        }

        // Boolean.TRUE.equals avoids an unboxing NullPointerException if the flag is unset (null).
        String precombineKey = featureGroupBase.getFeatures().stream()
                .filter(feature -> Boolean.TRUE.equals(feature.getHudiPrecombineKey()))
                .findFirst()
                .orElseThrow(() -> new FeatureStoreException("Can't find hudi precombine key"))
                .getName();
        opts.put(HUDI_PRECOMBINE_FIELD, precombineKey);

        opts.put(HUDI_HIVE_SYNC_ENABLE, "true");
        opts.put(HUDI_HIVE_SYNC_MODE, HUDI_HIVE_SYNC_MODE_VAL);
        opts.put(HUDI_HIVE_SYNC_TABLE, fgName);
        opts.put(HUDI_HIVE_SYNC_DB, featureGroupBase.getFeatureStore().getName());
        opts.put(HIVE_AUTO_CREATE_DATABASE_OPT_KEY, HIVE_AUTO_CREATE_DATABASE_OPT_VAL);
        opts.put(HUDI_HIVE_SYNC_SUPPORT_TIMESTAMP, "true");
        if (hudiOperationType != null) {
            opts.put(HUDI_TABLE_OPERATION, hudiOperationType.getValue());
        }
        opts.putAll(HUDI_DEFAULT_PARALLELISM);
        if (map != null && !map.isEmpty()) {
            opts.putAll(map);
        }
        return opts;
    }

    /**
     * Builds Hudi read options from an optional start/end wall-clock time.
     *
     * <ul>
     *   <li>no end and no (or zero) start: snapshot query;</li>
     *   <li>end but no start: snapshot query "as of" the end instant (time travel);</li>
     *   <li>start and end (start may be 0): incremental query between the two instants;</li>
     *   <li>non-zero start and no end: incremental query from the start instant.</li>
     * </ul>
     *
     * @param l   start timestamp, may be {@code null}
     * @param l2  end timestamp, may be {@code null}
     * @param map extra read options applied last; may be {@code null}
     * @return a new, mutable option map
     */
    public Map<String, String> setupHudiReadOpts(Long l, Long l2, Map<String, String> map) {
        Long startTime = l;
        Long endTime = l2;
        Map<String, String> opts = new HashMap<>();
        if (endTime == null) {
            if (startTime == null || startTime.longValue() == 0L) {
                opts.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_SNAPSHOT_OPT_VAL);
            } else {
                opts.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL);
                opts.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, this.utils.timeStampToHudiFormat(startTime));
            }
        } else if (startTime == null) {
            opts.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_SNAPSHOT_OPT_VAL);
            opts.put(HUDI_QUERY_TIME_TRAVEL_AS_OF_INSTANT, this.utils.timeStampToHudiFormat(endTime));
        } else {
            opts.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL);
            opts.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, this.utils.timeStampToHudiFormat(startTime));
            opts.put(HUDI_END_INSTANTTIME_OPT_KEY, this.utils.timeStampToHudiFormat(endTime));
        }
        if (map != null && !map.isEmpty()) {
            opts.putAll(map);
        }
        return opts;
    }

    /** Initializes Hudi table metadata at the stream feature group's location, using its write options. */
    private void createEmptyTable(SparkSession sparkSession, StreamFeatureGroup streamFeatureGroup)
            throws IOException, FeatureStoreException {
        Configuration hadoopConfiguration = sparkSession.sparkContext().hadoopConfiguration();
        Properties properties = new Properties();
        properties.putAll(setupHudiWriteOpts(streamFeatureGroup, null, null));
        HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConfiguration, streamFeatureGroup.getLocation(),
                properties);
    }

    /**
     * Re-aligns the temporary view registered under the alias with the feature group's metastore table.
     *
     * <p>If the column names differ, the method writes an empty (limit 0) upsert of the metastore table into
     * the feature group. It then reloads the Hudi table with {@code map} and re-registers the view under the
     * alias.
     *
     * @throws FeatureStoreException wrapping any {@link IOException} or {@link ParseException} from the write
     */
    public void reconcileHudiSchema(SparkSession sparkSession, FeatureGroupAlias featureGroupAlias,
                                    Map<String, String> map) throws FeatureStoreException {
        String tableName = this.utils.getTableName(featureGroupAlias.getFeatureGroup());
        String[] aliasColumns = sparkSession.table(featureGroupAlias.getAlias()).columns();
        String[] tableColumns = sparkSession.table(tableName).columns();
        if (sparkSchemasMatch(aliasColumns, tableColumns)) {
            return;
        }
        try {
            FeatureStore featureStore = (FeatureStore) featureGroupAlias.getFeatureGroup().getFeatureStore();
            saveHudiFeatureGroup(sparkSession,
                    featureStore.getFeatureGroup(featureGroupAlias.getFeatureGroup().getName(),
                            featureGroupAlias.getFeatureGroup().getVersion()),
                    sparkSession.table(tableName).limit(0),
                    HudiOperationType.UPSERT,
                    new HashMap<>(),
                    null);
            sparkSession.read()
                    .format(HUDI_SPARK_FORMAT)
                    .options(map)
                    .load(featureGroupAlias.getFeatureGroup().getLocation())
                    .createOrReplaceTempView(featureGroupAlias.getAlias());
        } catch (IOException | ParseException e) {
            throw new FeatureStoreException("Error while reconciling HUDI schema.", e);
        }
    }

    /**
     * Returns whether two column-name arrays contain the same names, ignoring order.
     *
     * <p>The input arrays are not modified; sorting is done on copies.
     *
     * @return {@code false} if either array is {@code null} or their lengths differ
     */
    public boolean sparkSchemasMatch(String[] strArr, String[] strArr2) {
        if (strArr == null || strArr2 == null || strArr.length != strArr2.length) {
            return false;
        }
        String[] left = strArr.clone();
        String[] right = strArr2.clone();
        Arrays.sort(left);
        Arrays.sort(right);
        return Arrays.equals(left, right);
    }

    /**
     * Runs the delta streamer to ingest the feature group's online Kafka topic into its Hudi table. It then
     * registers the last commit with the backend and recomputes statistics.
     *
     * <p>The Hudi table is created first if its {@code .hoodie} metadata directory does not exist. If the table
     * has no completed commit yet, which includes a freshly created one, {@code auto.offset.reset} is set to
     * {@code earliest}.
     */
    public void streamToHoodieTable(SparkSession sparkSession, StreamFeatureGroup streamFeatureGroup,
                                    Map<String, String> map) throws Exception {
        Map<String, String> opts = setupHudiWriteOpts(streamFeatureGroup, HudiOperationType.UPSERT, map);
        opts.put(PROJECT_ID, String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()));
        opts.put(FEATURE_STORE_NAME, streamFeatureGroup.getFeatureStore().getName());
        opts.put(SUBJECT_ID, String.valueOf(streamFeatureGroup.getSubject().getId()));
        opts.put(FEATURE_GROUP_NAME, streamFeatureGroup.getName());
        opts.put(FEATURE_GROUP_VERSION, String.valueOf(streamFeatureGroup.getVersion()));
        opts.put(HUDI_TABLE_NAME, this.utils.getFgName(streamFeatureGroup));
        opts.put(HUDI_BASE_PATH, streamFeatureGroup.getLocation());
        opts.put(HUDI_KAFKA_TOPIC, streamFeatureGroup.getOnlineTopicName());
        opts.put(FEATURE_GROUP_SCHEMA, streamFeatureGroup.getAvroSchema());
        opts.put(FEATURE_GROUP_ENCODED_SCHEMA, streamFeatureGroup.getEncodedAvroSchema());
        opts.put(FEATURE_GROUP_COMPLEX_FEATURES,
                new JSONArray((Collection<?>) streamFeatureGroup.getComplexFeatures()).toString());
        // Order the source by the (possibly user-overridden) precombine field.
        opts.put(DELTA_SOURCE_ORDERING_FIELD_OPT_KEY, opts.get(HUDI_PRECOMBINE_FIELD));

        Path basePath = new Path(streamFeatureGroup.getLocation());
        FileSystem fileSystem = basePath.getFileSystem(sparkSession.sparkContext().hadoopConfiguration());
        if (!fileSystem.exists(new Path(basePath, ".hoodie"))) {
            createEmptyTable(sparkSession, streamFeatureGroup);
        }
        // Covers the freshly created table too: it has no completed commit yet.
        if (getLastCommitMetadata(sparkSession, streamFeatureGroup.getLocation()) == null) {
            opts.put("auto.offset.reset", "earliest");
        }

        this.deltaStreamerConfig.streamToHoodieTable(opts, sparkSession);

        FeatureGroupCommit commit = getLastCommitMetadata(sparkSession, streamFeatureGroup.getLocation());
        if (commit != null) {
            this.featureGroupApi.featureGroupCommit(streamFeatureGroup, commit);
            streamFeatureGroup.computeStatistics();
        }
    }
}
