/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.catalog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hudi.adapter.HiveCatalogConstants;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.table.catalog.CatalogOptions;
import org.apache.hudi.table.catalog.HiveSchemaUtils;
import org.apache.hudi.table.catalog.HoodieCatalogUtil;
import org.apache.hudi.table.catalog.TableOptionProperties;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieHiveCatalog
extends AbstractCatalog {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(HoodieHiveCatalog.class);
    /** Hive configuration; used to obtain the metastore client and handed to the Hudi/Hadoop helpers as a Hadoop configuration. */
    private final HiveConf hiveConf;
    /** Hive metastore client; assigned in {@code open()} and reset to {@code null} in {@code close()}. */
    private IMetaStoreClient client;
    /** Root path used to derive a database location when none is supplied; read from the catalog path option, falling back to the Hive warehouse setting. */
    private final String catalogPath;
    /** When {@code true}, tables created through this catalog are registered as Hive EXTERNAL tables. */
    private final boolean external;

    /**
     * Creates a catalog whose {@link HiveConf} is built from the Hive conf directory
     * named by {@code CatalogOptions.HIVE_CONF_DIR}. Embedded metastores are rejected
     * (delegates with {@code allowEmbedded = false}).
     *
     * @param catalogName name under which the catalog is registered
     * @param options     catalog options
     */
    public HoodieHiveCatalog(String catalogName, Configuration options) {
        this(catalogName, options, HoodieCatalogUtil.createHiveConf(options.getString(CatalogOptions.HIVE_CONF_DIR), options), false);
    }

    public HoodieHiveCatalog(String catalogName, Configuration options, HiveConf hiveConf, boolean allowEmbedded) {
        super(catalogName, options.getString(CatalogOptions.DEFAULT_DATABASE));
        this.hiveConf = hiveConf;
        this.catalogPath = options.getString(CatalogOptions.CATALOG_PATH, hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE));
        this.external = options.getBoolean(CatalogOptions.TABLE_EXTERNAL);
        if (!allowEmbedded) {
            Preconditions.checkArgument(!HoodieCatalogUtil.isEmbeddedMetastore(this.hiveConf), "Embedded metastore is not allowed. Make sure you have set a valid value for " + HiveConf.ConfVars.METASTOREURIS);
        }
        LOG.info("Created Hoodie Catalog '{}' in hms mode", (Object)catalogName);
    }

    /**
     * Connects to the Hive metastore (if not already connected) and makes sure the
     * default database exists, creating it when missing.
     *
     * @throws CatalogException declared by the catalog contract
     */
    public void open() throws CatalogException {
        if (this.client == null) {
            try {
                this.client = Hive.get(this.hiveConf).getMSC();
            } catch (Exception e) {
                throw new HoodieCatalogException("Failed to create hive metastore client", e);
            }
            LOG.info("Connected to Hive metastore");
        }

        String defaultDb = this.getDefaultDatabase();
        if (this.databaseExists(defaultDb)) {
            return;
        }

        LOG.info("{} does not exist, will be created.", defaultDb);
        CatalogDatabase database = new CatalogDatabaseImpl(Collections.emptyMap(), "default database");
        try {
            // ignoreIfExists = true: a concurrent creation of the default database is not an error.
            this.createDatabase(defaultDb, database, true);
        } catch (DatabaseAlreadyExistException e) {
            throw new HoodieCatalogException(this.getName(), e);
        }
    }

    /**
     * Closes the metastore client, if one is open, and forgets it so that a later
     * {@code open()} creates a fresh one.
     */
    public void close() throws CatalogException {
        if (this.client == null) {
            return;
        }
        this.client.close();
        this.client = null;
        LOG.info("Disconnect to hive metastore");
    }

    /**
     * Returns the Hive configuration this catalog was constructed with.
     *
     * @return the catalog's {@link HiveConf} instance (not a copy)
     */
    public HiveConf getHiveConf() {
        return hiveConf;
    }

    /**
     * Lists the names of all databases known to the metastore.
     *
     * @return database names as returned by the metastore client
     * @throws HoodieCatalogException if the metastore call fails
     */
    public List<String> listDatabases() throws CatalogException {
        List<String> databaseNames;
        try {
            databaseNames = this.client.getAllDatabases();
        } catch (TException e) {
            String message = String.format("Failed to list all databases in %s", this.getName());
            throw new HoodieCatalogException(message, e);
        }
        return databaseNames;
    }

    /**
     * Fetches the raw Hive {@link Database} object for the given name.
     *
     * @param databaseName database to look up
     * @return the metastore's database object
     * @throws DatabaseNotExistException if the metastore reports no such database
     * @throws HoodieCatalogException    on any other metastore failure
     */
    private Database getHiveDatabase(String databaseName) throws DatabaseNotExistException {
        Database hiveDatabase;
        try {
            hiveDatabase = this.client.getDatabase(databaseName);
        } catch (NoSuchObjectException notFound) {
            throw new DatabaseNotExistException(this.getName(), databaseName);
        } catch (TException e) {
            String message = String.format("Failed to get database %s from %s", databaseName, this.getName());
            throw new HoodieCatalogException(message, e);
        }
        return hiveDatabase;
    }

    /**
     * Returns the Flink view of a Hive database: its parameters plus the location URI
     * (stored under {@code hive.database.location-uri}) and its description as comment.
     *
     * @param databaseName database to look up
     * @return a new {@link CatalogDatabase} backed by a copy of the Hive parameters
     * @throws DatabaseNotExistException if the database does not exist
     */
    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
        Database hiveDb = this.getHiveDatabase(databaseName);
        // Copy so that adding the location entry does not touch the Hive object's own map.
        Map<String, String> props = new HashMap<>(hiveDb.getParameters());
        props.put("hive.database.location-uri", hiveDb.getLocationUri());
        String description = hiveDb.getDescription();
        return new CatalogDatabaseImpl(props, description);
    }

    /**
     * Checks whether a database exists in the metastore.
     *
     * @param databaseName database to probe
     * @return {@code true} if the metastore returns a non-null database; {@code false}
     *         if it returns null or reports that no such object exists
     * @throws HoodieCatalogException on any other metastore failure
     */
    public boolean databaseExists(String databaseName) throws CatalogException {
        Database found;
        try {
            found = this.client.getDatabase(databaseName);
        } catch (NoSuchObjectException notFound) {
            return false;
        } catch (TException e) {
            String message = String.format("Failed to determine whether database %s exists or not", databaseName);
            throw new HoodieCatalogException(message, e);
        }
        return found != null;
    }

    /**
     * Creates a database in the Hive metastore.
     *
     * <p>The location is taken from the {@code hive.database.location-uri} property when
     * present; otherwise it is derived from the catalog path and the database name. The
     * location property itself is not stored among the Hive database parameters.
     *
     * <p>The properties of {@code database} are copied before being modified, so the
     * caller's object is left untouched and unmodifiable property maps are accepted.
     *
     * @param databaseName   name of the database to create; must not be blank
     * @param database       database definition; must not be null
     * @param ignoreIfExists when {@code true}, an already existing database is silently accepted
     * @throws DatabaseAlreadyExistException if the database exists and {@code ignoreIfExists} is false
     * @throws HoodieCatalogException        on any other metastore failure
     */
    public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
        Preconditions.checkArgument(!org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly(databaseName), "Database name can not null or empty");
        Preconditions.checkNotNull(database, "database cannot be null");

        // Defensive copy: removing the location key below must not mutate the caller's map
        // (and must not fail when that map is unmodifiable).
        Map<String, String> properties = new HashMap<>();
        if (database.getProperties() != null) {
            properties.putAll(database.getProperties());
        }

        String dbLocationUri = properties.remove("hive.database.location-uri");
        if (dbLocationUri == null && this.catalogPath != null) {
            dbLocationUri = new Path(this.catalogPath, databaseName).toString();
        }

        Database hiveDatabase = new Database(databaseName, database.getComment(), dbLocationUri, properties);
        try {
            this.client.createDatabase(hiveDatabase);
        } catch (AlreadyExistsException e) {
            if (!ignoreIfExists) {
                throw new DatabaseAlreadyExistException(this.getName(), hiveDatabase.getName());
            }
        } catch (TException e) {
            throw new HoodieCatalogException(String.format("Failed to create database %s", hiveDatabase.getName()), e);
        }
    }

    /**
     * Drops a database from the metastore.
     *
     * @param name2             database to drop
     * @param ignoreIfNotExists when {@code true}, a missing database is not an error
     * @param cascade           forwarded to the metastore client's cascade flag
     * @throws DatabaseNotExistException if the database is missing and {@code ignoreIfNotExists} is false
     * @throws DatabaseNotEmptyException if the metastore rejects the drop as an invalid operation
     * @throws HoodieCatalogException    on any other metastore failure
     */
    public void dropDatabase(String name2, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
        // Second argument is passed as a constant true (the client's delete-data flag — see IMetaStoreClient).
        final boolean deleteData = true;
        try {
            this.client.dropDatabase(name2, deleteData, ignoreIfNotExists, cascade);
        } catch (NoSuchObjectException notFound) {
            if (ignoreIfNotExists) {
                return;
            }
            throw new DatabaseNotExistException(this.getName(), name2);
        } catch (InvalidOperationException invalid) {
            throw new DatabaseNotEmptyException(this.getName(), name2);
        } catch (TException e) {
            String message = String.format("Failed to drop database %s", name2);
            throw new HoodieCatalogException(message, e);
        }
    }

    /**
     * Applies the changes described by {@code newDatabase} to an existing Hive database.
     *
     * @param databaseName      database to alter; must not be blank
     * @param newDatabase       description of the change; must not be null
     * @param ignoreIfNotExists when {@code true}, a missing database makes this a no-op
     * @throws DatabaseNotExistException if the database is missing and {@code ignoreIfNotExists} is false
     * @throws HoodieCatalogException    if the metastore alter call fails
     */
    public void alterDatabase(String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
        Preconditions.checkArgument(!org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly(databaseName), "Database name cannot be null or empty");
        Preconditions.checkNotNull(newDatabase, "New database cannot be null");

        Database existing;
        try {
            existing = this.getHiveDatabase(databaseName);
        } catch (DatabaseNotExistException notFound) {
            if (ignoreIfNotExists) {
                return;
            }
            throw new DatabaseNotExistException(this.getName(), databaseName);
        }

        // Computing the altered definition cannot raise a TException, so it sits outside the try.
        Database altered = HoodieHiveCatalog.alterDatabase(existing, newDatabase);
        try {
            this.client.alterDatabase(databaseName, altered);
        } catch (TException e) {
            String message = String.format("Failed to alter database %s", databaseName);
            throw new HoodieCatalogException(message, e);
        }
    }

    /**
     * Applies an alter operation to {@code hiveDB} in place and returns it.
     *
     * <p>The operation is selected by the {@code hive.alter.database.op} property of
     * {@code newDatabase} (defaulting to {@code CHANGE_PROPS}):
     * <ul>
     *   <li>{@code CHANGE_PROPS} replaces the Hive parameters with the remaining properties;</li>
     *   <li>{@code CHANGE_LOCATION} sets the location from {@code hive.database.location-uri};</li>
     *   <li>{@code CHANGE_OWNER} sets owner name and type from
     *       {@code hive.database.owner.name} / {@code hive.database.owner.type}.</li>
     * </ul>
     * The properties of {@code newDatabase} are copied first, so the caller's object is
     * never mutated and unmodifiable property maps are accepted.
     *
     * @param hiveDB      existing Hive database, modified in place
     * @param newDatabase description of the requested change
     * @return {@code hiveDB}, after modification
     * @throws CatalogException         if the owner type is missing or not {@code role}/{@code user}
     * @throws IllegalArgumentException if the operation name is not a known enum constant
     */
    private static Database alterDatabase(Database hiveDB, CatalogDatabase newDatabase) {
        // Defensive copy: the control keys are removed below and must not disappear from the caller's map.
        Map<String, String> newParams = new HashMap<>();
        if (newDatabase.getProperties() != null) {
            newParams.putAll(newDatabase.getProperties());
        }

        String opStr = newParams.remove("hive.alter.database.op");
        if (opStr == null) {
            opStr = HiveCatalogConstants.AlterHiveDatabaseOp.CHANGE_PROPS.name();
        }
        String newLocation = newParams.remove("hive.database.location-uri");
        HiveCatalogConstants.AlterHiveDatabaseOp op = HiveCatalogConstants.AlterHiveDatabaseOp.valueOf(opStr);

        switch (op) {
            case CHANGE_PROPS: {
                hiveDB.setParameters(newParams);
                break;
            }
            case CHANGE_LOCATION: {
                hiveDB.setLocationUri(newLocation);
                break;
            }
            case CHANGE_OWNER: {
                String ownerName = newParams.remove("hive.database.owner.name");
                String ownerType = newParams.remove("hive.database.owner.type");
                hiveDB.setOwnerName(ownerName);
                // Constant-first comparison keeps a missing owner type from surfacing as a bare NPE.
                if ("role".equals(ownerType)) {
                    hiveDB.setOwnerType(PrincipalType.ROLE);
                } else if ("user".equals(ownerType)) {
                    hiveDB.setOwnerType(PrincipalType.USER);
                } else {
                    throw new CatalogException("Unsupported database owner type: " + ownerType);
                }
                break;
            }
            default: {
                throw new CatalogException("Unsupported alter database op:" + opStr);
            }
        }

        if (hiveDB.getParameters() != null) {
            hiveDB.getParameters().remove("is_generic");
        }
        return hiveDB;
    }

    /**
     * Verifies that a Hive table is a Hudi table, i.e. either its
     * {@code spark.sql.sources.provider} parameter or its Flink connector parameter
     * equals {@code hudi} (case-insensitive).
     *
     * @param hiveTable table to check
     * @return the same table, for call chaining
     * @throws HoodieCatalogException if the table is not a Hudi table
     */
    private Table isHoodieTable(Table hiveTable) {
        String sparkProvider = hiveTable.getParameters().getOrDefault("spark.sql.sources.provider", "");
        if (sparkProvider.equalsIgnoreCase("hudi") || this.isFlinkHoodieTable(hiveTable)) {
            return hiveTable;
        }
        throw new HoodieCatalogException(String.format("Table %s is not a hoodie table", hiveTable.getTableName()));
    }

    /**
     * Tells whether the table's Flink connector parameter equals {@code hudi}
     * (case-insensitive). A missing parameter counts as "not a Flink Hudi table".
     *
     * @param hiveTable table to inspect
     * @return {@code true} if the connector parameter identifies Hudi
     */
    private boolean isFlinkHoodieTable(Table hiveTable) {
        String connectorKey = FactoryUtil.CONNECTOR.key();
        String connector = hiveTable.getParameters().getOrDefault(connectorKey, "");
        return connector.equalsIgnoreCase("hudi");
    }

    /**
     * Loads a table from the metastore and verifies that it is a Hudi table.
     *
     * @param tablePath database and table name
     * @return the Hive table object
     * @throws TableNotExistException if the metastore reports no such table
     * @throws HoodieCatalogException if the table is not a Hudi table or the metastore call fails
     */
    @VisibleForTesting
    public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
        String dbName = tablePath.getDatabaseName();
        String tableName = tablePath.getObjectName();
        try {
            Table loaded = this.client.getTable(dbName, tableName);
            return this.isHoodieTable(loaded);
        } catch (NoSuchObjectException notFound) {
            throw new TableNotExistException(this.getName(), tablePath);
        } catch (TException e) {
            String message = String.format("Failed to get table %s from Hive metastore", tablePath.getObjectName());
            throw new HoodieCatalogException(message, e);
        }
    }

    /**
     * Makes a Hudi table that lacks the Flink connector parameter usable from Flink.
     *
     * <p>For such a table the Flink-side properties produced by
     * {@code TableOptionProperties.translateSparkTableProperties2Flink} and the table path
     * are added to the Hive parameters. If hive-style partitioning is not yet recorded, it
     * is taken from the Hudi table config when that config contains the key, otherwise
     * inferred from the directory names directly under the table path. The updated
     * definition is then written back to the metastore. Tables that already carry the
     * Flink connector parameter are returned untouched.
     *
     * @param tablePath database and table name, used for the metastore update
     * @param hiveTable table as loaded from the metastore; its parameter map is modified in place
     * @return {@code hiveTable}
     * @throws HoodieCatalogException if any step of the translation or the metastore update fails
     */
    private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
        if (this.isFlinkHoodieTable(hiveTable)) {
            return hiveTable;
        }
        try {
            org.apache.hadoop.conf.Configuration hadoopConf = this.hiveConf;
            Map<String, String> tableParams = hiveTable.getParameters();
            tableParams.putAll(TableOptionProperties.translateSparkTableProperties2Flink(hiveTable));
            String basePath = hiveTable.getSd().getLocation();
            tableParams.put(FlinkOptions.PATH.key(), basePath);

            if (!tableParams.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
                Option<HoodieTableConfig> tableConfig = StreamerUtil.getTableConfig(basePath, hadoopConf);
                boolean hiveStyle;
                if (tableConfig.isPresent() && tableConfig.get().contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
                    hiveStyle = Boolean.parseBoolean(tableConfig.get().getHiveStylePartitioningEnable());
                } else {
                    // No recorded setting: look at the first-level directory names, skipping
                    // the ".hoodie" and "default" entries.
                    Path hoodieTablePath = new Path(basePath);
                    hiveStyle = Arrays.stream(HadoopFSUtils.getFs(hoodieTablePath, hadoopConf).listStatus(hoodieTablePath))
                        .map(status -> status.getPath().getName())
                        .filter(name -> !name.equals(".hoodie") && !name.equals("default"))
                        .anyMatch(FilePathUtils::isHiveStylePartitioning);
                }
                if (hiveStyle) {
                    tableParams.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
                }
            }
            this.client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
        } catch (Exception e) {
            throw new HoodieCatalogException("Failed to update table schema", e);
        }
        return hiveTable;
    }

    /**
     * Returns the Flink catalog table for a Hudi table registered in the metastore.
     *
     * <p>The schema comes from the latest Hudi table schema when one is available
     * (with the record key columns forced non-nullable and declared as primary key);
     * otherwise it is derived from the Hive table definition.
     *
     * @param tablePath database and table name; must not be null
     * @return the catalog table with schema, comment, partition keys and options
     * @throws TableNotExistException if the table does not exist
     */
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        Preconditions.checkNotNull(tablePath, "Table path cannot be null");
        Table hiveTable = this.translateSparkTable2Flink(tablePath, this.getHiveTable(tablePath));
        String basePath = hiveTable.getSd().getLocation();
        Map<String, String> tableParams = hiveTable.getParameters();
        org.apache.hadoop.conf.Configuration hadoopConf = this.hiveConf;

        org.apache.avro.Schema avroSchema = StreamerUtil.getLatestTableSchema(basePath, hadoopConf);
        org.apache.flink.table.api.Schema schema;
        if (avroSchema == null) {
            LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath);
            schema = HiveSchemaUtils.convertTableSchema(hiveTable);
        } else {
            String pkColumnsStr = tableParams.get(FlinkOptions.RECORD_KEY_FIELD.key());
            List<String> pkColumns = null;
            if (!StringUtils.isNullOrEmpty(pkColumnsStr)) {
                pkColumns = StringUtils.split(pkColumnsStr, ",");
            }
            DataType rowType = DataTypeUtils.ensureColumnsAsNonNullable(AvroSchemaConverter.convertToDataType(avroSchema), pkColumns);
            org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder().fromRowDataType(rowType);
            String pkConstraintName = tableParams.get("pk.constraint.name");
            if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
                builder.primaryKeyNamed(pkConstraintName, pkColumns);
            } else if (pkColumns != null) {
                builder.primaryKey(pkColumns);
            }
            schema = builder.build();
        }

        Map<String, String> options = this.supplementOptions(tablePath, tableParams);
        String comment = tableParams.get("comment");
        List<String> partitionKeys = HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys());
        return CatalogTable.of(schema, comment, partitionKeys, options);
    }

    /**
     * Registers a new Hudi table in the metastore and initializes the Hudi table at
     * its storage location.
     *
     * @param tablePath      database and table name; must not be null
     * @param table          table definition; must use the {@code hudi} connector and must not be a view
     * @param ignoreIfExists when {@code true}, an already existing table is silently accepted
     * @throws TableAlreadyExistException if the table exists and {@code ignoreIfExists} is false
     * @throws DatabaseNotExistException  if the target database does not exist
     * @throws HoodieCatalogException     for unsupported definitions or any other failure
     */
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        Preconditions.checkNotNull(tablePath, "Table path cannot be null");
        Preconditions.checkNotNull(table, "Table cannot be null");

        String dbName = tablePath.getDatabaseName();
        if (!this.databaseExists(dbName)) {
            throw new DatabaseNotExistException(this.getName(), dbName);
        }

        String connector = table.getOptions().getOrDefault(FactoryUtil.CONNECTOR.key(), "");
        if (!connector.equalsIgnoreCase("hudi")) {
            throw new HoodieCatalogException(String.format("Unsupported connector identity %s, supported identity is %s", connector, "hudi"));
        }
        if (table instanceof CatalogView) {
            throw new HoodieCatalogException("CREATE VIEW is not supported.");
        }
        this.validateParameterConsistency(table);

        try {
            boolean morTable = OptionsResolver.isMorTable(table.getOptions());
            String location = this.inferTablePath(tablePath, table);
            Table hiveTable = this.instantiateHiveTable(tablePath, table, location, morTable);
            // Metastore registration first, then the Hudi table itself.
            this.client.createTable(hiveTable);
            this.initTableIfNotExists(tablePath, (CatalogTable) table);
        } catch (AlreadyExistsException alreadyExists) {
            if (!ignoreIfExists) {
                throw new TableAlreadyExistException(this.getName(), tablePath, alreadyExists);
            }
        } catch (Exception e) {
            String message = String.format("Failed to create table %s", tablePath.getFullName());
            throw new HoodieCatalogException(message, e);
        }
    }

    /**
     * Builds the Flink/Hudi configuration for a newly created table and initializes
     * the Hudi table at its path if it does not exist yet.
     *
     * <p>Starting from the table options, the configuration is completed with the Avro
     * schema, the record key (from the primary key, unless set explicitly), the partition
     * path field (from the partition keys, unless set explicitly), the key generator, the
     * table path and the table name.
     *
     * @param tablePath    database and table name
     * @param catalogTable table definition
     * @throws HoodieCatalogException if initializing the table fails with an I/O error
     */
    private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) {
        Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());

        String avroSchema = AvroSchemaConverter.convertToSchema(
            catalogTable.getSchema().toPersistedRowDataType().getLogicalType(),
            AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString();
        flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);

        boolean hasPrimaryKey = catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent();
        if (hasPrimaryKey && !flinkConf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
            List<String> pkColumnNames = catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames();
            flinkConf.setString(FlinkOptions.RECORD_KEY_FIELD, String.join(",", pkColumnNames));
        }

        if (catalogTable.isPartitioned() && !flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
            flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", catalogTable.getPartitionKeys()));
            // A key made of several record-key or several partition columns counts as complex.
            String[] recordKeys = flinkConf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
            boolean complexHoodieKey = recordKeys.length > 1 || catalogTable.getPartitionKeys().size() > 1;
            StreamerUtil.checkKeygenGenerator(complexHoodieKey, flinkConf);
        }
        if (!catalogTable.isPartitioned()) {
            flinkConf.setString(FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName());
        }

        if (!flinkConf.getOptional(FlinkOptions.PATH).isPresent()) {
            flinkConf.setString(FlinkOptions.PATH, this.inferTablePath(tablePath, catalogTable));
        }
        flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());

        List<String> columnNames = new ArrayList<>();
        for (org.apache.flink.table.api.Schema.UnresolvedColumn column : catalogTable.getUnresolvedSchema().getColumns()) {
            columnNames.add(column.getName());
        }
        StreamerUtil.checkPreCombineKey(flinkConf, columnNames);

        try {
            org.apache.hadoop.conf.Configuration hadoopConf = this.hiveConf;
            StreamerUtil.initTableIfNotExists(flinkConf, hadoopConf);
        } catch (IOException e) {
            throw new HoodieCatalogException("Initialize table exception.", e);
        }
    }

    /**
     * Determines the storage path of a table: the explicit path option when set,
     * otherwise {@code <database location>/<table name>}.
     *
     * @param tablePath database and table name
     * @param table     table definition whose options may carry an explicit path
     * @return the table's storage path
     * @throws HoodieCatalogException if the database location cannot be read from the metastore
     */
    @VisibleForTesting
    public String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
        String configured = table.getOptions().getOrDefault(FlinkOptions.PATH.key(), "");
        if (!StringUtils.isNullOrEmpty(configured)) {
            return configured;
        }
        try {
            String dbLocationUri = this.client.getDatabase(tablePath.getDatabaseName()).getLocationUri();
            Path dbLocation = new Path(dbLocationUri);
            return new Path(dbLocation, tablePath.getObjectName()).toString();
        } catch (TException e) {
            String message = String.format("Failed to infer hoodie table path for table %s", tablePath);
            throw new HoodieCatalogException(message, e);
        }
    }

    /**
     * Builds the Hive metastore {@link Table} object that represents a Hudi table.
     *
     * <p>The table parameters start as a copy of the Flink table options and are then
     * extended (index type, Hadoop/Hive properties, external flag, comment, primary key,
     * path). Columns are split into regular and partition columns, and the storage
     * descriptor is filled with the Hudi Parquet input/output formats and SerDe.
     *
     * @param tablePath              database and table name
     * @param table                  Flink table definition (cast to {@link CatalogTable} below)
     * @param location               storage location of the table
     * @param useRealTimeInputFormat forwarded to the input-format lookup; its negation is stored
     *                               as the {@code hoodie.query.as.ro.table} SerDe property
     * @return the populated Hive table (not yet registered in the metastore)
     * @throws IOException declared by this method; presumably raised when the current user
     *                     cannot be resolved — confirm against the Hadoop API
     */
    private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException {
        Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable((String)tablePath.getDatabaseName(), (String)tablePath.getObjectName());
        hiveTable.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
        // Creation time is stored in whole seconds.
        hiveTable.setCreateTime((int)(System.currentTimeMillis() / 1000L));
        HashMap<String, String> properties2 = new HashMap<String, String>(table.getOptions());
        // Mirror the Flink index type option under the Hudi index config key (unless that key
        // is already set), then drop the Flink-specific key.
        if (properties2.containsKey(FlinkOptions.INDEX_TYPE.key()) && !properties2.containsKey(HoodieIndexConfig.INDEX_TYPE.key())) {
            properties2.put(HoodieIndexConfig.INDEX_TYPE.key(), (String)properties2.get(FlinkOptions.INDEX_TYPE.key()));
        }
        properties2.remove(FlinkOptions.INDEX_TYPE.key());
        // Copy every property of the Hive configuration into the table parameters, prefixed with "hadoop.".
        this.hiveConf.getAllProperties().forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(k, v) -> properties2.put("hadoop." + k, String.valueOf(v))));
        if (this.external) {
            hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
            properties2.put("EXTERNAL", "TRUE");
        }
        if (table.getComment() != null) {
            properties2.put("comment", table.getComment());
        }
        // A declared primary key becomes the record key (and its constraint name is remembered)
        // only when no record key option was given explicitly.
        if (table.getUnresolvedSchema().getPrimaryKey().isPresent() && !properties2.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
            String pkColumns = String.join((CharSequence)",", ((Schema.UnresolvedPrimaryKey)table.getUnresolvedSchema().getPrimaryKey().get()).getColumnNames());
            properties2.put("pk.constraint.name", ((Schema.UnresolvedPrimaryKey)table.getUnresolvedSchema().getPrimaryKey().get()).getConstraintName());
            properties2.put(FlinkOptions.RECORD_KEY_FIELD.key(), pkColumns);
        }
        if (!properties2.containsKey(FlinkOptions.PATH.key())) {
            properties2.put(FlinkOptions.PATH.key(), location);
        }
        StorageDescriptor sd = new StorageDescriptor();
        // The changelog option decides the withOperationField flag passed to the schema conversion.
        boolean withOperationField = Boolean.parseBoolean(table.getOptions().getOrDefault(FlinkOptions.CHANGELOG_ENABLED.key(), "false"));
        List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField);
        CatalogTable catalogTable = (CatalogTable)table;
        List<String> partitionKeys = HoodieCatalogUtil.getPartitionKeys(catalogTable);
        // Hive keeps partition columns separate from the storage descriptor's regular columns.
        if (partitionKeys.size() > 0) {
            Pair<List<FieldSchema>, List<FieldSchema>> splitSchemas = HiveSchemaUtils.splitSchemaByPartitionKeys(allColumns, partitionKeys);
            List<FieldSchema> regularColumns = splitSchemas.getLeft();
            List<FieldSchema> partitionColumns = splitSchemas.getRight();
            sd.setCols(regularColumns);
            hiveTable.setPartitionKeys(partitionColumns);
        } else {
            sd.setCols(allColumns);
            hiveTable.setPartitionKeys(Collections.emptyList());
        }
        // The base file format is fixed to Parquet here.
        HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
        String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
        String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
        String serDeClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
        sd.setInputFormat(inputFormatClassName);
        sd.setOutputFormat(outputFormatClassName);
        HashMap<String, String> serdeProperties = new HashMap<String, String>();
        serdeProperties.put("path", location);
        serdeProperties.put("hoodie.query.as.ro.table", String.valueOf(!useRealTimeInputFormat));
        serdeProperties.put("serialization.format", "1");
        // Add the Spark-side properties derived from the Flink definition; properties2 is passed
        // in its fully populated state, so this call must stay after the updates above.
        serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, (org.apache.hadoop.conf.Configuration)this.hiveConf, properties2, partitionKeys, withOperationField));
        sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
        sd.setLocation(location);
        hiveTable.setSd(sd);
        hiveTable.setParameters(properties2);
        return hiveTable;
    }

    /**
     * Lists the names of all tables in a database.
     *
     * @param databaseName database to list; must not be blank
     * @return table names as returned by the metastore client
     * @throws DatabaseNotExistException if the metastore does not know the database
     * @throws HoodieCatalogException    on any other metastore failure
     */
    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
        Preconditions.checkArgument(!org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly(databaseName), "Database name cannot be null or empty");
        List<String> tableNames;
        try {
            tableNames = this.client.getAllTables(databaseName);
        } catch (UnknownDBException unknownDb) {
            throw new DatabaseNotExistException(this.getName(), databaseName);
        } catch (TException e) {
            String message = String.format("Failed to list tables in database %s", databaseName);
            throw new HoodieCatalogException(message, e);
        }
        return tableNames;
    }

    /**
     * Views are not supported by this catalog.
     *
     * @param databaseName ignored
     * @return never returns normally
     * @throws HoodieCatalogException always
     */
    public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
        String message = "Hoodie catalog does not support to listViews";
        throw new HoodieCatalogException(message);
    }

    /**
     * Checks whether a table exists in the metastore.
     *
     * @param tablePath database and table name; must not be null
     * @return the metastore's answer, or {@code false} when the database itself is unknown
     * @throws CatalogException on any other metastore failure
     */
    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
        Preconditions.checkNotNull(tablePath, "Table path cannot be null");
        String dbName = tablePath.getDatabaseName();
        String tableName = tablePath.getObjectName();
        try {
            return this.client.tableExists(dbName, tableName);
        } catch (UnknownDBException unknownDb) {
            // An unknown database cannot contain the table.
            return false;
        } catch (TException e) {
            String message = String.format("Failed to check whether table %s exists or not.", tablePath.getFullName());
            throw new CatalogException(message, e);
        }
    }

    /**
     * Drops a table from the metastore.
     *
     * @param tablePath         database and table name; must not be null
     * @param ignoreIfNotExists when {@code true}, a missing table is not an error
     * @throws TableNotExistException if the table is missing and {@code ignoreIfNotExists} is false
     * @throws HoodieCatalogException on any other metastore failure
     */
    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        Preconditions.checkNotNull(tablePath, "Table path cannot be null");
        // Third argument is passed as a constant true (the client's delete-data flag — see IMetaStoreClient).
        final boolean deleteData = true;
        try {
            this.client.dropTable(tablePath.getDatabaseName(), tablePath.getObjectName(), deleteData, ignoreIfNotExists);
        } catch (NoSuchObjectException notFound) {
            if (ignoreIfNotExists) {
                return;
            }
            throw new TableNotExistException(this.getName(), tablePath);
        } catch (TException e) {
            String message = String.format("Failed to drop table %s", tablePath.getFullName());
            throw new HoodieCatalogException(message, e);
        }
    }

    /**
     * Renames a table within its database.
     *
     * <p>The Hudi table configuration at the table location is re-initialized with the
     * new table name first, then the metastore entry is renamed. The storage location
     * itself is not changed.
     *
     * <p>{@link TableNotExistException} and {@link TableAlreadyExistException} are
     * propagated unchanged, as the method signature promises; only other failures are
     * wrapped in a {@link HoodieCatalogException}.
     *
     * @param tablePath         current database and table name; must not be null
     * @param newTableName      new table name; must not be blank
     * @param ignoreIfNotExists when {@code true}, a missing source table makes this a no-op
     * @throws TableNotExistException     if the source table is missing and {@code ignoreIfNotExists} is false
     * @throws TableAlreadyExistException if a table with the new name already exists
     * @throws HoodieCatalogException     on any other failure
     */
    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException {
        Preconditions.checkNotNull(tablePath, "Table path cannot be null");
        Preconditions.checkArgument(!org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly(newTableName), "New table name cannot be null or empty");
        try {
            // Existence of source and target is checked explicitly before calling alter_table.
            if (this.tableExists(tablePath)) {
                ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
                if (this.tableExists(newPath)) {
                    throw new TableAlreadyExistException(this.getName(), newPath);
                }
                Table hiveTable = this.getHiveTable(tablePath);
                StorageDescriptor sd = hiveTable.getSd();
                String location = sd.getLocation();
                // Rewrite the Hudi table config with the new name, keeping all other properties.
                HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(location).setConf(HadoopFSUtils.getStorageConfWithCopy((org.apache.hadoop.conf.Configuration)this.hiveConf)).build();
                HoodieTableMetaClient.newTableBuilder().fromProperties(metaClient.getTableConfig().getProps()).setTableName(newTableName).initTable(HadoopFSUtils.getStorageConfWithCopy((org.apache.hadoop.conf.Configuration)this.hiveConf), location);
                hiveTable.setTableName(newTableName);
                this.client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
            } else if (!ignoreIfNotExists) {
                throw new TableNotExistException(this.getName(), tablePath);
            }
        }
        catch (TableNotExistException | TableAlreadyExistException e) {
            // Contract exceptions must reach the caller as declared, not wrapped by the generic handler below.
            throw e;
        }
        catch (Exception e) {
            throw new HoodieCatalogException(String.format("Failed to rename table %s", tablePath.getFullName()), e);
        }
    }

    /**
     * Alters a table without an explicit list of table changes (an empty list is passed on).
     * The work is delegated to {@code HoodieCatalogUtil.alterTable}.
     *
     * @param tablePath         database and table name
     * @param newCatalogTable   new table definition
     * @param ignoreIfNotExists forwarded to the delegate
     * @throws TableNotExistException declared by the catalog contract
     */
    public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        org.apache.hadoop.conf.Configuration hadoopConf = this.hiveConf;
        HoodieCatalogUtil.alterTable(
            this,
            tablePath,
            newCatalogTable,
            Collections.emptyList(),
            ignoreIfNotExists,
            hadoopConf,
            this::inferTablePath,
            this::refreshHMSTable);
    }

    /**
     * Alters a table using an explicit list of table changes.
     *
     * <p>Delegates to {@code HoodieCatalogUtil.alterTable}, passing the given change list, this
     * catalog's Hive configuration, and {@code inferTablePath} / {@code refreshHMSTable} as callbacks.
     *
     * @param tablePath path of the table to alter
     * @param newCatalogTable the new table definition
     * @param tableChanges the changes to apply; forwarded unchanged
     * @param ignoreIfNotExists forwarded unchanged to {@code HoodieCatalogUtil.alterTable}
     */
    public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, List tableChanges, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        org.apache.hadoop.conf.Configuration hadoopConf = (org.apache.hadoop.conf.Configuration)this.hiveConf;
        HoodieCatalogUtil.alterTable(
                this,
                tablePath,
                newCatalogTable,
                tableChanges,
                ignoreIfNotExists,
                hadoopConf,
                this::inferTablePath,
                this::refreshHMSTable);
    }

    /**
     * Lists the partitions of a table.
     *
     * <p>This implementation does not inspect {@code tablePath}; it always returns an empty list.
     *
     * @param tablePath path of the table (unused)
     * @return an empty list
     */
    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Lists the partitions of a table that match a partial partition spec.
     *
     * <p>This implementation ignores both arguments and always returns an empty list.
     *
     * @param tablePath path of the table (unused)
     * @param partitionSpec partial partition spec to filter by (unused)
     * @return an empty list
     */
    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Lists the partitions of a table that satisfy the given filter expressions.
     *
     * <p>This implementation ignores both arguments and always returns an empty list.
     *
     * @param tablePath path of the table (unused)
     * @param expressions filter expressions (unused)
     * @return an empty list
     */
    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> expressions) throws TableNotExistException, TableNotPartitionedException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Not supported by this catalog.
     *
     * @param tablePath path of the table (unused)
     * @param partitionSpec spec of the partition (unused)
     * @return never returns normally
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Not supported by this catalog.
     *
     * @param tablePath path of the table (unused)
     * @param partitionSpec spec of the partition (unused)
     * @return never returns normally
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Not supported by this catalog.
     *
     * @param tablePath path of the table (unused)
     * @param partitionSpec spec of the partition to create (unused)
     * @param partition the partition definition (unused)
     * @param ignoreIfExists unused
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Drops a partition: deletes its data through a Hudi write client and then removes the partition
     * from the Hive metastore.
     *
     * <p>The partition spec is resolved into ordered partition values <em>before</em> any data is deleted,
     * so a spec rejected by {@code HoodieCatalogUtil.getOrderedPartitionValues} fails without the table
     * having been modified.
     *
     * @param tablePath path of the table that owns the partition; must not be null
     * @param partitionSpec spec of the partition to drop; must not be null
     * @param ignoreIfNotExists when {@code true}, a missing table or a {@code NoSuchObjectException}
     *     raised while dropping is silently ignored
     * @throws PartitionNotExistException if the table or partition is missing and
     *     {@code ignoreIfNotExists} is {@code false}, or if a {@code PartitionSpecInvalidException} or
     *     {@code MetaException} is raised (regardless of {@code ignoreIfNotExists})
     * @throws CatalogException on any other failure, including write statuses that report errors
     */
    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
        Preconditions.checkNotNull(tablePath, "Table path cannot be null");
        Preconditions.checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
        CatalogBaseTable table;
        try {
            table = this.getTable(tablePath);
        }
        catch (TableNotExistException e) {
            if (!ignoreIfNotExists) {
                throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec, (Throwable)e);
            }
            return;
        }
        try {
            // Resolve (and thereby validate) the spec first: nothing must be deleted for an invalid spec.
            List<String> partitionValues = HoodieCatalogUtil.getOrderedPartitionValues(this.getName(), this.getHiveConf(), partitionSpec, ((CatalogTable)table).getPartitionKeys(), tablePath);
            try (HoodieFlinkWriteClient<?> writeClient = HoodieCatalogUtil.createWriteClient(tablePath, table, (org.apache.hadoop.conf.Configuration)this.hiveConf, this::inferTablePath)) {
                // A missing option parses as false.
                boolean hiveStylePartitioning = Boolean.parseBoolean((String)table.getOptions().get(FlinkOptions.HIVE_STYLE_PARTITIONING.key()));
                writeClient.deletePartitions(Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)), writeClient.createNewInstantTime()).forEach(writeStatus -> {
                    if (writeStatus.hasErrors()) {
                        throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
                    }
                });
                // Data is gone; now unregister the partition from the metastore.
                this.client.dropPartition(tablePath.getDatabaseName(), tablePath.getObjectName(), partitionValues, true);
            }
        }
        catch (NoSuchObjectException e) {
            if (!ignoreIfNotExists) {
                throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec, (Throwable)e);
            }
        }
        catch (PartitionSpecInvalidException | MetaException e) {
            throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec, e);
        }
        catch (Exception e) {
            throw new CatalogException(String.format("Failed to drop partition %s of table %s", partitionSpec, tablePath), (Throwable)e);
        }
    }

    /**
     * Not supported by this catalog.
     *
     * @param tablePath path of the table (unused)
     * @param partitionSpec spec of the partition to alter (unused)
     * @param newPartition the new partition definition (unused)
     * @param ignoreIfNotExists unused
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Lists the functions of a database.
     *
     * <p>This implementation does not inspect {@code databaseName}; it always returns an empty list.
     *
     * @param databaseName name of the database (unused)
     * @return an empty list
     */
    public List<String> listFunctions(String databaseName) throws DatabaseNotExistException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Looks up a function.
     *
     * <p>This implementation performs no lookup and reports every function as missing.
     *
     * @param functionPath path of the function
     * @return never returns normally
     * @throws FunctionNotExistException always, built from this catalog's name and {@code functionPath}
     */
    public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
        throw new FunctionNotExistException(this.getName(), functionPath);
    }

    /**
     * Checks whether a function exists.
     *
     * <p>This implementation does not inspect {@code functionPath}; it always returns {@code false}.
     *
     * @param functionPath path of the function (unused)
     * @return {@code false}
     */
    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
        return false;
    }

    /**
     * Not supported by this catalog.
     *
     * @param functionPath path of the function to create (unused)
     * @param function the function definition (unused)
     * @param ignoreIfExists unused
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Not supported by this catalog.
     *
     * @param functionPath path of the function to alter (unused)
     * @param newFunction the new function definition (unused)
     * @param ignoreIfNotExists unused
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Not supported by this catalog.
     *
     * @param functionPath path of the function to drop (unused)
     * @param ignoreIfNotExists unused
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Returns the statistics of a table.
     *
     * <p>This implementation does not inspect {@code tablePath}; it always returns
     * {@code CatalogTableStatistics.UNKNOWN}.
     *
     * @param tablePath path of the table (unused)
     * @return {@code CatalogTableStatistics.UNKNOWN}
     */
    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        return CatalogTableStatistics.UNKNOWN;
    }

    /**
     * Returns the column statistics of a table.
     *
     * <p>This implementation does not inspect {@code tablePath}; it always returns
     * {@code CatalogColumnStatistics.UNKNOWN}.
     *
     * @param tablePath path of the table (unused)
     * @return {@code CatalogColumnStatistics.UNKNOWN}
     */
    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        return CatalogColumnStatistics.UNKNOWN;
    }

    /**
     * Returns the statistics of a partition.
     *
     * <p>This implementation ignores both arguments and always returns
     * {@code CatalogTableStatistics.UNKNOWN}.
     *
     * @param tablePath path of the table (unused)
     * @param partitionSpec spec of the partition (unused)
     * @return {@code CatalogTableStatistics.UNKNOWN}
     */
    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
        return CatalogTableStatistics.UNKNOWN;
    }

    /**
     * Returns the column statistics of a partition.
     *
     * <p>This implementation ignores both arguments and always returns
     * {@code CatalogColumnStatistics.UNKNOWN}.
     *
     * @param tablePath path of the table (unused)
     * @param partitionSpec spec of the partition (unused)
     * @return {@code CatalogColumnStatistics.UNKNOWN}
     */
    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
        return CatalogColumnStatistics.UNKNOWN;
    }

    /**
     * Not supported by this catalog.
     *
     * @param tablePath path of the table (unused)
     * @param tableStatistics the new table statistics (unused)
     * @param ignoreIfNotExists unused
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Not supported by this catalog.
     *
     * @param tablePath path of the table (unused)
     * @param columnStatistics the new column statistics (unused)
     * @param ignoreIfNotExists unused
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Not supported by this catalog.
     *
     * @param tablePath path of the table (unused)
     * @param partitionSpec spec of the partition (unused)
     * @param partitionStatistics the new partition statistics (unused)
     * @param ignoreIfNotExists unused
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Not supported by this catalog.
     *
     * @param tablePath path of the table (unused)
     * @param partitionSpec spec of the partition (unused)
     * @param columnStatistics the new column statistics (unused)
     * @param ignoreIfNotExists unused
     * @throws HoodieCatalogException always, with the message {@code "Not supported."}
     */
    public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
        throw new HoodieCatalogException("Not supported.");
    }

    /**
     * Rebuilds the Hive table object for {@code newCatalogTable} and pushes it to the metastore
     * through {@code alter_table}.
     *
     * @param tablePath path of the table whose metastore entry is replaced
     * @param newCatalogTable the table definition to build the Hive table from
     * @throws HoodieCatalogException wrapping any exception raised while building or altering the
     *     table; the failure is also logged at error level
     */
    private void refreshHMSTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable) {
        try {
            Map<String, String> tableOptions = newCatalogTable.getOptions();
            boolean mergeOnRead = OptionsResolver.isMorTable(tableOptions);
            Table refreshed = this.instantiateHiveTable(tablePath, newCatalogTable, this.inferTablePath(tablePath, newCatalogTable), mergeOnRead);
            String databaseName = tablePath.getDatabaseName();
            String tableName = tablePath.getObjectName();
            this.client.alter_table(databaseName, tableName, refreshed);
        }
        catch (Exception cause) {
            String tableName = tablePath.getObjectName();
            LOG.error("Failed to alter table {}", (Object)tableName, (Object)cause);
            String message = String.format("Failed to alter table %s", tableName);
            throw new HoodieCatalogException(message, cause);
        }
    }

    /**
     * Adds Hive-sync defaults to a table's options unless an embedded metastore is in use.
     *
     * <p>When {@code HoodieCatalogUtil.isEmbeddedMetastore} reports {@code true} the given map is
     * returned as is. Otherwise a copy is returned in which every Hive-sync option that is absent is
     * filled in: sync enabled, metastore URIs from the Hive configuration, mode {@code "hms"},
     * timestamp support enabled, and the sync database/table taken from {@code tablePath}.
     * Options already present keep their values.
     *
     * @param tablePath path supplying the default sync database and table names
     * @param options the original table options; not modified
     * @return {@code options} itself for an embedded metastore, otherwise a supplemented copy
     */
    private Map<String, String> supplementOptions(ObjectPath tablePath, Map<String, String> options) {
        if (!HoodieCatalogUtil.isEmbeddedMetastore(this.hiveConf)) {
            Map<String, String> supplemented = new HashMap<>(options);

            String syncEnabledKey = FlinkOptions.HIVE_SYNC_ENABLED.key();
            supplemented.putIfAbsent(syncEnabledKey, "true");

            String metastoreUrisKey = FlinkOptions.HIVE_SYNC_METASTORE_URIS.key();
            supplemented.putIfAbsent(metastoreUrisKey, this.hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));

            String syncModeKey = FlinkOptions.HIVE_SYNC_MODE.key();
            supplemented.putIfAbsent(syncModeKey, "hms");

            String supportTimestampKey = FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key();
            supplemented.putIfAbsent(supportTimestampKey, "true");

            // Database and table names are only read from the path when the option is missing.
            supplemented.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), missingKey -> tablePath.getDatabaseName());
            supplemented.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), missingKey -> tablePath.getObjectName());
            return supplemented;
        }
        return options;
    }

    /**
     * Validates that the key fields declared in the table definition agree with the corresponding
     * table options.
     *
     * <p>Two checks are performed, each only when both sides are present:
     * <ul>
     *   <li>the primary key columns of the unresolved schema against the option named by
     *       {@code FlinkOptions.RECORD_KEY_FIELD};</li>
     *   <li>the partition keys of the table against the option named by
     *       {@code FlinkOptions.PARTITION_PATH_FIELD}.</li>
     * </ul>
     * A partition-key mismatch, including a mismatch in the number of fields, is reported with the
     * partition-key message rather than the primary-key one.
     *
     * @param table the table to validate; it is cast to {@code CatalogTable} unconditionally
     * @throws HoodieValidationException if either check fails
     */
    public void validateParameterConsistency(CatalogBaseTable table) {
        Map<String, String> options = new HashMap<>(table.getOptions());

        String recordKeyOption = FlinkOptions.RECORD_KEY_FIELD.key();
        String pkError = String.format("Primary key fields definition has inconsistency between pk statement and option '%s'", recordKeyOption);
        if (table.getUnresolvedSchema().getPrimaryKey().isPresent() && options.containsKey(recordKeyOption)) {
            List<String> primaryKeys = table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames();
            checkOptionMatchesDeclaredFields(primaryKeys, options.get(recordKeyOption), pkError);
        }

        String partitionPathOption = FlinkOptions.PARTITION_PATH_FIELD.key();
        String partitionKeyError = String.format("Partition key fields definition has inconsistency between partition key statement and option '%s'", partitionPathOption);
        CatalogTable catalogTable = (CatalogTable)table;
        if (catalogTable.isPartitioned() && options.containsKey(partitionPathOption)) {
            List<String> partitionKeys = catalogTable.getPartitionKeys();
            checkOptionMatchesDeclaredFields(partitionKeys, options.get(partitionPathOption), partitionKeyError);
        }
    }

    /**
     * Checks that a comma-separated option value names exactly the declared fields.
     *
     * <p>The value is split on {@code ","} without trimming. The check passes when the number of
     * entries equals the number of declared fields and every entry is contained in them.
     *
     * @param declaredFields the fields declared in the table definition
     * @param optionValue the comma-separated option value
     * @param errorMessage message of the exception thrown on a mismatch
     * @throws HoodieValidationException if the entry count differs or an entry is not declared
     */
    private static void checkOptionMatchesDeclaredFields(List<String> declaredFields, String optionValue, String errorMessage) {
        String[] fieldsFromOption = optionValue.split(",");
        if (fieldsFromOption.length != declaredFields.size()) {
            throw new HoodieValidationException(errorMessage);
        }
        for (String field : fieldsFromOption) {
            if (!declaredFields.contains(field)) {
                throw new HoodieValidationException(errorMessage);
            }
        }
    }

    /**
     * Returns the metastore client held in this catalog's {@code client} field.
     *
     * @return the {@code IMetaStoreClient} instance
     */
    @VisibleForTesting
    public IMetaStoreClient getClient() {
        return this.client;
    }
}

