package org.apache.hudi.table;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/table/HoodieTableFactory.class */
public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFactory.class);
    public static final String FACTORY_ID = "hudi";

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        Configuration fromMap = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
        ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
        sanityCheck(fromMap, resolvedSchema);
        setupConfOptions(fromMap, context.getObjectIdentifier(), context.getCatalogTable(), resolvedSchema);
        return new HoodieTableSource(resolvedSchema, new Path((String) fromMap.getOptional(FlinkOptions.PATH).orElseThrow(() -> {
            return new ValidationException("Option [path] should not be empty.");
        })), context.getCatalogTable().getPartitionKeys(), fromMap.getString(FlinkOptions.PARTITION_DEFAULT_NAME), fromMap);
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        Configuration fromMap = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(fromMap.getString(FlinkOptions.PATH)), "Option [path] should not be empty.");
        ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
        sanityCheck(fromMap, resolvedSchema);
        setupConfOptions(fromMap, context.getObjectIdentifier(), context.getCatalogTable(), resolvedSchema);
        return new HoodieTableSink(fromMap, resolvedSchema);
    }

    public String factoryIdentifier() {
        return "hudi";
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.singleton(FlinkOptions.PATH);
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return FlinkOptions.optionalOptions();
    }

    private void sanityCheck(Configuration configuration, ResolvedSchema resolvedSchema) {
        List columnNames = resolvedSchema.getColumnNames();
        if (!resolvedSchema.getPrimaryKey().isPresent()) {
            String[] split = ((String) configuration.get(FlinkOptions.RECORD_KEY_FIELD)).split(",");
            if (split.length == 1 && ((String) FlinkOptions.RECORD_KEY_FIELD.defaultValue()).equals(split[0]) && !columnNames.contains(split[0])) {
                throw new HoodieValidationException("Primary key definition is required, use either PRIMARY KEY syntax or option '" + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify.");
            }
            Arrays.stream(split).filter(str -> {
                return !columnNames.contains(str);
            }).findAny().ifPresent(str2 -> {
                throw new HoodieValidationException("Field '" + str2 + "' specified in option '" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not exist in the table schema.");
            });
        }
        String str3 = (String) configuration.get(FlinkOptions.PRECOMBINE_FIELD);
        if (columnNames.contains(str3)) {
            return;
        }
        if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(configuration)) {
            throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
        }
        if (str3.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
            configuration.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE);
        } else if (!str3.equals(FlinkOptions.NO_PRE_COMBINE)) {
            throw new HoodieValidationException("Field " + str3 + " does not exist in the table schema.Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
        }
    }

    private static void setupConfOptions(Configuration configuration, ObjectIdentifier objectIdentifier, CatalogTable catalogTable, ResolvedSchema resolvedSchema) {
        configuration.setString(FlinkOptions.TABLE_NAME.key(), objectIdentifier.getObjectName());
        setupHoodieKeyOptions(configuration, catalogTable);
        setupCompactionOptions(configuration);
        setupHiveOptions(configuration, objectIdentifier);
        setupReadOptions(configuration);
        setupWriteOptions(configuration);
        inferAvroSchema(configuration, resolvedSchema.toPhysicalRowDataType().notNull().getLogicalType());
    }

    private static void setupHoodieKeyOptions(Configuration configuration, CatalogTable catalogTable) {
        List list = (List) catalogTable.getSchema().getPrimaryKey().map((v0) -> {
            return v0.getColumns();
        }).orElse(Collections.emptyList());
        if (list.size() > 0) {
            configuration.setString(FlinkOptions.RECORD_KEY_FIELD, String.join(",", list));
        }
        List partitionKeys = catalogTable.getPartitionKeys();
        if (partitionKeys.size() > 0) {
            configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
        }
        if (configuration.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) {
            if (configuration.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
                configuration.setString(FlinkOptions.INDEX_KEY_FIELD, configuration.getString(FlinkOptions.RECORD_KEY_FIELD));
            } else if (!((Set) Arrays.stream(configuration.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")).collect(Collectors.toSet())).containsAll((Set) Arrays.stream(configuration.getString(FlinkOptions.INDEX_KEY_FIELD).split(",")).collect(Collectors.toSet()))) {
                throw new HoodieValidationException(FlinkOptions.INDEX_KEY_FIELD + " should be a subset of or equal to the recordKey fields");
            }
        }
        String[] split = configuration.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
        String[] split2 = configuration.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
        if (split.length == 1) {
            String str = split[0];
            if (str.isEmpty()) {
                configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName());
                LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table", FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName());
                return;
            } else {
                DataType dataType = (DataType) catalogTable.getSchema().getFieldDataType(str).orElseThrow(() -> {
                    return new HoodieValidationException("Field " + str + " does not exist");
                });
                if (split2.length <= 1 && DataTypeUtils.isDatetimeType(dataType)) {
                    setupTimestampKeygenOptions(configuration, dataType);
                    return;
                }
            }
        }
        if ((split2.length > 1 || split.length > 1) && FlinkOptions.isDefaultValueDefined(configuration, FlinkOptions.KEYGEN_CLASS_NAME)) {
            configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName());
            LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields", FlinkOptions.KEYGEN_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName());
        }
    }

    public static void setupTimestampKeygenOptions(Configuration configuration, DataType dataType) {
        if (configuration.contains(FlinkOptions.KEYGEN_CLASS_NAME)) {
            return;
        }
        configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, TimestampBasedAvroKeyGenerator.class.getName());
        LOG.info("Table option [{}] is reset to {} because datetime partitioning turns on", FlinkOptions.KEYGEN_CLASS_NAME.key(), TimestampBasedAvroKeyGenerator.class.getName());
        if (DataTypeUtils.isTimestampType(dataType)) {
            int precision = DataTypeUtils.precision(dataType.getLogicalType());
            if (precision == 0) {
                configuration.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, TimestampBasedAvroKeyGenerator.TimestampType.UNIX_TIMESTAMP.name());
            } else if (precision == 3) {
                configuration.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name());
            }
            configuration.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, (String) configuration.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR));
        } else {
            configuration.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, TimestampBasedAvroKeyGenerator.TimestampType.SCALAR.name());
            configuration.setString(KeyGeneratorOptions.Config.INPUT_TIME_UNIT, TimeUnit.DAYS.toString());
            configuration.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, (String) configuration.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY));
            configuration.setString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, FlinkOptions.PARTITION_FORMAT_DAY);
        }
        configuration.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "UTC");
    }

    private static void setupCompactionOptions(Configuration configuration) {
        int integer = configuration.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS);
        if (integer >= configuration.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS)) {
            LOG.info("Table option [{}] is reset to {} to be greater than {}={},\nto avoid risk of missing data from few instants in incremental pull", new Object[]{FlinkOptions.ARCHIVE_MIN_COMMITS.key(), Integer.valueOf(integer + 10), FlinkOptions.CLEAN_RETAIN_COMMITS.key(), Integer.valueOf(integer)});
            configuration.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, integer + 10);
            configuration.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, integer + 20);
        }
    }

    private static void setupHiveOptions(Configuration configuration, ObjectIdentifier objectIdentifier) {
        if (!configuration.contains(FlinkOptions.HIVE_SYNC_DB)) {
            configuration.setString(FlinkOptions.HIVE_SYNC_DB, objectIdentifier.getDatabaseName());
        }
        if (configuration.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
            return;
        }
        configuration.setString(FlinkOptions.HIVE_SYNC_TABLE, objectIdentifier.getObjectName());
    }

    private static void setupReadOptions(Configuration configuration) {
        if (configuration.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
            return;
        }
        if (configuration.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() || configuration.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) {
            configuration.setString(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_INCREMENTAL);
        }
    }

    private static void setupWriteOptions(Configuration configuration) {
        if (FlinkOptions.isDefaultValueDefined(configuration, FlinkOptions.OPERATION) && OptionsResolver.isCowTable(configuration)) {
            configuration.setBoolean(FlinkOptions.PRE_COMBINE, true);
        }
    }

    private static void inferAvroSchema(Configuration configuration, LogicalType logicalType) {
        if (configuration.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent() || configuration.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
            return;
        }
        configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, AvroSchemaConverter.convertToSchema(logicalType).toString());
    }
}
