/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.BiPredicate;
import org.apache.avro.Schema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamerUtil {
    private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);

    public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
        TypedProperties properties = StreamerUtil.getProps(config);
        properties.put((Object)"bootstrap.servers", (Object)config.kafkaBootstrapServers);
        properties.put((Object)"group.id", (Object)config.kafkaGroupId);
        return properties;
    }

    public static TypedProperties getProps(FlinkStreamerConfig cfg) {
        if (cfg.propsFilePath.isEmpty()) {
            return new TypedProperties();
        }
        return StreamerUtil.readConfig(HadoopConfigurations.getHadoopConf(cfg), new StoragePath(cfg.propsFilePath), cfg.configs).getProps();
    }

    public static TypedProperties buildProperties(List<String> props) {
        TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps();
        props.forEach(x -> {
            String[] kv = x.split("=");
            ValidationUtils.checkArgument((kv.length == 2 ? 1 : 0) != 0);
            properties.setProperty(kv[0], kv[1]);
        });
        return properties;
    }

    public static Schema getSourceSchema(Configuration conf) {
        if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
            return new FilebasedSchemaProvider(conf).getSourceSchema();
        }
        if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
            String schemaStr = (String)conf.get(FlinkOptions.SOURCE_AVRO_SCHEMA);
            return new Schema.Parser().parse(schemaStr);
        }
        String errorMsg = String.format("Either option '%s' or '%s' should be specified for avro schema deserialization", FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), FlinkOptions.SOURCE_AVRO_SCHEMA.key());
        throw new HoodieException(errorMsg);
    }

    public static DFSPropertiesConfiguration readConfig(org.apache.hadoop.conf.Configuration hadoopConfig, StoragePath cfgPath, List<String> overriddenProps) {
        DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
        try {
            if (!overriddenProps.isEmpty()) {
                LOG.info("Adding overridden properties to file properties.");
                conf.addPropsFromStream(new BufferedReader(new StringReader(String.join((CharSequence)"\n", overriddenProps))), cfgPath);
            }
        }
        catch (IOException ioe) {
            throw new HoodieIOException("Unexpected error adding config overrides", ioe);
        }
        return conf;
    }

    public static HoodiePayloadConfig getPayloadConfig(Configuration conf) {
        return HoodiePayloadConfig.newBuilder().withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)).withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)).withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)).build();
    }

    public static HoodieIndexConfig getIndexConfig(Configuration conf) {
        return HoodieIndexConfig.newBuilder().withIndexType(OptionsResolver.getIndexType(conf)).withBucketNum(String.valueOf(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS))).withRecordKeyField(conf.getString(FlinkOptions.RECORD_KEY_FIELD)).withIndexKeyField(OptionsResolver.getIndexKeyField(conf)).withBucketIndexEngineType(OptionsResolver.getBucketEngineType(conf)).withEngineType(EngineType.FLINK).build();
    }

    public static Option<HoodieLockConfig> getLockConfig(Configuration conf) {
        if (OptionsResolver.isLockRequired(conf) && !conf.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())) {
            return Option.of((Object)HoodieLockConfig.newBuilder().fromProperties((Properties)FileSystemBasedLockProvider.getLockConfig((String)conf.getString(FlinkOptions.PATH))).withConflictResolutionStrategy(OptionsResolver.getConflictResolutionStrategy(conf)).build());
        }
        return Option.empty();
    }

    public static HoodieTimeGeneratorConfig getTimeGeneratorConfig(Configuration conf) {
        TypedProperties properties = StreamerUtil.flinkConf2TypedProperties(conf);
        Option<HoodieLockConfig> lockConfig = StreamerUtil.getLockConfig(conf);
        if (lockConfig.isPresent()) {
            properties.putAll((Map)((HoodieLockConfig)lockConfig.get()).getProps());
        }
        return HoodieTimeGeneratorConfig.newBuilder().withPath(conf.getString(FlinkOptions.PATH)).fromProperties((Properties)properties).build();
    }

    public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
        Configuration flatConf = FlinkOptions.flatOptions(conf);
        TypedProperties properties = new TypedProperties();
        flatConf.addAllToProperties((Properties)properties);
        for (ConfigOption<?> option : FlinkOptions.optionalOptions()) {
            if (flatConf.contains(option) || !option.hasDefaultValue()) continue;
            properties.put((Object)option.key(), option.defaultValue());
        }
        properties.put((Object)HoodieTableConfig.TYPE.key(), (Object)conf.getString(FlinkOptions.TABLE_TYPE));
        return properties;
    }

    public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException {
        return StreamerUtil.initTableIfNotExists(conf, HadoopConfigurations.getHadoopConf(conf));
    }

    public static HoodieTableMetaClient initTableIfNotExists(Configuration conf, org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
        String basePath = conf.getString(FlinkOptions.PATH);
        if (!StreamerUtil.tableExists(basePath, hadoopConf)) {
            HoodieTableMetaClient.newTableBuilder().setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA)).setTableType(conf.getString(FlinkOptions.TABLE_TYPE)).setTableName(conf.getString(FlinkOptions.TABLE_NAME)).setTableVersion(conf.getInteger(FlinkOptions.WRITE_TABLE_VERSION)).setDatabaseName(conf.getString(FlinkOptions.DATABASE_NAME)).setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null)).setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)).setPreCombineField(OptionsResolver.getPreCombineField(conf)).setArchiveLogFolder((String)HoodieTableConfig.TIMELINE_HISTORY_PATH.defaultValue()).setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null)).setKeyGeneratorClassProp(conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName())).setHiveStylePartitioningEnable(Boolean.valueOf(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING))).setUrlEncodePartitioning(Boolean.valueOf(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING))).setCDCEnabled(conf.getBoolean(FlinkOptions.CDC_ENABLED)).setCDCSupplementalLoggingMode(conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE)).setPopulateMetaFields(OptionsResolver.isPopulateMetaFields(conf)).initTable(HadoopFSUtils.getStorageConfWithCopy((org.apache.hadoop.conf.Configuration)hadoopConf), basePath);
            LOG.info("Table initialized under base path {}", (Object)basePath);
        } else {
            LOG.info("Table [path={}, name={}] already exists, no need to initialize the table", (Object)basePath, (Object)conf.getString(FlinkOptions.TABLE_NAME));
        }
        return StreamerUtil.createMetaClient(conf, hadoopConf);
    }

    public static boolean tableExists(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
        FileSystem fs = HadoopFSUtils.getFs((String)basePath, (org.apache.hadoop.conf.Configuration)hadoopConf);
        try {
            return fs.exists(new Path(basePath, ".hoodie")) && fs.exists(new Path(new Path(basePath, ".hoodie"), "hoodie.properties"));
        }
        catch (IOException e) {
            throw new HoodieException("Error while checking whether table exists under path:" + basePath, (Throwable)e);
        }
    }

    public static boolean partitionExists(String tablePath, String partitionPath, org.apache.hadoop.conf.Configuration hadoopConf) {
        FileSystem fs = HadoopFSUtils.getFs((String)tablePath, (org.apache.hadoop.conf.Configuration)hadoopConf);
        try {
            return fs.exists(new Path(tablePath, partitionPath));
        }
        catch (IOException e) {
            throw new HoodieException(String.format("Error while checking whether partition exists under table path [%s] and partition path [%s]", tablePath, partitionPath), (Throwable)e);
        }
    }

    public static String generateBucketKey(String partitionPath, String fileId) {
        return partitionPath + "_" + fileId;
    }

    public static HoodieTableMetaClient metaClientForReader(Configuration conf, org.apache.hadoop.conf.Configuration hadoopConf) {
        String basePath = conf.getString(FlinkOptions.PATH);
        if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && !StreamerUtil.tableExists(basePath, hadoopConf)) {
            return null;
        }
        return StreamerUtil.createMetaClient(basePath, hadoopConf);
    }

    public static HoodieTableMetaClient createMetaClient(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
        return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(HadoopFSUtils.getStorageConfWithCopy((org.apache.hadoop.conf.Configuration)hadoopConf)).build();
    }

    public static HoodieTableMetaClient createMetaClient(Configuration conf) {
        return StreamerUtil.createMetaClient(conf, HadoopConfigurations.getHadoopConf(conf));
    }

    public static HoodieTableMetaClient createMetaClient(Configuration conf, org.apache.hadoop.conf.Configuration hadoopConf) {
        return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(HadoopFSUtils.getStorageConfWithCopy((org.apache.hadoop.conf.Configuration)hadoopConf)).setTimeGeneratorConfig(StreamerUtil.getTimeGeneratorConfig(conf)).build();
    }

    public static Option<HoodieTableConfig> getTableConfig(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
        HoodieStorage storage = HoodieStorageUtils.getStorage((String)basePath, (StorageConfiguration)HadoopFSUtils.getStorageConf((org.apache.hadoop.conf.Configuration)hadoopConf));
        StoragePath metaPath = new StoragePath(basePath, ".hoodie");
        try {
            if (storage.exists(new StoragePath(metaPath, "hoodie.properties"))) {
                return Option.of((Object)new HoodieTableConfig(storage, metaPath, null, null, null));
            }
        }
        catch (IOException e) {
            throw new HoodieIOException("Get table config error", e);
        }
        return Option.empty();
    }

    public static Option<String> medianInstantTime(String highVal, String lowVal) {
        long low;
        long high = ((Date)TimelineUtils.parseDateFromInstantTimeSafely((String)highVal).orElseThrow(() -> new HoodieException("Get instant time diff with interval [" + highVal + "] error"))).getTime();
        ValidationUtils.checkArgument((high > (low = ((Date)TimelineUtils.parseDateFromInstantTimeSafely((String)lowVal).orElseThrow(() -> new HoodieException("Get instant time diff with interval [" + lowVal + "] error"))).getTime()) ? 1 : 0) != 0, (String)("Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]"));
        long median = low + (high - low) / 2L;
        String instantTime = TimelineUtils.formatDate((Date)new Date(median));
        if (InstantComparison.compareTimestamps((String)lowVal, (BiPredicate)InstantComparison.GREATER_THAN_OR_EQUALS, (String)instantTime) || InstantComparison.compareTimestamps((String)highVal, (BiPredicate)InstantComparison.LESSER_THAN_OR_EQUALS, (String)instantTime)) {
            return Option.empty();
        }
        return Option.of((Object)instantTime);
    }

    public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) {
        long newTimestamp = ((Date)TimelineUtils.parseDateFromInstantTimeSafely((String)newInstantTime).orElseThrow(() -> new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error"))).getTime();
        long oldTimestamp = ((Date)TimelineUtils.parseDateFromInstantTimeSafely((String)oldInstantTime).orElseThrow(() -> new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error"))).getTime();
        return (newTimestamp - oldTimestamp) / 1000L;
    }

    public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
        try {
            ArrayList<Transformer> transformers = new ArrayList<Transformer>();
            for (String className : (List)Option.ofNullable(classNames).orElse(Collections.emptyList())) {
                transformers.add((Transformer)ReflectionUtils.loadClass((String)className));
            }
            return transformers.isEmpty() ? Option.empty() : Option.of((Object)new ChainedTransformer(transformers));
        }
        catch (Throwable e) {
            throw new IOException("Could not load transformer class(es) " + classNames, e);
        }
    }

    public static boolean isValidFile(StoragePathInfo pathInfo) {
        String extension = FSUtils.getFileExtension((String)pathInfo.getPath().toString());
        if (HoodieFileFormat.PARQUET.getFileExtension().equals(extension)) {
            return pathInfo.getLength() > (long)ParquetFileWriter.MAGIC.length;
        }
        if (HoodieFileFormat.ORC.getFileExtension().equals(extension)) {
            return pathInfo.getLength() > (long)"ORC".length();
        }
        if (HoodieFileFormat.HOODIE_LOG.getFileExtension().equals(extension)) {
            return pathInfo.getLength() > (long)HoodieLogFormat.MAGIC.length;
        }
        return pathInfo.getLength() > 0L;
    }

    public static String getLastPendingInstant(HoodieTableMetaClient metaClient) {
        return StreamerUtil.getLastPendingInstant(metaClient, true);
    }

    public static String getLastPendingInstant(HoodieTableMetaClient metaClient, boolean reloadTimeline) {
        if (reloadTimeline) {
            metaClient.reloadActiveTimeline();
        }
        return (String)metaClient.getCommitsTimeline().filterPendingExcludingCompaction().lastInstant().map(HoodieInstant::requestedTime).orElse(null);
    }

    public static String getLastCompletedInstant(HoodieTableMetaClient metaClient) {
        return (String)metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(null);
    }

    public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
        return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
    }

    public static long getMaxCompactionMemoryInBytes(Configuration conf) {
        return (long)conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024L * 1024L;
    }

    public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, boolean includeMetadataFields) throws Exception {
        TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
        return schemaUtil.getTableAvroSchema(includeMetadataFields);
    }

    public static Schema getLatestTableSchema(String path, org.apache.hadoop.conf.Configuration hadoopConf) {
        if (StringUtils.isNullOrEmpty((String)path) || !StreamerUtil.tableExists(path, hadoopConf)) {
            return null;
        }
        try {
            HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, hadoopConf);
            return StreamerUtil.getTableAvroSchema(metaClient, false);
        }
        catch (Exception e) {
            LOG.warn("Error while resolving the latest table schema", (Throwable)e);
            return null;
        }
    }

    public static boolean fileExists(HoodieStorage storage, StoragePath path) {
        try {
            return storage.exists(path);
        }
        catch (IOException e) {
            throw new HoodieException("Exception while checking file " + path + " existence", (Throwable)e);
        }
    }

    public static boolean isWriteCommit(HoodieTableType tableType, HoodieInstant instant, HoodieTimeline timeline) {
        return tableType == HoodieTableType.MERGE_ON_READ ? !instant.getAction().equals("commit") : !ClusteringUtils.isCompletedClusteringInstant((HoodieInstant)instant, (HoodieTimeline)timeline);
    }

    public static void checkPreCombineKey(Configuration conf, List<String> fields) {
        String preCombineField = (String)conf.get(FlinkOptions.PRECOMBINE_FIELD);
        if (!fields.contains(preCombineField)) {
            if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
                throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
            }
            if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
                conf.setString(FlinkOptions.PRECOMBINE_FIELD, "no_precombine");
            } else if (!preCombineField.equals("no_precombine")) {
                throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema.Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
            }
        }
    }

    public static void checkKeygenGenerator(boolean isComplexHoodieKey, Configuration conf) {
        if (isComplexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS_NAME)) {
            conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName());
            LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields", (Object)FlinkOptions.KEYGEN_CLASS_NAME.key(), (Object)ComplexAvroKeyGenerator.class.getName());
        }
    }

    public static HoodieMetadataConfig metadataConfig(Configuration conf) {
        Properties properties = new Properties();
        properties.put(HoodieMetadataConfig.ENABLE.key(), (Object)conf.getBoolean(FlinkOptions.METADATA_ENABLED));
        return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
    }
}

