package org.apache.hudi.table;

import java.io.File;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/table/TestHoodieTableSource.class */
/**
 * Tests for {@link HoodieTableSource}.
 *
 * <p>Covers:
 * <ul>
 *   <li>read-file resolution, with and without partition filters;</li>
 *   <li>input-format selection per table/query type;</li>
 *   <li>Avro schema derivation;</li>
 *   <li>filter push-down and bucket pruning.</li>
 * </ul>
 */
public class TestHoodieTableSource {
    private static final Logger LOG = LoggerFactory.getLogger(TestHoodieTableSource.class);

    /** Table configuration; (re)assigned by {@link #beforeEach()} and {@link #getEmptyStreamingSource()}. */
    private Configuration conf;

    @TempDir
    File tempFile;

    /**
     * Creates the default configuration rooted at {@link #tempFile} and writes
     * {@code TestData.DATA_SET_INSERT} into that table.
     *
     * <p>This is not annotated with {@code @BeforeEach}. Tests that need data call it explicitly.
     * Others, such as {@link #testGetTableAvroSchema()}, rely on the table path being empty.
     */
    void beforeEach() throws Exception {
        conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
    }

    @Test
    void testGetReadPaths() throws Exception {
        beforeEach();

        // No filters applied: every data file written above is returned.
        FileStatus[] allFiles = getEmptyStreamingSource().getReadFiles();
        Assertions.assertNotNull(allFiles);
        MatcherAssert.assertThat(allFiles.length, Is.is(4));

        // partition = 'par1' restricts the read set to a single file.
        ResolvedExpression partitionFilter = new CallExpression(
            BuiltInFunctionDefinitions.EQUALS,
            Arrays.asList(
                new FieldReferenceExpression("partition", DataTypes.STRING(), 4, 4),
                new ValueLiteralExpression("par1", DataTypes.STRING().notNull())),
            DataTypes.BOOLEAN());
        HoodieTableSource filteredSource = getEmptyStreamingSource();
        filteredSource.applyFilters(Collections.singletonList(partitionFilter));
        FileStatus[] prunedFiles = filteredSource.getReadFiles();
        Assertions.assertNotNull(prunedFiles);
        MatcherAssert.assertThat(prunedFiles.length, Is.is(1));
    }

    @Test
    void testGetInputFormat() throws Exception {
        beforeEach();
        // NOTE(review): beforeEach() already wrote DATA_SET_INSERT. This second write adds another
        // commit with the same data. Confirm the extra commit is intended.
        TestData.writeData(TestData.DATA_SET_INSERT, conf);
        HoodieTableSource tableSource = new HoodieTableSource(
            TestConfigurations.TABLE_SCHEMA,
            new Path(tempFile.getPath()),
            Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
            "default-par",
            conf);
        MatcherAssert.assertThat(tableSource.getInputFormat(), Is.is(CoreMatchers.instanceOf(FileInputFormat.class)));

        // The source is built on this same conf instance. The assertions below expect later
        // mutations of conf to change the selected input format.
        conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        MatcherAssert.assertThat(tableSource.getInputFormat(), Is.is(CoreMatchers.instanceOf(MergeOnReadInputFormat.class)));

        conf.setString(FlinkOptions.QUERY_TYPE.key(), "incremental");
        Assertions.assertDoesNotThrow(tableSource::getInputFormat, "Query type: 'incremental' should be supported");
    }

    @Test
    void testGetTableAvroSchema() {
        HoodieTableSource tableSource = getEmptyStreamingSource();
        Assertions.assertNull(tableSource.getMetaClient(), "Streaming source with empty table path is allowed");
        String fieldNames = tableSource.getTableAvroSchema().getFields().stream()
            .map(field -> field.name())
            .collect(Collectors.joining(","));
        MatcherAssert.assertThat(fieldNames, Is.is("_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,uuid,name,age,ts,partition"));
    }

    @Test
    void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() {
        HoodieTableSource tableSource = getEmptyStreamingSource();
        ResolvedExpression inFilter = new CallExpression(
            BuiltInFunctionDefinitions.IN,
            Arrays.asList(
                new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
                new ValueLiteralExpression("1", DataTypes.STRING().notNull())),
            DataTypes.BOOLEAN());
        tableSource.applyFilters(Collections.singletonList(inFilter));
        // The data pruner built from the filter must survive copy().
        Assertions.assertNotNull(tableSource.copy().getDataPruner());
    }

    /**
     * Verifies bucket-index pruning for single and composite record keys, and that the order of
     * the equality predicates does not matter.
     *
     * @param hiveStylePartitioning value for {@code FlinkOptions.HIVE_STYLE_PARTITIONING}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testBucketPruning(boolean hiveStylePartitioning) throws Exception {
        final DataType stringNotNull = DataTypes.STRING().notNull();

        // tbl1: default record-key config, bucket index; filter uuid = 'id1'.
        Configuration conf1 = TestConfigurations.getDefaultConf(new Path(tempFile.getAbsolutePath(), "tbl1").toString());
        conf1.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
        conf1.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf1);
        HoodieTableSource source1 = createHoodieTableSource(conf1);
        source1.applyFilters(Collections.singletonList(createLitEquivalenceExpr("uuid", 0, stringNotNull, "id1")));
        MatcherAssert.assertThat(source1.getDataBucket(), Is.is(1));
        MatcherAssert.assertThat("Files should be pruned by bucket id 1", source1.getReadFiles().length, CoreMatchers.is(2));

        // tbl2: composite key (uuid, name) with the complex key generator; both key fields filtered.
        Configuration conf2 = conf1.clone();
        conf2.setString(FlinkOptions.PATH, new Path(tempFile.getAbsolutePath(), "tbl2").toString());
        conf2.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid,name");
        conf2.setString(FlinkOptions.KEYGEN_TYPE, "COMPLEX");
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf2);
        HoodieTableSource source2 = createHoodieTableSource(conf2);
        source2.applyFilters(Arrays.asList(
            createLitEquivalenceExpr("uuid", 0, stringNotNull, "id1"),
            createLitEquivalenceExpr("name", 1, stringNotNull, "Danny")));
        MatcherAssert.assertThat(source2.getDataBucket(), Is.is(3));
        MatcherAssert.assertThat("Files should be pruned by bucket id 3", source2.getReadFiles().length, CoreMatchers.is(3));

        // Same filters in reverse order must resolve to the same bucket.
        source2.reset();
        source2.applyFilters(Arrays.asList(
            createLitEquivalenceExpr("name", 1, stringNotNull, "Danny"),
            createLitEquivalenceExpr("uuid", 0, stringNotNull, "id1")));
        MatcherAssert.assertThat(source2.getDataBucket(), Is.is(3));
        MatcherAssert.assertThat("Files should be pruned by bucket id 3", source2.getReadFiles().length, CoreMatchers.is(3));

        // tbl3: composite key but only uuid filtered, so no bucket is selected (-1).
        Configuration conf3 = conf1.clone();
        conf3.setString(FlinkOptions.PATH, new Path(tempFile.getAbsolutePath(), "tbl3").toString());
        conf3.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid,name");
        conf3.setString(FlinkOptions.KEYGEN_TYPE, "COMPLEX");
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf3);
        HoodieTableSource source3 = createHoodieTableSource(conf3);
        source3.applyFilters(Collections.singletonList(createLitEquivalenceExpr("uuid", 0, stringNotNull, "id1")));
        MatcherAssert.assertThat(source3.getDataBucket(), Is.is(-1));
        MatcherAssert.assertThat("Partial pk filtering does not prune any files", source3.getReadFiles().length, CoreMatchers.is(7));

        // tbl4: tbl1's key config, with an extra filter on the non-key field "name".
        Configuration conf4 = conf1.clone();
        conf4.setString(FlinkOptions.PATH, new Path(tempFile.getAbsolutePath(), "tbl4").toString());
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf4);
        HoodieTableSource source4 = createHoodieTableSource(conf4);
        source4.applyFilters(Arrays.asList(
            createLitEquivalenceExpr("uuid", 0, stringNotNull, "id1"),
            createLitEquivalenceExpr("name", 1, stringNotNull, "Danny")));
        MatcherAssert.assertThat(source4.getDataBucket(), Is.is(1));
        MatcherAssert.assertThat("Files should be pruned by bucket id 1", source4.getReadFiles().length, CoreMatchers.is(2));
    }

    /**
     * Verifies bucket pruning when the record key is a timestamp, date, or decimal column on a
     * non-partitioned table.
     *
     * @param logicalTimestamp value for
     *     {@code KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testBucketPruningSpecialKeyDataType(boolean logicalTimestamp) throws Exception {
        // tbl1: TIMESTAMP(3) record key.
        Configuration conf1 = TestConfigurations.getDefaultConf(
            new Path(tempFile.getAbsolutePath(), "tbl1").toString(),
            TestConfigurations.ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE);
        conf1.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
        conf1.setString(FlinkOptions.RECORD_KEY_FIELD, "f_timestamp");
        conf1.setString(FlinkOptions.PRECOMBINE_FIELD, "f_timestamp");
        conf1.removeConfig(FlinkOptions.PARTITION_PATH_FIELD);
        conf1.setBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), logicalTimestamp);
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE, conf1);
        HoodieTableSource source1 = createHoodieTableSource(conf1);
        LocalDateTime epochPlusOneMilli = LocalDateTime.ofInstant(Instant.ofEpochMilli(1L), ZoneId.of("UTC"));
        source1.applyFilters(Collections.singletonList(
            createLitEquivalenceExpr("f_timestamp", 0, DataTypes.TIMESTAMP(3).notNull(), epochPlusOneMilli)));
        // The expected bucket differs depending on the logical-timestamp key-generator option.
        MatcherAssert.assertThat(source1.getDataBucket(), Is.is(logicalTimestamp ? 1 : 0));
        MatcherAssert.assertThat("Files should be pruned", source1.getReadFiles().length, CoreMatchers.is(1));

        // tbl2: DATE record key.
        Configuration conf2 = conf1.clone();
        conf2.setString(FlinkOptions.PATH, new Path(tempFile.getAbsolutePath(), "tbl2").toString());
        conf2.setString(FlinkOptions.RECORD_KEY_FIELD, "f_date");
        conf2.setString(FlinkOptions.PRECOMBINE_FIELD, "f_date");
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE, conf2);
        HoodieTableSource source2 = createHoodieTableSource(conf2);
        source2.applyFilters(Collections.singletonList(
            createLitEquivalenceExpr("f_date", 1, DataTypes.DATE().notNull(), LocalDate.ofEpochDay(1L))));
        MatcherAssert.assertThat(source2.getDataBucket(), Is.is(1));
        MatcherAssert.assertThat("Files should be pruned", source2.getReadFiles().length, CoreMatchers.is(1));

        // tbl3: DECIMAL(3, 2) record key.
        Configuration conf3 = conf1.clone();
        conf3.setString(FlinkOptions.PATH, new Path(tempFile.getAbsolutePath(), "tbl3").toString());
        conf3.setString(FlinkOptions.RECORD_KEY_FIELD, "f_decimal");
        conf3.setString(FlinkOptions.PRECOMBINE_FIELD, "f_decimal");
        TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE, conf3);
        HoodieTableSource source3 = createHoodieTableSource(conf3);
        source3.applyFilters(Collections.singletonList(
            createLitEquivalenceExpr("f_decimal", 1, DataTypes.DECIMAL(3, 2).notNull(), new BigDecimal("1.11"))));
        MatcherAssert.assertThat(source3.getDataBucket(), Is.is(0));
        MatcherAssert.assertThat("Files should be pruned", source3.getReadFiles().length, CoreMatchers.is(1));
    }

    @Test
    void testHoodieSourceCachedMetaClient() {
        HoodieTableSource tableSource = getEmptyStreamingSource();
        // NOTE(review): nothing is written to the table path here. testGetTableAvroSchema asserts
        // getMetaClient() is null for such a source, so this may only compare null with null.
        // Consider writing data first so a real meta client is exercised.
        MatcherAssert.assertThat(tableSource.getMetaClient(), Is.is(tableSource.copy().getMetaClient()));
    }

    @Test
    void testFilterPushDownWithParquetPredicates() {
        HoodieTableSource tableSource = getEmptyStreamingSource();

        // f_int and the literal 10 are shared as operands by both comparison expressions.
        List<ResolvedExpression> operands = new ArrayList<>();
        operands.add(new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0));
        operands.add(new ValueLiteralExpression(10));

        ResolvedExpression equalsExpr = new CallExpression(BuiltInFunctionDefinitions.EQUALS, operands, DataTypes.BOOLEAN());
        ResolvedExpression greaterThanExpr = new CallExpression(BuiltInFunctionDefinitions.GREATER_THAN, operands, DataTypes.BOOLEAN());
        ResolvedExpression orExpr = new CallExpression(
            BuiltInFunctionDefinitions.OR, Arrays.asList(equalsExpr, greaterThanExpr), DataTypes.BOOLEAN());
        List<ResolvedExpression> filters = Arrays.asList(equalsExpr, greaterThanExpr, orExpr);

        tableSource.applyFilters(filters);
        // The source's predicates must match what ExpressionPredicates derives from the same filters.
        Assertions.assertEquals(ExpressionPredicates.fromExpression(filters).toString(), tableSource.getPredicates().toString());
    }

    /**
     * Resets {@link #conf} to the default configuration rooted at {@link #tempFile}. Enables
     * streaming read and data skipping, then builds a table source from that configuration.
     *
     * <p>The table is "empty" only if nothing was written to {@link #tempFile} beforehand. After
     * {@link #beforeEach()} it points at the written data.
     */
    private HoodieTableSource getEmptyStreamingSource() {
        conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
        conf.setBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
        return createHoodieTableSource(conf);
    }

    /**
     * Builds a {@link HoodieTableSource} over {@code TestConfigurations.TABLE_SCHEMA}. The table
     * path and partition keys come from {@code configuration}, with {@code "default-par"} as the
     * default partition name.
     *
     * <p>NOTE(review): when the partition-path option resolves to an empty string, {@code split}
     * yields a single empty key ({@code [""]}) rather than an empty list. Confirm
     * {@link HoodieTableSource} treats that as non-partitioned.
     */
    private HoodieTableSource createHoodieTableSource(Configuration configuration) {
        List<String> partitionKeys = Arrays.asList(configuration.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","));
        return new HoodieTableSource(
            TestConfigurations.TABLE_SCHEMA,
            new Path(configuration.getString(FlinkOptions.PATH)),
            partitionKeys,
            "default-par",
            configuration);
    }

    /**
     * Builds the resolved expression {@code fieldName = value}.
     *
     * @param fieldName  name of the referenced column
     * @param fieldIndex index used as both the input index and the field index of the reference
     * @param dataType   type applied to both the field reference and the literal
     * @param value      literal value compared against the field
     * @return an {@code EQUALS} call expression of type BOOLEAN
     */
    private ResolvedExpression createLitEquivalenceExpr(String fieldName, int fieldIndex, DataType dataType, Object value) {
        return new CallExpression(
            BuiltInFunctionDefinitions.EQUALS,
            Arrays.asList(
                new FieldReferenceExpression(fieldName, dataType, fieldIndex, fieldIndex),
                new ValueLiteralExpression(value, dataType)),
            DataTypes.BOOLEAN());
    }
}
