/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.source.ExpressionEvaluators;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.source.prune.DataPruner;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.source.prune.PrimaryKeyPruners;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.ExpressionUtils;
import org.apache.hudi.util.InputFormats;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink {@link ScanTableSource} for a Hudi table.
 *
 * <p>Besides plain scanning, the source accepts projection, limit and filter
 * push-down from the planner. The pushed-down state is kept in the mutable
 * fields below and is carried over by {@code copy()}.
 */
public class HoodieTableSource
implements ScanTableSource,
SupportsProjectionPushDown,
SupportsLimitPushDown,
SupportsFilterPushDown {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSource.class);
    // Value -1; the same value is stored in `limit` when no limit was supplied.
    private static final int NO_LIMIT_CONSTANT = -1;
    // Hadoop configuration derived from `conf` in the constructor.
    private final transient org.apache.hadoop.conf.Configuration hadoopConf;
    // Meta client for the table; created via StreamerUtil.metaClientForReader when not supplied.
    // getStreamInputFormat() checks it for null before use.
    private final transient HoodieTableMetaClient metaClient;
    // Obtained from StreamerUtil.getMaxCompactionMemoryInBytes(conf); passed on to input splits.
    private final long maxCompactionMemoryInBytes;
    // Table schema restricted to physical columns.
    private final ResolvedSchema schema;
    // Non-null row type derived from `schema`.
    private final RowType tableRowType;
    // Path handed to the file index and (converted to a Flink path) to the split readers.
    private final Path path;
    // Names of the partition columns.
    private final List<String> partitionKeys;
    // Default partition name as passed to the constructor.
    private final String defaultPartName;
    // Flink configuration holding the table options.
    private final Configuration conf;
    // Passed to the input formats; obtained from InternalSchemaManager.get when not supplied.
    private final InternalSchemaManager internalSchemaManager;
    // Indexes (into `schema`) of the columns to produce; all columns unless a projection is applied.
    private int[] requiredPos;
    // Pushed-down row limit, or -1 when there is none.
    private long limit;
    // Predicates built from the pushed-down data filters; empty list when none.
    private List<ExpressionPredicates.Predicate> predicates;
    // Built from the pushed-down data filters; handed to the file index.
    private DataPruner dataPruner;
    // Built from the pushed-down partition filters; null when there are none.
    private PartitionPruners.PartitionPruner partitionPruner;
    // Bucket id derived from the data filters, or -1 when no single bucket could be derived.
    private int dataBucket;
    // Built on first use by getOrBuildFileIndex(); cleared by reset().
    private FileIndex fileIndex;

    /**
     * Creates a source with no pushed-down state: every optional argument of
     * the full constructor is left unset.
     *
     * @param schema          resolved table schema
     * @param path            table path
     * @param partitionKeys   names of the partition columns
     * @param defaultPartName default partition name
     * @param conf            Flink configuration holding the table options
     */
    public HoodieTableSource(ResolvedSchema schema, Path path, List<String> partitionKeys, String defaultPartName, Configuration conf) {
        this(
                schema,
                path,
                partitionKeys,
                defaultPartName,
                conf,
                null,  // predicates
                null,  // dataPruner
                null,  // partitionPruner
                -1,    // dataBucket
                null,  // requiredPos
                null,  // limit
                null,  // metaClient
                null); // internalSchemaManager
    }

    public HoodieTableSource(ResolvedSchema schema, Path path, List<String> partitionKeys, String defaultPartName, Configuration conf, @Nullable List<ExpressionPredicates.Predicate> predicates, @Nullable DataPruner dataPruner, @Nullable PartitionPruners.PartitionPruner partitionPruner, int dataBucket, @Nullable int[] requiredPos, @Nullable Long limit, @Nullable HoodieTableMetaClient metaClient, @Nullable InternalSchemaManager internalSchemaManager) {
        this.schema = ResolvedSchema.of(schema.getColumns().stream().filter(Column::isPhysical).collect(Collectors.toList()));
        this.tableRowType = (RowType)((DataType)this.schema.toSourceRowDataType().notNull()).getLogicalType();
        this.path = path;
        this.partitionKeys = partitionKeys;
        this.defaultPartName = defaultPartName;
        this.conf = conf;
        this.predicates = predicates == null ? Collections.emptyList() : predicates;
        this.dataPruner = dataPruner;
        this.partitionPruner = partitionPruner;
        this.dataBucket = dataBucket;
        this.requiredPos = requiredPos == null ? IntStream.range(0, this.tableRowType.getFieldCount()).toArray() : requiredPos;
        this.limit = limit == null ? -1L : limit;
        this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
        this.metaClient = metaClient == null ? StreamerUtil.metaClientForReader(conf, this.hadoopConf) : metaClient;
        this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
        this.internalSchemaManager = internalSchemaManager == null ? InternalSchemaManager.get(this.conf, this.metaClient) : internalSchemaManager;
    }

    /**
     * Returns the runtime provider that builds the source data stream.
     *
     * <p>The provider is bounded unless {@code FlinkOptions.READ_AS_STREAMING}
     * is set. In streaming mode the pipeline is a single-parallelism split
     * monitor followed by split readers keyed by file id; otherwise a single
     * input-format source function is used. Both variants run the reading
     * operator with {@code FlinkOptions.READ_TASKS} parallelism.
     *
     * @param scanContext planner-provided context (not used)
     * @return the data stream scan provider
     */
    @Override
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        return new DataStreamScanProviderAdapter() {

            @Override
            public boolean isBounded() {
                return !HoodieTableSource.this.conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
            }

            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                final Configuration conf = HoodieTableSource.this.conf;
                // The converter returns a wildcard type; the produced data type is bridged to RowData.
                @SuppressWarnings("unchecked")
                TypeInformation<RowData> typeInfo = (TypeInformation<RowData>)
                        TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(HoodieTableSource.this.getProducedDataType());
                OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());

                if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
                    StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
                            conf,
                            FilePathUtils.toFlinkPath(HoodieTableSource.this.path),
                            HoodieTableSource.this.tableRowType,
                            HoodieTableSource.this.maxCompactionMemoryInBytes,
                            HoodieTableSource.this.partitionPruner);
                    InputFormat<RowData, ?> inputFormat = HoodieTableSource.this.getInputFormat(true);
                    OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory =
                            StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
                    SingleOutputStreamOperator<RowData> source = execEnv
                            .addSource(monitoringFunction, HoodieTableSource.this.getSourceOperatorName("split_monitor"))
                            .uid(Pipelines.opUID("split_monitor", conf))
                            .setParallelism(1)
                            .keyBy(MergeOnReadInputSplit::getFileId)
                            .transform("split_reader", typeInfo, factory)
                            .uid(Pipelines.opUID("split_reader", conf))
                            .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
                    return new DataStreamSource<>(source);
                }

                InputFormatSourceFunction<RowData> func =
                        new InputFormatSourceFunction<>(HoodieTableSource.this.getInputFormat(), typeInfo);
                DataStreamSource<RowData> source =
                        execEnv.addSource(func, HoodieTableSource.this.asSummaryString(), typeInfo);
                return source
                        .name(HoodieTableSource.this.getSourceOperatorName("bounded_source"))
                        .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
            }
        };
    }

    /**
     * Returns the changelog mode of the produced stream: the full changelog
     * when the options ask for changelog emission, insert-only otherwise.
     */
    @Override
    public ChangelogMode getChangelogMode() {
        if (OptionsResolver.emitChangelog(this.conf)) {
            return ChangelogModes.FULL;
        }
        return ChangelogMode.insertOnly();
    }

    /**
     * Creates a new source carrying over the schema, options and every piece
     * of pushed-down state, reusing the meta client and internal schema
     * manager. The cached file index is not carried over.
     */
    @Override
    public DynamicTableSource copy() {
        return new HoodieTableSource(
                this.schema,
                this.path,
                this.partitionKeys,
                this.defaultPartName,
                this.conf,
                this.predicates,
                this.dataPruner,
                this.partitionPruner,
                this.dataBucket,
                this.requiredPos,
                this.limit,
                this.metaClient,
                this.internalSchemaManager);
    }

    /** Returns the fixed summary label of this source. */
    @Override
    public String asSummaryString() {
        final String summary = "HudiTableSource";
        return summary;
    }

    /**
     * Accepts filters pushed down by the planner.
     *
     * <p>The simple call expressions among {@code filters} are split into data
     * filters and partition filters. Data filters feed the predicates, the
     * data pruner and the bucket id; partition filters feed the partition
     * pruner.
     *
     * @param filters filters offered by the planner
     * @return a result that reports the partition filters as accepted and all
     *         of {@code filters} as remaining
     */
    @Override
    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> simpleFilters = ExpressionUtils.filterSimpleCallExpression(filters);
        Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitFilters =
                ExpressionUtils.splitExprByPartitionCall(simpleFilters, this.partitionKeys, this.tableRowType);
        List<ResolvedExpression> dataFilters = splitFilters.f0;
        List<ResolvedExpression> partitionFilters = splitFilters.f1;

        this.predicates = ExpressionPredicates.fromExpression(dataFilters);
        this.dataPruner = DataPruner.newInstance(dataFilters);
        this.partitionPruner = this.cratePartitionPruner(partitionFilters);
        this.dataBucket = this.getDataBucket(dataFilters);

        // Defensive copies: the result must not alias the planner's or our own lists.
        return SupportsFilterPushDown.Result.of(new ArrayList<>(partitionFilters), new ArrayList<>(filters));
    }

    /** Nested projection is not supported; only top-level columns can be projected. */
    @Override
    public boolean supportsNestedProjection() {
        final boolean nestedProjectionSupported = false;
        return nestedProjectionSupported;
    }

    /**
     * Records the projected columns.
     *
     * @param projections one index path per projected field; only the first
     *                    element of each path is used
     */
    @Override
    public void applyProjection(int[][] projections) {
        final int[] topLevelPositions = new int[projections.length];
        for (int i = 0; i < projections.length; i++) {
            topLevelPositions[i] = projections[i][0];
        }
        this.requiredPos = topLevelPositions;
    }

    /**
     * Records the row limit pushed down by the planner.
     *
     * @param limit maximum number of rows to read
     */
    @Override
    public void applyLimit(long limit) {
        final long pushedLimit = limit;
        this.limit = pushedLimit;
    }

    /**
     * Builds the row data type produced by this source: the columns selected
     * by {@code requiredPos}, in that order, bridged to {@link RowData}.
     */
    private DataType getProducedDataType() {
        final String[] columnNames = this.schema.getColumnNames().toArray(new String[0]);
        final DataType[] columnTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]);

        final DataTypes.Field[] projectedFields = new DataTypes.Field[this.requiredPos.length];
        for (int i = 0; i < this.requiredPos.length; i++) {
            final int pos = this.requiredPos[i];
            projectedFields[i] = DataTypes.FIELD(columnNames[pos], columnTypes[pos]);
        }
        return DataTypes.ROW(projectedFields).bridgedTo(RowData.class);
    }

    /**
     * Builds an operator display name of the form
     * {@code operatorName(table=[tableName], fields=[f1, f2])}, where the
     * fields are the projected column names.
     *
     * @param operatorName prefix identifying the operator
     * @return the display name
     */
    private String getSourceOperatorName(String operatorName) {
        final String[] columnNames = this.schema.getColumnNames().toArray(new String[0]);
        final List<String> projectedNames = Arrays.stream(this.requiredPos)
                .mapToObj(pos -> columnNames[pos])
                .collect(Collectors.toList());
        final List<String> tableName = Collections.singletonList(this.conf.getString(FlinkOptions.TABLE_NAME));
        return operatorName + "(" + "table=" + tableName + ", " + "fields=" + projectedNames + ")";
    }

    /**
     * Creates the partition pruner for the given partition filters.
     *
     * @param partitionFilters filters that only reference partition columns
     * @return the pruner, or {@code null} when there are no partition filters
     * @throws HoodieValidationException if a partition key is not a column of the schema
     */
    @Nullable
    private PartitionPruners.PartitionPruner cratePartitionPruner(List<ResolvedExpression> partitionFilters) {
        if (partitionFilters.isEmpty()) {
            return null;
        }
        String condition = partitionFilters.stream()
                .map(ResolvedExpression::asSummaryString)
                .collect(Collectors.joining(" and "));
        // Parameterized logging instead of string concatenation in the call.
        LOG.info("Partition pruner for hoodie source, condition is:\n{}", condition);

        List<ExpressionEvaluators.Evaluator> evaluators = ExpressionEvaluators.fromExpression(partitionFilters);
        List<DataType> partitionTypes = new ArrayList<>(this.partitionKeys.size());
        for (String name : this.partitionKeys) {
            Column column = this.schema.getColumn(name)
                    .orElseThrow(() -> new HoodieValidationException("Field " + name + " does not exist"));
            partitionTypes.add(column.getDataType());
        }
        String defaultParName = this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
        boolean hivePartition = this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
        return PartitionPruners.getInstance(evaluators, this.partitionKeys, partitionTypes, defaultParName, hivePartition);
    }

    /**
     * Derives the bucket id to read from the data filters.
     *
     * <p>A bucket id is derived only when the table uses the bucket index and
     * the filters contain equals-literal expressions covering all index key
     * fields.
     *
     * @param dataFilters pushed-down data filters
     * @return the bucket id, or {@code -1} when none can be derived
     */
    private int getDataBucket(List<ResolvedExpression> dataFilters) {
        if (OptionsResolver.isBucketIndexType(this.conf) && !dataFilters.isEmpty()) {
            final Set<String> indexKeys = Arrays.stream(OptionsResolver.getIndexKeys(this.conf))
                    .collect(Collectors.toSet());
            final List<ResolvedExpression> equalityOnIndexKeys = dataFilters.stream()
                    .filter(filter -> ExpressionUtils.isEqualsLitExpr(filter, indexKeys))
                    .collect(Collectors.toList());
            if (ExpressionUtils.isFilteringByAllFields(equalityOnIndexKeys, indexKeys)) {
                return PrimaryKeyPruners.getBucketId(equalityOnIndexKeys, this.conf);
            }
        }
        return -1;
    }

    /**
     * Builds one input split per latest merged file slice, across all
     * partitions returned by the file index.
     *
     * @return the splits; empty when there are no partitions or the file
     *         system view has no last instant
     * @throws HoodieException if the partitions contain no files
     */
    private List<MergeOnReadInputSplit> buildInputSplits() {
        final FileIndex index = this.getOrBuildFileIndex();
        final List<String> partitions = index.getOrBuildPartitionPaths();
        if (partitions.isEmpty()) {
            return Collections.emptyList();
        }

        final FileStatus[] files = index.getFilesInPartitions();
        if (files.length == 0) {
            throw new HoodieException("No files found for reading in user provided path.");
        }

        final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
                this.metaClient,
                this.metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
                files);
        if (!fsView.getLastInstant().isPresent()) {
            return Collections.emptyList();
        }

        final String latestCommit = fsView.getLastInstant().get().getTimestamp();
        final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
        // Split numbers are assigned in encounter order, starting at 0.
        final AtomicInteger splitCounter = new AtomicInteger(0);

        return partitions.stream()
                .flatMap(partition -> fsView.getLatestMergedFileSlicesBeforeOrOn(partition, latestCommit))
                .map(fileSlice -> {
                    String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
                    List<String> sortedLogPaths = fileSlice.getLogFiles()
                            .sorted(HoodieLogFile.getLogFileComparator())
                            .map(logFile -> logFile.getPath().toString())
                            .collect(Collectors.toList());
                    Option<List<String>> logPaths = Option.ofNullable(sortedLogPaths);
                    return new MergeOnReadInputSplit(
                            splitCounter.getAndIncrement(),
                            basePath,
                            logPaths,
                            latestCommit,
                            this.metaClient.getBasePath(),
                            this.maxCompactionMemoryInBytes,
                            mergeType,
                            null,
                            fileSlice.getFileId());
                })
                .collect(Collectors.toList());
    }

    /** Returns the input format for a batch (non-streaming) read. */
    public InputFormat<RowData, ?> getInputFormat() {
        final boolean isStreaming = false;
        return this.getInputFormat(isStreaming);
    }

    /**
     * Returns the input format for the requested execution mode.
     *
     * @param isStreaming {@code true} for the streaming format, {@code false} for the batch format
     */
    @VisibleForTesting
    public InputFormat<RowData, ?> getInputFormat(boolean isStreaming) {
        if (isStreaming) {
            return this.getStreamInputFormat();
        }
        return this.getBatchInputFormat();
    }

    /**
     * Builds the input format for a bounded read, chosen by
     * {@code FlinkOptions.QUERY_TYPE}:
     * <ul>
     *   <li>{@code snapshot}: merge-on-read format over the built splits for
     *       MERGE_ON_READ tables, base-file-only format for COPY_ON_WRITE;</li>
     *   <li>{@code read_optimized}: base-file-only format;</li>
     *   <li>{@code incremental}: CDC format when CDC is enabled, merge-on-read
     *       format otherwise, over the incremental splits.</li>
     * </ul>
     * An empty input format is returned when no splits are found.
     *
     * @throws HoodieException on an unexpected table type or query type
     */
    private InputFormat<RowData, ?> getBatchInputFormat() {
        final Schema tableAvroSchema = this.getTableAvroSchema();
        final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
        final RowType rowType = (RowType) rowDataType.getLogicalType();
        final RowType requiredRowType = (RowType) ((DataType) this.getProducedDataType().notNull()).getLogicalType();
        final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);

        switch (queryType) {
            case "snapshot": {
                final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
                if (tableType == HoodieTableType.MERGE_ON_READ) {
                    final List<MergeOnReadInputSplit> inputSplits = this.buildInputSplits();
                    if (inputSplits.isEmpty()) {
                        LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
                        return InputFormats.EMPTY_INPUT_FORMAT;
                    }
                    return this.mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, inputSplits, false);
                }
                if (tableType == HoodieTableType.COPY_ON_WRITE) {
                    return this.baseFileOnlyInputFormat();
                }
                throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
            }
            case "read_optimized": {
                return this.baseFileOnlyInputFormat();
            }
            case "incremental": {
                final IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
                        .conf(this.conf)
                        .path(FilePathUtils.toFlinkPath(this.path))
                        .rowType(this.tableRowType)
                        .maxCompactionMemoryInBytes(this.maxCompactionMemoryInBytes)
                        .partitionPruner(this.partitionPruner)
                        .build();
                final boolean cdcEnabled = this.conf.getBoolean(FlinkOptions.CDC_ENABLED);
                final IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(this.metaClient, cdcEnabled);
                if (result.isEmpty()) {
                    LOG.warn("No input splits generate for incremental read, returns empty collection instead");
                    return InputFormats.EMPTY_INPUT_FORMAT;
                }
                return cdcEnabled
                        ? this.cdcInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, result.getInputSplits())
                        : this.mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, result.getInputSplits(), false);
            }
        }

        // Reached only when no case label matched the query type.
        final String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now", queryType, "snapshot", "read_optimized", "incremental");
        throw new HoodieException(errMsg);
    }

    /**
     * Builds the input format for a streaming read.
     *
     * <p>The Avro schema is inferred from the DDL when there is no meta client
     * or the table has no commit with valid data yet; otherwise it is resolved
     * from the table. Only the {@code snapshot} and {@code incremental} query
     * types are supported. The format is created with an empty split list, and
     * deletes are emitted only for MERGE_ON_READ tables (never for CDC).
     *
     * @throws HoodieException on an unsupported query type
     */
    private InputFormat<RowData, ?> getStreamInputFormat() {
        final Schema tableAvroSchema;
        if (this.metaClient == null || !this.tableDataExists()) {
            tableAvroSchema = this.inferSchemaFromDdl();
        } else {
            tableAvroSchema = this.getTableAvroSchema();
        }
        final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
        final RowType rowType = (RowType) rowDataType.getLogicalType();
        final RowType requiredRowType = (RowType) ((DataType) this.getProducedDataType().notNull()).getLogicalType();
        final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);

        switch (queryType) {
            case "snapshot":
            case "incremental": {
                final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
                final boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
                if (this.conf.getBoolean(FlinkOptions.CDC_ENABLED)) {
                    return this.cdcInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList());
                }
                return this.mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList(), emitDelete);
            }
        }

        // Reached only when no case label matched the query type.
        final String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s'] are supported now", queryType, "snapshot", "incremental");
        throw new HoodieException(errMsg);
    }

    /** Returns whether the active timeline has a last commit metadata with valid data. */
    private boolean tableDataExists() {
        return this.metaClient
                .getActiveTimeline()
                .getLastCommitMetadataWithValidData()
                .isPresent();
    }

    /**
     * Builds the CDC input format over the given splits. Deletes are never
     * emitted by this format.
     *
     * @param rowType         full table row type
     * @param requiredRowType projected row type
     * @param tableAvroSchema Avro schema of the table
     * @param rowDataType     data type whose children are the field types
     * @param inputSplits     splits to read
     * @return the configured input format
     */
    private MergeOnReadInputFormat cdcInputFormat(RowType rowType, RowType requiredRowType, Schema tableAvroSchema, DataType rowDataType, List<MergeOnReadInputSplit> inputSplits) {
        final String[] recordKeyFields = this.conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
        final MergeOnReadTableState tableState = new MergeOnReadTableState(
                rowType,
                requiredRowType,
                tableAvroSchema.toString(),
                AvroSchemaConverter.convertToSchema((LogicalType) requiredRowType).toString(),
                inputSplits,
                recordKeyFields);

        // The casts mirror the original call chain; the builder signatures are not visible here.
        final CdcInputFormat.Builder withFieldTypes = (CdcInputFormat.Builder) CdcInputFormat.builder()
                .config(this.conf)
                .tableState(tableState)
                .fieldTypes(rowDataType.getChildren());
        final CdcInputFormat.Builder withPredicates = (CdcInputFormat.Builder) withFieldTypes
                .defaultPartName(this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
                .predicates((List) this.predicates);
        return withPredicates
                .limit(this.limit)
                .emitDelete(false)
                .build();
    }

    /**
     * Builds the merge-on-read input format over the given splits.
     *
     * @param rowType         full table row type
     * @param requiredRowType projected row type
     * @param tableAvroSchema Avro schema of the table
     * @param rowDataType     data type whose children are the field types
     * @param inputSplits     splits to read
     * @param emitDelete      whether delete records are emitted
     * @return the configured input format
     */
    private MergeOnReadInputFormat mergeOnReadInputFormat(RowType rowType, RowType requiredRowType, Schema tableAvroSchema, DataType rowDataType, List<MergeOnReadInputSplit> inputSplits, boolean emitDelete) {
        final String[] recordKeyFields = this.conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
        final MergeOnReadTableState tableState = new MergeOnReadTableState(
                rowType,
                requiredRowType,
                tableAvroSchema.toString(),
                AvroSchemaConverter.convertToSchema((LogicalType) requiredRowType).toString(),
                inputSplits,
                recordKeyFields);

        return MergeOnReadInputFormat.builder()
                .config(this.conf)
                .tableState(tableState)
                .fieldTypes(rowDataType.getChildren())
                .defaultPartName(this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
                .predicates(this.predicates)
                .limit(this.limit)
                .emitDelete(emitDelete)
                .internalSchemaManager(this.internalSchemaManager)
                .build();
    }

    /**
     * Builds an input format that reads only the latest base files of the
     * files returned by {@code getReadFiles()}.
     *
     * @return the copy-on-write input format, or the empty input format when
     *         there are no files or no latest base files
     */
    private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
        final FileStatus[] candidateFiles = this.getReadFiles();
        if (candidateFiles.length == 0) {
            return InputFormats.EMPTY_INPUT_FORMAT;
        }

        final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
                this.metaClient,
                this.metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(),
                candidateFiles);
        final Path[] baseFilePaths = fsView.getLatestBaseFiles()
                .map(BaseFile::getFileStatus)
                .map(FileStatus::getPath)
                .toArray(Path[]::new);
        if (baseFilePaths.length == 0) {
            return InputFormats.EMPTY_INPUT_FORMAT;
        }

        // "No limit" is passed to the reader as Long.MAX_VALUE.
        final long effectiveLimit = this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit;
        return new CopyOnWriteInputFormat(
                FilePathUtils.toFlinkPaths(baseFilePaths),
                this.schema.getColumnNames().toArray(new String[0]),
                this.schema.getColumnDataTypes().toArray(new DataType[0]),
                this.requiredPos,
                this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
                this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
                this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
                this.predicates,
                effectiveLimit,
                HadoopConfigurations.getParquetConf(this.conf, this.hadoopConf),
                this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
                this.internalSchemaManager);
    }

    /**
     * Derives the Avro schema from the table row type and adds the Hudi
     * metadata fields, passing the {@code CHANGELOG_ENABLED} option on to
     * {@code HoodieAvroUtils.addMetadataFields}.
     */
    private Schema inferSchemaFromDdl() {
        final boolean changelogEnabled = this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
        return HoodieAvroUtils.addMetadataFields(
                AvroSchemaConverter.convertToSchema((LogicalType) this.tableRowType),
                changelogEnabled);
    }

    /**
     * Returns the cached file index, building it from the current path,
     * options and pruning state on first use.
     */
    private FileIndex getOrBuildFileIndex() {
        if (this.fileIndex != null) {
            return this.fileIndex;
        }
        this.fileIndex = FileIndex.builder()
                .path(this.path)
                .conf(this.conf)
                .rowType(this.tableRowType)
                .dataPruner(this.dataPruner)
                .partitionPruner(this.partitionPruner)
                .dataBucket(this.dataBucket)
                .build();
        return this.fileIndex;
    }

    /**
     * Resolves the Avro schema of the table through the meta client.
     *
     * <p>This is best-effort: any failure is logged and the schema inferred
     * from the DDL is returned instead.
     */
    @VisibleForTesting
    public Schema getTableAvroSchema() {
        Schema resolved;
        try {
            resolved = new TableSchemaResolver(this.metaClient).getTableAvroSchema();
        }
        catch (Throwable e) {
            LOG.warn("Get table avro schema error, use schema from the DDL instead", e);
            resolved = this.inferSchemaFromDdl();
        }
        return resolved;
    }

    /** Returns the meta client held by this source. */
    @VisibleForTesting
    public HoodieTableMetaClient getMetaClient() {
        final HoodieTableMetaClient client = this.metaClient;
        return client;
    }

    /** Returns the Flink configuration this source was created with. */
    @VisibleForTesting
    public Configuration getConf() {
        final Configuration configuration = this.conf;
        return configuration;
    }

    /**
     * Reloads the active timeline and drops the cached file index so that the
     * next read sees the current table state.
     *
     * <p>The meta client is checked for null first, as
     * {@code getStreamInputFormat()} already does; without a meta client only
     * the file index is dropped.
     */
    @VisibleForTesting
    public void reset() {
        if (this.metaClient != null) {
            this.metaClient.reloadActiveTimeline();
        }
        this.fileIndex = null;
    }

    /**
     * Returns the files in the partitions selected by the file index.
     *
     * @return the file statuses; an empty array when there are no partitions
     */
    @VisibleForTesting
    public FileStatus[] getReadFiles() {
        final FileIndex index = this.getOrBuildFileIndex();
        final List<String> partitions = index.getOrBuildPartitionPaths();
        return partitions.isEmpty() ? new FileStatus[0] : index.getFilesInPartitions();
    }

    /** Returns the predicates currently held by this source. */
    @VisibleForTesting
    public List<ExpressionPredicates.Predicate> getPredicates() {
        final List<ExpressionPredicates.Predicate> current = this.predicates;
        return current;
    }

    /** Returns the data pruner currently held by this source; may be {@code null}. */
    @VisibleForTesting
    public DataPruner getDataPruner() {
        final DataPruner current = this.dataPruner;
        return current;
    }

    /** Returns the bucket id currently held by this source. */
    @VisibleForTesting
    public int getDataBucket() {
        final int bucket = this.dataBucket;
        return bucket;
    }
}

