/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader;
import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
import org.apache.flink.table.filesystem.FileSystemLookupFunction;
import org.apache.flink.table.filesystem.PartitionFetcher;
import org.apache.flink.table.filesystem.PartitionReader;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveLookupTableSource
extends HiveTableSource
implements LookupTableSource {
    private static final Logger LOG = LoggerFactory.getLogger(HiveLookupTableSource.class);
    private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = Duration.ofHours(1L);
    private final Configuration configuration = new Configuration();
    private Duration hiveTableReloadInterval;

    public HiveLookupTableSource(JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath, CatalogTable catalogTable) {
        super(jobConf, flinkConf, tablePath, catalogTable);
        catalogTable.getOptions().forEach((arg_0, arg_1) -> ((Configuration)this.configuration).setString(arg_0, arg_1));
        this.validateLookupConfigurations();
    }

    public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext context) {
        return TableFunctionProvider.of(this.getLookupFunction(context.getKeys()));
    }

    @VisibleForTesting
    TableFunction<RowData> getLookupFunction(int[][] keys) {
        int[] keyIndices = new int[keys.length];
        int i = 0;
        for (int[] key : keys) {
            if (key.length > 1) {
                throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
            }
            keyIndices[i] = key[0];
            ++i;
        }
        return this.getLookupFunction(keyIndices);
    }

    private void validateLookupConfigurations() {
        String partitionInclude = (String)this.configuration.get(FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE);
        if (this.isStreamingSource()) {
            Duration monitorInterval;
            Preconditions.checkArgument((!this.configuration.contains(FileSystemConnectorOptions.STREAMING_SOURCE_CONSUME_START_OFFSET) ? 1 : 0) != 0, (Object)String.format("The '%s' is not supported when set '%s' to 'latest'", FileSystemConnectorOptions.STREAMING_SOURCE_CONSUME_START_OFFSET.key(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key()));
            Duration duration = monitorInterval = this.configuration.get(FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL) == null ? DEFAULT_LOOKUP_MONITOR_INTERVAL : (Duration)this.configuration.get(FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL);
            if (monitorInterval.toMillis() < DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis()) {
                LOG.warn(String.format("Currently the recommended value of '%s' is at least '%s' when set '%s' to 'latest', but actual is '%s', this may produce big pressure to hive metastore.", FileSystemConnectorOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key(), DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(), FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), monitorInterval.toMillis()));
            }
            this.hiveTableReloadInterval = monitorInterval;
        } else {
            Preconditions.checkArgument((boolean)"all".equals(partitionInclude), (Object)String.format("The only supported %s for lookup is '%s' in batch source, but actual is '%s'", FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), "all", partitionInclude));
            this.hiveTableReloadInterval = (Duration)this.configuration.get(FileSystemConnectorOptions.LOOKUP_JOIN_CACHE_TTL);
        }
    }

    private TableFunction<RowData> getLookupFunction(int[] keys) {
        String defaultPartitionName = JobConfUtils.getDefaultPartitionName(this.jobConf);
        HiveTablePartitionFetcherContext fetcherContext = new HiveTablePartitionFetcherContext(this.tablePath, this.hiveShim, new JobConfWrapper(this.jobConf), this.catalogTable.getPartitionKeys(), this.getProducedTableSchema().getFieldDataTypes(), this.getProducedTableSchema().getFieldNames(), this.configuration, defaultPartitionName);
        ObjectPath tableFullPath = this.tablePath;
        PartitionFetcher & Serializable partitionFetcher = this.catalogTable.getPartitionKeys().isEmpty() ? (PartitionFetcher & Serializable)context -> {
            ArrayList partValueList = new ArrayList();
            partValueList.add(context.getPartition(new ArrayList()).orElseThrow(() -> new IllegalArgumentException(String.format("Fetch partition fail for hive table %s.", tableFullPath))));
            return partValueList;
        } : (this.isStreamingSource() ? (PartitionFetcher & Serializable)context -> {
            ArrayList partValueList = new ArrayList();
            List comparablePartitionValues = context.getComparablePartitionValueList();
            if (comparablePartitionValues.size() <= 0) {
                throw new IllegalArgumentException(String.format("At least one partition is required when set '%s' to 'latest' in temporal join, but actual partition number is '%s' for hive table %s", FileSystemConnectorOptions.STREAMING_SOURCE_PARTITION_INCLUDE.key(), comparablePartitionValues.size(), tableFullPath));
            }
            comparablePartitionValues.sort((o1, o2) -> o2.getComparator().compareTo(o1.getComparator()));
            PartitionFetcher.Context.ComparablePartitionValue maxPartition = (PartitionFetcher.Context.ComparablePartitionValue)comparablePartitionValues.get(0);
            partValueList.add(context.getPartition((List)maxPartition.getPartitionValue()).orElseThrow(() -> new IllegalArgumentException(String.format("Fetch partition fail for hive table %s.", tableFullPath))));
            return partValueList;
        } : (PartitionFetcher & Serializable)context -> {
            ArrayList partValueList = new ArrayList();
            List comparablePartitionValues = context.getComparablePartitionValueList();
            for (PartitionFetcher.Context.ComparablePartitionValue comparablePartitionValue : comparablePartitionValues) {
                partValueList.add(context.getPartition((List)comparablePartitionValue.getPartitionValue()).orElseThrow(() -> new IllegalArgumentException(String.format("Fetch partition fail for hive table %s.", tableFullPath))));
            }
            return partValueList;
        });
        HiveInputFormatPartitionReader partitionReader = new HiveInputFormatPartitionReader(this.jobConf, this.hiveVersion, this.tablePath, this.getProducedTableSchema().getFieldDataTypes(), this.getProducedTableSchema().getFieldNames(), this.catalogTable.getPartitionKeys(), this.projectedFields, (Boolean)this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
        return new FileSystemLookupFunction((PartitionFetcher)partitionFetcher, (PartitionFetcher.Context)fetcherContext, (PartitionReader)partitionReader, (RowType)this.getProducedTableSchema().toRowDataType().getLogicalType(), keys, this.hiveTableReloadInterval);
    }

    static class HiveTablePartitionFetcherContext
    extends HivePartitionFetcherContextBase<HiveTablePartition> {
        private static final long serialVersionUID = 1L;

        public HiveTablePartitionFetcherContext(ObjectPath tablePath, HiveShim hiveShim, JobConfWrapper confWrapper, List<String> partitionKeys, DataType[] fieldTypes, String[] fieldNames, Configuration configuration, String defaultPartitionName) {
            super(tablePath, hiveShim, confWrapper, partitionKeys, fieldTypes, fieldNames, configuration, defaultPartitionName);
        }

        public Optional<HiveTablePartition> getPartition(List<String> partValues) throws Exception {
            Preconditions.checkArgument((this.partitionKeys.size() == partValues.size() ? 1 : 0) != 0, (Object)String.format("The partition keys length should equal to partition values length, but partition keys length is %s and partition values length is %s", this.partitionKeys.size(), partValues.size()));
            if (this.partitionKeys.isEmpty()) {
                return Optional.of(new HiveTablePartition(this.tableSd, this.tableProps));
            }
            try {
                Partition partition = this.metaStoreClient.getPartition(this.tablePath.getDatabaseName(), this.tablePath.getObjectName(), partValues);
                HiveTablePartition hiveTablePartition = HivePartitionUtils.toHiveTablePartition(this.partitionKeys, this.tableProps, partition);
                return Optional.of(hiveTablePartition);
            }
            catch (NoSuchObjectException e) {
                return Optional.empty();
            }
        }
    }
}

