/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.catalog;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.ParquetTableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableOptionProperties {
    private static final Logger LOG = LoggerFactory.getLogger(TableOptionProperties.class);
    public static final String SPARK_SOURCE_PROVIDER = "spark.sql.sources.provider";
    public static final String SPARK_VERSION = "spark.version";
    public static final String DEFAULT_SPARK_VERSION = "spark3.5.1";
    static final Map<String, String> VALUE_MAPPING = new HashMap<String, String>();
    static final Map<String, String> KEY_MAPPING = new HashMap<String, String>();
    private static final String FILE_NAME = "table_option.properties";
    public static final String PK_CONSTRAINT_NAME = "pk.constraint.name";
    public static final String PK_COLUMNS = "pk.columns";
    public static final String COMMENT = "comment";
    public static final String PARTITION_COLUMNS = "partition.columns";
    public static final List<String> NON_OPTION_KEYS = Arrays.asList("pk.constraint.name", "pk.columns", "comment", "partition.columns");

    public static void createProperties(String basePath, Configuration hadoopConf, Map<String, String> options) throws IOException {
        Path propertiesFilePath = TableOptionProperties.writePropertiesFile(basePath, hadoopConf, options, false);
        LOG.info(String.format("Create file %s success.", propertiesFilePath));
    }

    public static void overwriteProperties(String basePath, Configuration hadoopConf, Map<String, String> options) throws IOException {
        Path propertiesFilePath = TableOptionProperties.writePropertiesFile(basePath, hadoopConf, options, true);
        LOG.info(String.format("Update file %s success.", propertiesFilePath));
    }

    private static Path writePropertiesFile(String basePath, Configuration hadoopConf, Map<String, String> options, boolean isOverwrite) throws IOException {
        Path propertiesFilePath = TableOptionProperties.getPropertiesFilePath(basePath);
        FileSystem fs = HadoopFSUtils.getFs(basePath, hadoopConf);
        try (FSDataOutputStream outputStream = fs.create(propertiesFilePath, isOverwrite);){
            Properties properties2 = new Properties();
            properties2.putAll(options);
            properties2.store((OutputStream)outputStream, "Table option properties saved on " + new Date(System.currentTimeMillis()));
        }
        return propertiesFilePath;
    }

    public static Map<String, String> loadFromProperties(String basePath, Configuration hadoopConf) {
        Path propertiesFilePath = TableOptionProperties.getPropertiesFilePath(basePath);
        HashMap<String, String> options = new HashMap<String, String>();
        Properties props = new Properties();
        FileSystem fs = HadoopFSUtils.getFs(basePath, hadoopConf);
        try (FSDataInputStream inputStream = fs.open(propertiesFilePath);){
            props.load((InputStream)inputStream);
            for (String name2 : props.stringPropertyNames()) {
                options.put(name2, props.getProperty(name2));
            }
        }
        catch (IOException e) {
            throw new HoodieIOException(String.format("Could not load table option properties from %s", propertiesFilePath), e);
        }
        LOG.info(String.format("Loading table option properties from %s success.", propertiesFilePath));
        return options;
    }

    private static Path getPropertiesFilePath(String basePath) {
        String auxPath = basePath + "/" + ".hoodie/.aux";
        return new Path(auxPath, FILE_NAME);
    }

    public static String getPkConstraintName(Map<String, String> options) {
        return options.get(PK_CONSTRAINT_NAME);
    }

    public static List<String> getPkColumns(Map<String, String> options) {
        if (options.containsKey(PK_COLUMNS)) {
            return Arrays.stream(options.get(PK_COLUMNS).split(",")).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    public static List<String> getPartitionColumns(Map<String, String> options) {
        if (options.containsKey(PARTITION_COLUMNS)) {
            return Arrays.stream(options.get(PARTITION_COLUMNS).split(",")).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    public static String getComment(Map<String, String> options) {
        return options.get(COMMENT);
    }

    public static Map<String, String> getTableOptions(Map<String, String> options) {
        HashMap<String, String> copied = new HashMap<String, String>(options);
        NON_OPTION_KEYS.forEach(copied::remove);
        return copied;
    }

    public static Map<String, String> translateFlinkTableProperties2Spark(CatalogTable catalogTable, Configuration hadoopConf, Map<String, String> properties2, List<String> partitionKeys, boolean withOperationField) {
        RowType rowType = TableOptionProperties.supplementMetaFields((RowType)catalogTable.getSchema().toPhysicalRowDataType().getLogicalType(), withOperationField);
        Schema schema = AvroSchemaConverter.convertToSchema((LogicalType)rowType);
        MessageType messageType = ParquetTableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
        String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
        Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(partitionKeys, sparkVersion, 4000, messageType);
        properties2.putAll(sparkTableProperties);
        return properties2.entrySet().stream().filter(e -> KEY_MAPPING.containsKey(e.getKey()) && !catalogTable.getOptions().containsKey(KEY_MAPPING.get(e.getKey()))).collect(Collectors.toMap(e -> KEY_MAPPING.get(e.getKey()), e -> {
            if (((String)e.getKey()).equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key())) {
                String sparkTableType = VALUE_MAPPING.get(e.getValue());
                if (sparkTableType == null) {
                    throw new HoodieValidationException(String.format("%s's value is invalid", e.getKey()));
                }
                return sparkTableType;
            }
            return (String)e.getValue();
        }));
    }

    private static RowType supplementMetaFields(RowType rowType, boolean withOperationField) {
        ArrayList<String> metaFields = new ArrayList<String>(HoodieRecord.HOODIE_META_COLUMNS);
        if (withOperationField) {
            metaFields.add(HoodieRecord.OPERATION_METADATA_FIELD);
        }
        ArrayList<RowType.RowField> rowFields = new ArrayList<RowType.RowField>();
        for (String metaField : metaFields) {
            rowFields.add(new RowType.RowField(metaField, (LogicalType)new VarCharType(10000)));
        }
        rowFields.addAll(rowType.getFields());
        return new RowType(false, rowFields);
    }

    public static Map<String, String> translateSparkTableProperties2Flink(Map<String, String> options) {
        if (options.containsKey(FactoryUtil.CONNECTOR.key())) {
            return options;
        }
        return options.entrySet().stream().filter(e -> KEY_MAPPING.containsKey(e.getKey())).collect(Collectors.toMap(e -> KEY_MAPPING.get(e.getKey()), e -> ((String)e.getKey()).equalsIgnoreCase("type") ? VALUE_MAPPING.get(e.getValue()) : (String)e.getValue()));
    }

    public static Map<String, String> translateSparkTableProperties2Flink(Table hiveTable) {
        return TableOptionProperties.translateSparkTableProperties2Flink(hiveTable.getParameters());
    }

    static {
        VALUE_MAPPING.put("mor", HoodieTableType.MERGE_ON_READ.name());
        VALUE_MAPPING.put("cow", HoodieTableType.COPY_ON_WRITE.name());
        VALUE_MAPPING.put(HoodieTableType.MERGE_ON_READ.name(), "mor");
        VALUE_MAPPING.put(HoodieTableType.COPY_ON_WRITE.name(), "cow");
        KEY_MAPPING.put("type", FlinkOptions.TABLE_TYPE.key());
        KEY_MAPPING.put("primaryKey", FlinkOptions.RECORD_KEY_FIELD.key());
        KEY_MAPPING.put("preCombineField", FlinkOptions.PRECOMBINE_FIELD.key());
        KEY_MAPPING.put("payloadClass", FlinkOptions.PAYLOAD_CLASS_NAME.key());
        KEY_MAPPING.put(SPARK_SOURCE_PROVIDER, FactoryUtil.CONNECTOR.key());
        KEY_MAPPING.put(FlinkOptions.KEYGEN_CLASS_NAME.key(), FlinkOptions.KEYGEN_CLASS_NAME.key());
        KEY_MAPPING.put(FlinkOptions.TABLE_TYPE.key(), "type");
        KEY_MAPPING.put(FlinkOptions.RECORD_KEY_FIELD.key(), "primaryKey");
        KEY_MAPPING.put(FlinkOptions.PRECOMBINE_FIELD.key(), "preCombineField");
        KEY_MAPPING.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), "payloadClass");
    }
}

