/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.configuration;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;

@ConfigClassProperty(name="Flink Options", groupName=ConfigGroups.Names.FLINK_SQL, description="Flink jobs using the SQL can be configured through the options in WITH clause. The actual datasource level configs are listed below.")
public class FlinkOptions
extends HoodieConfig {
    public static final ConfigOption<String> PATH = ConfigOptions.key("path").stringType().noDefaultValue().withDescription("Base path for the target hoodie table.\nThe path would be created if it does not exist,\notherwise a Hoodie table expects to be initialized successfully");
    public static final ConfigOption<String> DATABASE_NAME = ConfigOptions.key(HoodieTableConfig.DATABASE_NAME.key()).stringType().noDefaultValue().withDescription("Database name to register to Hive metastore");
    public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key(HoodieWriteConfig.TBL_NAME.key()).stringType().noDefaultValue().withDescription("Table name to register to Hive metastore");
    public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name();
    public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name();
    public static final ConfigOption<String> TABLE_TYPE = ConfigOptions.key("table.type").stringType().defaultValue(TABLE_TYPE_COPY_ON_WRITE).withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
    public static final String NO_PRE_COMBINE = "no_precombine";
    public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions.key("precombine.field").stringType().defaultValue("ts").withFallbackKeys("write.precombine.field").withDescription("Field used in preCombining before actual write. When two records have the same\nkey value, we will pick the one with the largest value for the precombine field,\ndetermined by Object.compareTo(..)");
    public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions.key("payload.class").stringType().defaultValue(EventTimeAvroPayload.class.getName()).withFallbackKeys("write.payload.class").withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\nThis will render any value set for the option in-effective");
    public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions.key("partition.default_name").stringType().defaultValue("__HIVE_DEFAULT_PARTITION__").withDescription("The default partition name in case the dynamic partition column value is null/empty string");
    public static final ConfigOption<Boolean> CHANGELOG_ENABLED = ConfigOptions.key("changelog.enabled").booleanType().defaultValue(false).withDescription("Whether to keep all the intermediate changes, we try to keep all the changes of a record when enabled:\n1). The sink accept the UPDATE_BEFORE message;\n2). The source try to emit every changes of a record.\nThe semantics is best effort because the compaction job would finally merge all changes of a record into one.\n default false to have UPSERT semantics");
    public static final ConfigOption<Boolean> METADATA_ENABLED = ConfigOptions.key("metadata.enabled").booleanType().defaultValue(false).withDescription("Enable the internal metadata table which serves table metadata like level file listings, default disabled");
    public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions.key("metadata.compaction.delta_commits").intType().defaultValue(10).withDescription("Max delta commits for metadata table to trigger compaction, default 10");
    public static final ConfigOption<String> INDEX_TYPE = ConfigOptions.key("index.type").stringType().defaultValue(HoodieIndex.IndexType.FLINK_STATE.name()).withDescription("Index type of Flink write job, default is using state backed index.");
    public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions.key("index.bootstrap.enabled").booleanType().defaultValue(false).withDescription("Whether to bootstrap the index state from existing hoodie table, default false");
    public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions.key("index.state.ttl").doubleType().defaultValue(0.0).withDescription("Index state ttl in days, default stores the index permanently");
    public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions.key("index.global.enabled").booleanType().defaultValue(true).withDescription("Whether to update index for the old partition path\nif same key record with different partition path came in, default true");
    public static final ConfigOption<String> INDEX_PARTITION_REGEX = ConfigOptions.key("index.partition.regex").stringType().defaultValue(".*").withDescription("Whether to load partitions in state if partition path matching\uff0c default `*`");
    public static final ConfigOption<Integer> READ_TASKS = ConfigOptions.key("read.tasks").intType().noDefaultValue().withDescription("Parallelism of tasks that do actual read, default is the parallelism of the execution environment");
    public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions.key("source.avro-schema.path").stringType().noDefaultValue().withDescription("Source avro schema file path, the parsed schema is used for deserialization");
    public static final ConfigOption<String> SOURCE_AVRO_SCHEMA = ConfigOptions.key("source.avro-schema").stringType().noDefaultValue().withDescription("Source avro schema string, the parsed schema is used for deserialization");
    public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
    public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";
    public static final String QUERY_TYPE_INCREMENTAL = "incremental";
    public static final ConfigOption<String> QUERY_TYPE = ConfigOptions.key("hoodie.datasource.query.type").stringType().defaultValue("snapshot").withDescription("Decides how data files need to be read, in\n1) Snapshot mode (obtain latest view, based on row & columnar data);\n2) incremental mode (new data since an instantTime);\n3) Read Optimized mode (obtain latest view, based on columnar data)\n.Default: snapshot");
    public static final String REALTIME_SKIP_MERGE = "skip_merge";
    public static final String REALTIME_PAYLOAD_COMBINE = "payload_combine";
    public static final ConfigOption<String> MERGE_TYPE = ConfigOptions.key("hoodie.datasource.merge.type").stringType().defaultValue("payload_combine").withDescription("For Snapshot query on merge on read table. Use this key to define how the payloads are merged, in\n1) skip_merge: read the base file records plus the log file records;\n2) payload_combine: read the base file records first, for each record in base file, checks whether the key is in the\n   log file records(combines the two records with same key for base and log file records), then read the left log file records");
    public static final ConfigOption<Boolean> UTC_TIMEZONE = ConfigOptions.key("read.utc-timezone").booleanType().defaultValue(true).withDescription("Use UTC timezone or local timezone to the conversion between epoch time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x use UTC timezone, by default true");
    public static final ConfigOption<Boolean> READ_AS_STREAMING = ConfigOptions.key("read.streaming.enabled").booleanType().defaultValue(false).withDescription("Whether to read as streaming source, default false");
    public static final ConfigOption<Integer> READ_STREAMING_CHECK_INTERVAL = ConfigOptions.key("read.streaming.check-interval").intType().defaultValue(60).withDescription("Check interval for streaming read of SECOND, default 1 minute");
    public static final ConfigOption<Boolean> READ_STREAMING_SKIP_COMPACT = ConfigOptions.key("read.streaming.skip_compaction").booleanType().defaultValue(false).withDescription("Whether to skip compaction instants for streaming read,\nthere are two cases that this option can be used to avoid reading duplicates:\n1) you are definitely sure that the consumer reads faster than any compaction instants, usually with delta time compaction strategy that is long enough, for e.g, one week;\n2) changelog mode is enabled, this option is a solution to keep data integrity");
    public static final String START_COMMIT_EARLIEST = "earliest";
    public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions.key("read.start-commit").stringType().noDefaultValue().withDescription("Start commit instant for reading, the commit time format should be 'yyyyMMddHHmmss', by default reading from the latest instant for streaming read");
    public static final ConfigOption<String> READ_END_COMMIT = ConfigOptions.key("read.end-commit").stringType().noDefaultValue().withDescription("End commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'");
    public static final ConfigOption<Boolean> READ_DATA_SKIPPING_ENABLED = ConfigOptions.key("read.data.skipping.enabled").booleanType().defaultValue(false).withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space byskipping over files");
    public static final ConfigOption<Boolean> INSERT_CLUSTER = ConfigOptions.key("write.insert.cluster").booleanType().defaultValue(false).withDescription("Whether to merge small files for insert mode, if true, the write throughput will decrease because the read/write of existing small file, only valid for COW table, default false");
    public static final ConfigOption<String> OPERATION = ConfigOptions.key("write.operation").stringType().defaultValue(WriteOperationType.UPSERT.value()).withDescription("The write operation, that this write should do");
    public static final ConfigOption<Boolean> PRE_COMBINE = ConfigOptions.key("write.precombine").booleanType().defaultValue(false).withDescription("Flag to indicate whether to drop duplicates before insert/upsert.\nBy default these cases will accept duplicates, to gain extra performance:\n1) insert operation;\n2) upsert for MOR table, the MOR table deduplicate on reading");
    public static final ConfigOption<Integer> RETRY_TIMES = ConfigOptions.key("write.retry.times").intType().defaultValue(3).withDescription("Flag to indicate how many times streaming job should retry for a failed checkpoint batch.\nBy default 3");
    public static final ConfigOption<Long> RETRY_INTERVAL_MS = ConfigOptions.key("write.retry.interval.ms").longType().defaultValue(2000L).withDescription("Flag to indicate how long (by millisecond) before a retry should issued for failed checkpoint batch.\nBy default 2000 and it will be doubled by every retry");
    public static final ConfigOption<Boolean> IGNORE_FAILED = ConfigOptions.key("write.ignore.failed").booleanType().defaultValue(false).withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch. \nBy default false. Turning this on, could hide the write status errors while the flink checkpoint moves ahead. \nSo, would recommend users to use this with caution.");
    public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions.key(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).stringType().defaultValue("uuid").withDescription("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\nActual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using the dot notation eg: `a.b.c`");
    public static final ConfigOption<String> INDEX_KEY_FIELD = ConfigOptions.key(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()).stringType().defaultValue("").withDescription("Index key field. Value to be used as hashing to find the bucket ID. Should be a subset of or equal to the recordKey fields.\nActual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using the dot notation eg: `a.b.c`");
    public static final ConfigOption<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigOptions.key(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key()).intType().defaultValue(4).withDescription("Hudi bucket number per partition. Only affected if using Hudi bucket index.");
    public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions.key(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).stringType().defaultValue("").withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\nActual value obtained by invoking .toString(), default ''");
    public static final ConfigOption<Boolean> URL_ENCODE_PARTITIONING = ConfigOptions.key(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key()).booleanType().defaultValue(false).withDescription("Whether to encode the partition path url, default false");
    public static final ConfigOption<Boolean> HIVE_STYLE_PARTITIONING = ConfigOptions.key(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()).booleanType().defaultValue(false).withDescription("Whether to use Hive style partitioning.\nIf set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\nBy default false (the names of partition folders are only partition values)");
    public static final ConfigOption<String> KEYGEN_CLASS_NAME = ConfigOptions.key(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key()).stringType().noDefaultValue().withDescription("Key generator class, that implements will extract the key out of incoming record");
    public static final ConfigOption<String> KEYGEN_TYPE = ConfigOptions.key(HoodieWriteConfig.KEYGENERATOR_TYPE.key()).stringType().defaultValue(KeyGeneratorType.SIMPLE.name()).withDescription("Key generator type, that implements will extract the key out of incoming record");
    public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
    public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
    public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd";
    public static final ConfigOption<String> PARTITION_FORMAT = ConfigOptions.key("write.partition.format").stringType().noDefaultValue().withDescription("Partition path format, only valid when 'write.datetime.partitioning' is true, default is:\n1) 'yyyyMMddHH' for timestamp(3) WITHOUT TIME ZONE, LONG, FLOAT, DOUBLE, DECIMAL;\n2) 'yyyyMMdd' for DATE and INT.");
    public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions.key("write.index_bootstrap.tasks").intType().noDefaultValue().withDescription("Parallelism of tasks that do index bootstrap, default same as the write task parallelism");
    public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions.key("write.bucket_assign.tasks").intType().noDefaultValue().withDescription("Parallelism of tasks that do bucket assign, default same as the write task parallelism");
    public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions.key("write.tasks").intType().noDefaultValue().withDescription("Parallelism of tasks that do actual write, default is the parallelism of the execution environment");
    public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions.key("write.task.max.size").doubleType().defaultValue(1024.0).withDescription("Maximum memory in MB for a write task, when the threshold hits,\nit flushes the max size data bucket to avoid OOM, default 1GB");
    public static final ConfigOption<Long> WRITE_RATE_LIMIT = ConfigOptions.key("write.rate.limit").longType().defaultValue(0L).withDescription("Write record rate limit per second to prevent traffic jitter and improve stability, default 0 (no limit)");
    public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions.key("write.batch.size").doubleType().defaultValue(256.0).withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 256MB");
    public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE = ConfigOptions.key("write.log_block.size").intType().defaultValue(128).withDescription("Max log block size in MB for log file, default 128MB");
    public static final ConfigOption<Long> WRITE_LOG_MAX_SIZE = ConfigOptions.key("write.log.max.size").longType().defaultValue(1024L).withDescription("Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB");
    public static final ConfigOption<Integer> WRITE_PARQUET_BLOCK_SIZE = ConfigOptions.key("write.parquet.block.size").intType().defaultValue(120).withDescription("Parquet RowGroup size. It's recommended to make this large enough that scan costs can be amortized by packing enough column values into a single row group.");
    public static final ConfigOption<Integer> WRITE_PARQUET_MAX_FILE_SIZE = ConfigOptions.key("write.parquet.max.file.size").intType().defaultValue(120).withDescription("Target size for parquet files produced by Hudi write phases. For DFS, this needs to be aligned with the underlying filesystem block size for optimal performance.");
    public static final ConfigOption<Integer> WRITE_PARQUET_PAGE_SIZE = ConfigOptions.key("write.parquet.page.size").intType().defaultValue(1).withDescription("Parquet page size. Page is the unit of read within a parquet file. Within a block, pages are compressed separately.");
    public static final ConfigOption<Integer> WRITE_MERGE_MAX_MEMORY = ConfigOptions.key("write.merge.max_memory").intType().defaultValue(100).withDescription("Max memory in MB for merge, default 100MB");
    public static final ConfigOption<Long> WRITE_COMMIT_ACK_TIMEOUT = ConfigOptions.key("write.commit.ack.timeout").longType().defaultValue(-1L).withDescription("Timeout limit for a writer task after it finishes a checkpoint and\nwaits for the instant commit success, only for internal use");
    public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_INPUT = ConfigOptions.key("write.bulk_insert.shuffle_input").booleanType().defaultValue(true).withDescription("Whether to shuffle the inputs by specific fields for bulk insert tasks, default true");
    public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_INPUT = ConfigOptions.key("write.bulk_insert.sort_input").booleanType().defaultValue(true).withDescription("Whether to sort the inputs by specific fields for bulk insert tasks, default true");
    public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions.key("write.sort.memory").intType().defaultValue(128).withDescription("Sort memory in MB, default 128MB");
    public static final ConfigOption<Boolean> COMPACTION_SCHEDULE_ENABLED = ConfigOptions.key("compaction.schedule.enabled").booleanType().defaultValue(true).withDescription("Schedule the compaction plan, enabled by default for MOR");
    public static final ConfigOption<Boolean> COMPACTION_ASYNC_ENABLED = ConfigOptions.key("compaction.async.enabled").booleanType().defaultValue(true).withDescription("Async Compaction, enabled by default for MOR");
    public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions.key("compaction.tasks").intType().noDefaultValue().withDescription("Parallelism of tasks that do actual compaction, default same as the write task parallelism");
    public static final String NUM_COMMITS = "num_commits";
    public static final String TIME_ELAPSED = "time_elapsed";
    public static final String NUM_AND_TIME = "num_and_time";
    public static final String NUM_OR_TIME = "num_or_time";
    public static final ConfigOption<String> COMPACTION_TRIGGER_STRATEGY = ConfigOptions.key("compaction.trigger.strategy").stringType().defaultValue("num_commits").withDescription("Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\nDefault is 'num_commits'");
    public static final ConfigOption<Integer> COMPACTION_DELTA_COMMITS = ConfigOptions.key("compaction.delta_commits").intType().defaultValue(5).withDescription("Max delta commits needed to trigger compaction, default 5 commits");
    public static final ConfigOption<Integer> COMPACTION_DELTA_SECONDS = ConfigOptions.key("compaction.delta_seconds").intType().defaultValue(3600).withDescription("Max delta seconds time needed to trigger compaction, default 1 hour");
    public static final ConfigOption<Integer> COMPACTION_TIMEOUT_SECONDS = ConfigOptions.key("compaction.timeout.seconds").intType().defaultValue(1200).withDescription("Max timeout time in seconds for online compaction to rollback, default 20 minutes");
    public static final ConfigOption<Integer> COMPACTION_MAX_MEMORY = ConfigOptions.key("compaction.max_memory").intType().defaultValue(100).withDescription("Max memory in MB for compaction spillable map, default 100MB");
    public static final ConfigOption<Long> COMPACTION_TARGET_IO = ConfigOptions.key("compaction.target_io").longType().defaultValue(512000L).withDescription("Target IO in MB for per compaction (both read and write), default 500 GB");
    public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions.key("clean.async.enabled").booleanType().defaultValue(true).withDescription("Whether to cleanup the old commits immediately on new commits, enabled by default");
    public static final ConfigOption<String> CLEAN_POLICY = ConfigOptions.key("clean.policy").stringType().defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).withDescription("Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS.Default is KEEP_LATEST_COMMITS.");
    public static final ConfigOption<Integer> CLEAN_RETAIN_COMMITS = ConfigOptions.key("clean.retain_commits").intType().defaultValue(30).withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\nThis also directly translates into how much you can incrementally pull on this table, default 30");
    public static final ConfigOption<Integer> CLEAN_RETAIN_HOURS = ConfigOptions.key("clean.retain_hours").intType().defaultValue(24).withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option ascompared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group, corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
    public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions.key("clean.retain_file_versions").intType().defaultValue(5).withDescription("Number of file versions to retain. default 5");
    public static final ConfigOption<Integer> ARCHIVE_MAX_COMMITS = ConfigOptions.key("archive.max_commits").intType().defaultValue(50).withDescription("Max number of commits to keep before archiving older commits into a sequential log, default 50");
    public static final ConfigOption<Integer> ARCHIVE_MIN_COMMITS = ConfigOptions.key("archive.min_commits").intType().defaultValue(40).withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 40");
    public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = ConfigOptions.key("clustering.schedule.enabled").booleanType().defaultValue(false).withDescription("Schedule the cluster plan, default false");
    public static final ConfigOption<Boolean> CLUSTERING_ASYNC_ENABLED = ConfigOptions.key("clustering.async.enabled").booleanType().defaultValue(false).withDescription("Async Clustering, default false");
    public static final ConfigOption<Integer> CLUSTERING_DELTA_COMMITS = ConfigOptions.key("clustering.delta_commits").intType().defaultValue(4).withDescription("Max delta commits needed to trigger clustering, default 4 commits");
    public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions.key("clustering.tasks").intType().noDefaultValue().withDescription("Parallelism of tasks that do actual clustering, default same as the write task parallelism");
    public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = ConfigOptions.key("clustering.plan.strategy.daybased.lookback.partitions").intType().defaultValue(2).withDescription("Number of partitions to list to create ClusteringPlan, default is 2");
    public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions.key("clustering.plan.strategy.class").stringType().defaultValue(FlinkSizeBasedClusteringPlanStrategy.class.getName()).withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by " + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");
    public static final ConfigOption<String> CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME = ConfigOptions.key("clustering.plan.partition.filter.mode").stringType().defaultValue(ClusteringPlanPartitionFilterMode.NONE.name()).withDescription("Partition filter mode used in the creation of clustering plan. Available values are - NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate.RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '" + HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + ".SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION.key() + "', '" + HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION.key() + "'].DAY_ROLLING: clustering partitions on a rolling basis by the hour to avoid clustering all partitions each time, which strategy sorts the partitions asc and chooses the partition of which index is divided by 24 and the remainder is equal to the current hour.");
    public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions.key("clustering.plan.strategy.target.file.max.bytes").longType().defaultValue(0x40000000L).withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB");
    public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions.key("clustering.plan.strategy.small.file.limit").longType().defaultValue(600L).withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB");
    public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions.key("clustering.plan.strategy.daybased.skipfromlatest.partitions").intType().defaultValue(0).withDescription("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan");
    public static final ConfigOption<String> CLUSTERING_SORT_COLUMNS = ConfigOptions.key("clustering.plan.strategy.sort.columns").stringType().defaultValue("").withDescription("Columns to sort the data by when clustering");
    public static final ConfigOption<Integer> CLUSTERING_MAX_NUM_GROUPS = ConfigOptions.key("clustering.plan.strategy.max.num.groups").intType().defaultValue(30).withDescription("Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism, default is 30");
    public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions.key("hive_sync.enabled").booleanType().defaultValue(false).withFallbackKeys("hive_sync.enable").withDescription("Asynchronously sync Hive meta to HMS, default false");
    public static final ConfigOption<String> HIVE_SYNC_DB = ConfigOptions.key("hive_sync.db").stringType().defaultValue("default").withDescription("Database name for hive sync, default 'default'");
    public static final ConfigOption<String> HIVE_SYNC_TABLE = ConfigOptions.key("hive_sync.table").stringType().defaultValue("unknown").withDescription("Table name for hive sync, default 'unknown'");
    public static final ConfigOption<String> HIVE_SYNC_FILE_FORMAT = ConfigOptions.key("hive_sync.file_format").stringType().defaultValue("PARQUET").withDescription("File format for hive sync, default 'PARQUET'");
    public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions.key("hive_sync.mode").stringType().defaultValue(HiveSyncMode.HMS.name()).withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'hms'");
    public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions.key("hive_sync.username").stringType().defaultValue("hive").withDescription("Username for hive sync, default 'hive'");
    public static final ConfigOption<String> HIVE_SYNC_PASSWORD = ConfigOptions.key("hive_sync.password").stringType().defaultValue("hive").withDescription("Password for hive sync, default 'hive'");
    public static final ConfigOption<String> HIVE_SYNC_JDBC_URL = ConfigOptions.key("hive_sync.jdbc_url").stringType().defaultValue("jdbc:hive2://localhost:10000").withDescription("Jdbc URL for hive sync, default 'jdbc:hive2://localhost:10000'");
    public static final ConfigOption<String> HIVE_SYNC_METASTORE_URIS = ConfigOptions.key("hive_sync.metastore.uris").stringType().defaultValue("").withDescription("Metastore uris for hive sync, default ''");
    public static final ConfigOption<String> HIVE_SYNC_PARTITION_FIELDS = ConfigOptions.key("hive_sync.partition_fields").stringType().defaultValue("").withDescription("Partition fields for hive sync, default ''");
    public static final ConfigOption<String> HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME = ConfigOptions.key("hive_sync.partition_extractor_class").stringType().defaultValue(MultiPartKeysValueExtractor.class.getName()).withDescription("Tool to extract the partition value from HDFS path, default 'MultiPartKeysValueExtractor'");
    public static final ConfigOption<Boolean> HIVE_SYNC_ASSUME_DATE_PARTITION = ConfigOptions.key("hive_sync.assume_date_partitioning").booleanType().defaultValue(false).withDescription("Assume partitioning is yyyy/mm/dd, default false");
    public static final ConfigOption<Boolean> HIVE_SYNC_USE_JDBC = ConfigOptions.key("hive_sync.use_jdbc").booleanType().defaultValue(true).withDescription("Use JDBC when hive synchronization is enabled, default true");
    public static final ConfigOption<Boolean> HIVE_SYNC_AUTO_CREATE_DB = ConfigOptions.key("hive_sync.auto_create_db").booleanType().defaultValue(true).withDescription("Auto create hive database if it does not exists, default true");
    public static final ConfigOption<Boolean> HIVE_SYNC_IGNORE_EXCEPTIONS = ConfigOptions.key("hive_sync.ignore_exceptions").booleanType().defaultValue(false).withDescription("Ignore exceptions during hive synchronization, default false");
    public static final ConfigOption<Boolean> HIVE_SYNC_SKIP_RO_SUFFIX = ConfigOptions.key("hive_sync.skip_ro_suffix").booleanType().defaultValue(false).withDescription("Skip the _ro suffix for Read optimized table when registering, default false");
    public static final ConfigOption<Boolean> HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions.key("hive_sync.support_timestamp").booleanType().defaultValue(true).withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\nDisabled by default for backward compatibility.");
    public static final ConfigOption<String> HIVE_SYNC_TABLE_PROPERTIES = ConfigOptions.key("hive_sync.table_properties").stringType().noDefaultValue().withDescription("Additional properties to store with table, the data format is k1=v1\nk2=v2");
    public static final ConfigOption<String> HIVE_SYNC_TABLE_SERDE_PROPERTIES = ConfigOptions.key("hive_sync.serde_properties").stringType().noDefaultValue().withDescription("Serde properties to hive table, the data format is k1=v1\nk2=v2");
    public static final ConfigOption<String> HIVE_SYNC_CONF_DIR = ConfigOptions.key("hive_sync.conf.dir").stringType().noDefaultValue().withDescription("The hive configuration directory, where the hive-site.xml lies in, the file should be put on the client machine");
    private static final String PROPERTIES_PREFIX = "properties.";

    private FlinkOptions() {
    }

    public static Map<String, String> getPropertiesWithPrefix(Map<String, String> options, String prefix) {
        HashMap<String, String> hoodieProperties = new HashMap<String, String>();
        if (FlinkOptions.hasPropertyOptions(options, prefix)) {
            options.keySet().stream().filter(key -> key.startsWith(prefix)).forEach(key -> {
                String value = (String)options.get(key);
                String subKey = key.substring(prefix.length());
                hoodieProperties.put(subKey, value);
            });
        }
        return hoodieProperties;
    }

    public static Configuration flatOptions(Configuration conf) {
        HashMap<String, String> propsMap = new HashMap<String, String>();
        conf.toMap().forEach((key, value) -> {
            String subKey = key.startsWith(PROPERTIES_PREFIX) ? key.substring(PROPERTIES_PREFIX.length()) : key;
            propsMap.put(subKey, (String)value);
        });
        return FlinkOptions.fromMap(propsMap);
    }

    private static boolean hasPropertyOptions(Map<String, String> options, String prefix) {
        return options.keySet().stream().anyMatch(k -> k.startsWith(prefix));
    }

    public static Configuration fromMap(Map<String, String> map) {
        Configuration configuration = new Configuration();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            configuration.setString(entry.getKey().trim(), entry.getValue());
        }
        return configuration;
    }

    public static <T> boolean isDefaultValueDefined(Configuration conf, ConfigOption<T> option) {
        return !conf.getOptional(option).isPresent() || conf.get(option).equals(option.defaultValue());
    }

    public static Set<ConfigOption<?>> optionalOptions() {
        HashSet options = new HashSet(FlinkOptions.allOptions());
        options.remove(PATH);
        return options;
    }

    public static List<ConfigOption<?>> allOptions() {
        Field[] declaredFields = FlinkOptions.class.getDeclaredFields();
        ArrayList options = new ArrayList();
        for (Field field2 : declaredFields) {
            if (!Modifier.isStatic(field2.getModifiers()) || !field2.getType().equals(ConfigOption.class)) continue;
            try {
                options.add((ConfigOption)field2.get(ConfigOption.class));
            }
            catch (IllegalAccessException e) {
                throw new HoodieException("Error while fetching static config option", e);
            }
        }
        return options;
    }
}

