/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.helpers;

import java.util.Collections;
import java.util.List;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs incremental or snapshot queries against the Hudi table located at the
 * configured source base path and returns the result as a Spark {@link Dataset}.
 *
 * <p>The source path is read once, at construction time, from
 * {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}.
 */
public class QueryRunner {
    private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);

    /** Spark data source format name used for every read issued by this class. */
    private static final String HUDI_FORMAT = "org.apache.hudi";

    private final SparkSession sparkSession;
    private final String sourcePath;

    /**
     * Creates a runner bound to the given Spark session and source table.
     *
     * @param sparkSession session used to issue the reads
     * @param props        properties that must carry
     *                     {@link HoodieIncrSourceConfig#HOODIE_SRC_BASE_PATH}; the
     *                     property is passed through
     *                     {@code ConfigUtils.checkRequiredConfigProperties} before
     *                     being read (NOTE(review): presumably that call rejects
     *                     props missing the key - confirm against ConfigUtils)
     */
    public QueryRunner(SparkSession sparkSession, TypedProperties props) {
        this.sparkSession = sparkSession;
        ConfigUtils.checkRequiredConfigProperties(props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
        this.sourcePath = ConfigUtils.getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
    }

    /**
     * Dispatches to the incremental or snapshot query depending on the query info.
     * The incremental check is evaluated first.
     *
     * @param queryInfo describes the query type and the instants to read
     * @return the rows produced by the selected query
     * @throws HoodieException if the query info is neither incremental nor snapshot
     */
    public Dataset<Row> run(QueryInfo queryInfo) {
        if (queryInfo.isIncremental()) {
            return this.runIncrementalQuery(queryInfo);
        }
        if (queryInfo.isSnapshot()) {
            return this.runSnapshotQuery(queryInfo);
        }
        throw new HoodieException("Unknown query type " + queryInfo.getQueryType());
    }

    /**
     * Orders the dataset by the given columns, in list order.
     *
     * @param dataset        rows to order
     * @param orderByColumns column names to order by; when {@code null} or empty
     *                       the dataset is returned unchanged
     * @return the ordered dataset, or {@code dataset} itself when no columns are given
     */
    public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> orderByColumns) {
        if (orderByColumns == null || orderByColumns.isEmpty()) {
            return dataset;
        }
        // Parameterized logging: the list is only rendered when DEBUG is enabled.
        LOG.debug("Applying ordering {}", orderByColumns);
        Column[] sortColumns = orderByColumns.stream().map(functions::col).toArray(Column[]::new);
        return dataset.orderBy(sortColumns);
    }

    /**
     * Reads the source table with the query type from {@code queryInfo}, passing
     * its previous instant as the begin instant time and its end instant as the
     * end instant time.
     *
     * @param queryInfo supplies the query type, previous instant and end instant
     * @return the rows loaded from the source path with those read options
     */
    public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
        LOG.info("Running incremental query");
        return this.sparkSession.read()
                .format(HUDI_FORMAT)
                .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
                .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
                .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
                .load(this.sourcePath);
    }

    /**
     * Reads the source table with the query type from {@code queryInfo} and keeps
     * only rows whose commit time metadata field lies between the start instant
     * and the end instant, both bounds inclusive.
     *
     * @param queryInfo supplies the query type, start instant and end instant
     * @return the filtered rows loaded from the source path
     */
    public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
        LOG.info("Running snapshot query");
        String lowerBound = String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, queryInfo.getStartInstant());
        String upperBound = String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, queryInfo.getEndInstant());
        return this.sparkSession.read()
                .format(HUDI_FORMAT)
                .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
                .load(this.sourcePath)
                // Restrict the snapshot to the requested commit-time window.
                .filter(lowerBound)
                .filter(upperBound);
    }
}

