/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.Serializable;
import java.util.Properties;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.callback.SourceCommitCallback;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract base class for a streamer data source that produces batches of type {@code T}.
 *
 * <p>Callers pull data through {@link #fetchNext(Option, long)}, which translates the resume
 * checkpoint to the version expected for the configured write table version, delegates to
 * {@link #readFromCheckpoint(Option, long)}, optionally persists the returned Spark dataset/RDD,
 * and finally attaches the schema provider supplied via the {@link StreamContext} (if any).
 *
 * <p>Persisted data is tracked in this instance and unpersisted by {@link #releaseResources()}.
 *
 * @param <T> type of the payload carried by each {@link InputBatch}
 */
@PublicAPIClass(maturity=ApiMaturityLevel.STABLE)
public abstract class Source<T>
implements SourceCommitCallback,
Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(Source.class);

    /** Properties this source was constructed with. */
    protected transient TypedProperties props;
    protected transient JavaSparkContext sparkContext;
    protected transient SparkSession sparkSession;
    /** Source profile supplier taken from the {@link StreamContext} passed at construction. */
    protected transient Option<SourceProfileSupplier> sourceProfileSupplier;
    /** Value of {@link HoodieWriteConfig#WRITE_TABLE_VERSION}; drives the checkpoint version to target. */
    protected int writeTableVersion;
    /** Schema provider from the {@link StreamContext}; when non-null it replaces the batch's own provider. */
    private transient SchemaProvider overriddenSchemaProvider;
    private final SourceType sourceType;
    /** Storage level used when persisting a fetched batch. */
    private final StorageLevel storageLevel;
    /** Value of {@link HoodieErrorTableConfig#ERROR_TABLE_PERSIST_SOURCE_RDD}. */
    protected final boolean persistRdd;
    // Handle to the most recently persisted batch so that releaseResources() can unpersist it.
    // NOTE(review): this field is not transient although the class is Serializable - confirm intended.
    private Either<Dataset<Row>, JavaRDD<?>> cachedSourceRdd = null;

    /**
     * Creates a source of type {@link SourceType#AVRO}.
     *
     * @param props          source/writer properties
     * @param sparkContext   Java Spark context
     * @param sparkSession   Spark session
     * @param schemaProvider schema provider wrapped into a {@link DefaultStreamContext}
     */
    protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        this(props, sparkContext, sparkSession, schemaProvider, SourceType.AVRO);
    }

    /**
     * Creates a source of the given type with a {@link DefaultStreamContext} built from
     * {@code schemaProvider} and an empty source profile supplier.
     *
     * @param props          source/writer properties
     * @param sparkContext   Java Spark context
     * @param sparkSession   Spark session
     * @param schemaProvider schema provider wrapped into a {@link DefaultStreamContext}
     * @param sourceType     type of records this source produces
     */
    protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, SourceType sourceType) {
        this(props, sparkContext, sparkSession, sourceType, new DefaultStreamContext(schemaProvider, Option.empty()));
    }

    /**
     * Primary constructor; reads the storage level, the persist flag and the write table version
     * from {@code props}.
     *
     * @param props         source/writer properties
     * @param sparkContext  Java Spark context
     * @param sparkSession  Spark session
     * @param sourceType    type of records this source produces
     * @param streamContext supplies the schema provider and the source profile supplier
     */
    protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SourceType sourceType, StreamContext streamContext) {
        this.props = props;
        this.sparkContext = sparkContext;
        this.sparkSession = sparkSession;
        this.overriddenSchemaProvider = streamContext.getSchemaProvider();
        this.sourceType = sourceType;
        this.sourceProfileSupplier = streamContext.getSourceProfileSupplier();
        // Casts are kept as-is so that the same ConfigUtils overload is selected.
        this.storageLevel = StorageLevel.fromString((String)ConfigUtils.getStringWithAltKeys((Properties)props, HoodieWriteConfig.TAGGED_RECORD_STORAGE_LEVEL_VALUE, true));
        this.persistRdd = ConfigUtils.getBooleanWithAltKeys(props, HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD);
        this.writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
    }

    /**
     * Fetches new data given the last checkpoint expressed as a plain string.
     *
     * @param lastCheckpointStr checkpoint key to resume from, or empty when there is none
     * @param sourceLimit       limit passed through from {@link #fetchNext(Option, long)}
     * @return the next batch
     * @deprecated the default {@link #readFromCheckpoint(Option, long)} still delegates here;
     *     implementations that need the full {@link Checkpoint} object can override that method.
     */
    @Deprecated
    @PublicAPIMethod(maturity=ApiMaturityLevel.STABLE)
    protected abstract InputBatch<T> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit);

    /**
     * Reads the next batch starting from the given (already translated) checkpoint.
     *
     * <p>The default implementation logs a migration warning and delegates to
     * {@link #fetchNewData(Option, long)} with the checkpoint key, if a checkpoint is present.
     *
     * @param lastCheckpoint checkpoint to resume from, or empty
     * @param sourceLimit    limit forwarded to {@link #fetchNewData(Option, long)}
     * @return the next batch
     */
    @PublicAPIMethod(maturity=ApiMaturityLevel.EVOLVING)
    protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        LOG.warn("In Hudi 1.0+, the checkpoint based on Hudi timeline is changed. If your Source implementation relies on request time as the checkpoint, you may consider migrating to completion time-based checkpoint by overriding Source#translateCheckpoint and Source#fetchNewDataFromCheckpoint");
        Option<String> checkpointKey = lastCheckpoint.isPresent()
            ? Option.of(lastCheckpoint.get().getCheckpointKey())
            : Option.empty();
        return this.fetchNewData(checkpointKey, sourceLimit);
    }

    /**
     * Converts the resume checkpoint to the checkpoint version this source should target, as
     * decided by {@link CheckpointUtils#shouldTargetCheckpointV2(int, String)}.
     *
     * @param lastCheckpoint checkpoint to translate, or empty
     * @return empty if {@code lastCheckpoint} is empty; otherwise a V1 or V2 checkpoint
     * @throws UnsupportedOperationException if the checkpoint is neither
     *     {@link StreamerCheckpointV1} nor {@link StreamerCheckpointV2}
     */
    @PublicAPIMethod(maturity=ApiMaturityLevel.EVOLVING)
    protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoint) {
        if (lastCheckpoint.isEmpty()) {
            return Option.empty();
        }
        Checkpoint current = lastCheckpoint.get();
        if (CheckpointUtils.shouldTargetCheckpointV2(this.writeTableVersion, this.getClass().getName())) {
            if (current instanceof StreamerCheckpointV2) {
                return lastCheckpoint;
            }
            if (current instanceof StreamerCheckpointV1) {
                // Upgrade V1 -> V2.
                StreamerCheckpointV2 upgraded = new StreamerCheckpointV2(current);
                upgraded.addV1Props();
                return Option.of(upgraded);
            }
        } else {
            if (current instanceof StreamerCheckpointV2) {
                // Downgrade V2 -> V1.
                return Option.of(new StreamerCheckpointV1(current));
            }
            if (current instanceof StreamerCheckpointV1) {
                return lastCheckpoint;
            }
        }
        throw new UnsupportedOperationException("Unsupported checkpoint type: " + current);
    }

    /**
     * Verifies that the checkpoint returned by the source has the version this source should
     * target. A {@code null} checkpoint is accepted without any check.
     *
     * @param lastCheckpoint           checkpoint the iteration resumed from (before translation)
     * @param lastCheckpointTranslated result of translating {@code lastCheckpoint}
     * @param checkpoint               checkpoint returned after the iteration; may be {@code null}
     * @throws IllegalStateException if {@code checkpoint} is not of the expected version
     */
    public void assertCheckpointVersion(Option<Checkpoint> lastCheckpoint, Option<Checkpoint> lastCheckpointTranslated, Checkpoint checkpoint) {
        if (checkpoint == null) {
            return;
        }
        boolean shouldBeV2Checkpoint = CheckpointUtils.shouldTargetCheckpointV2(this.writeTableVersion, this.getClass().getName());
        boolean versionMatches = shouldBeV2Checkpoint
            ? checkpoint instanceof StreamerCheckpointV2
            : checkpoint instanceof StreamerCheckpointV1;
        if (versionMatches) {
            return;
        }
        // Report the resumed checkpoint and its translation separately; each Option is only
        // dereferenced after its own emptiness check.
        Object resumed = lastCheckpoint.isEmpty() ? "null" : lastCheckpoint.get();
        Object translated = lastCheckpointTranslated.isEmpty() ? "null" : lastCheckpointTranslated.get();
        throw new IllegalStateException(String.format("Data source should return checkpoint version V%s. The checkpoint resumed in the iteration is %s, whose translated version is %s. The checkpoint returned after the iteration %s.", shouldBeV2Checkpoint ? "2" : "1", resumed, translated, checkpoint));
    }

    /**
     * Main entry point for pulling the next batch from this source.
     *
     * @param lastCheckpoint checkpoint to resume from, or empty
     * @param sourceLimit    limit forwarded to {@link #readFromCheckpoint(Option, long)}
     * @return the batch read; if a schema provider was supplied at construction, a new batch
     *     carrying the same data and next checkpoint but that schema provider
     */
    public final InputBatch<T> fetchNext(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
        Option<Checkpoint> lastCheckpointTranslated = this.translateCheckpoint(lastCheckpoint);
        InputBatch<T> batch = this.readFromCheckpoint(lastCheckpointTranslated, sourceLimit);
        batch.getBatch().ifPresent(this::persist);
        if (this.overriddenSchemaProvider == null) {
            return batch;
        }
        return new InputBatch<>(batch.getBatch(), batch.getCheckpointForNextBatch(), this.overriddenSchemaProvider);
    }

    /** Returns the type of records this source produces. */
    public SourceType getSourceType() {
        return this.sourceType;
    }

    /** Returns the Spark session this source was constructed with. */
    public SparkSession getSparkSession() {
        return this.sparkSession;
    }

    /**
     * Persists the batch payload at the configured storage level when persisting is allowed and
     * the payload is a Spark {@link Dataset} or {@link JavaRDD}; remembers it for later release.
     */
    @SuppressWarnings("unchecked")
    private synchronized void persist(T data) {
        if (!this.allowSourcePersist()) {
            return;
        }
        // instanceof (rather than Class#isAssignableFrom with swapped operands) so that
        // subclasses of Dataset/JavaRDD are recognised as well.
        if (data instanceof Dataset) {
            Dataset<Row> df = (Dataset<Row>) data;
            this.cachedSourceRdd = Either.left(df);
            df.persist(this.storageLevel);
        } else if (data instanceof JavaRDD) {
            JavaRDD<?> javaRDD = (JavaRDD<?>) data;
            this.cachedSourceRdd = Either.right(javaRDD);
            javaRDD.persist(this.storageLevel);
        }
    }

    /**
     * Whether fetched batches may be persisted. Defaults to the
     * {@link HoodieErrorTableConfig#ERROR_TABLE_PERSIST_SOURCE_RDD} setting; subclasses may override.
     */
    protected boolean allowSourcePersist() {
        return this.persistRdd;
    }

    /**
     * Unpersists the batch persisted by the last {@link #fetchNext(Option, long)} call, if any.
     */
    // NOTE(review): persist() is synchronized but this method is not - confirm callers are single-threaded.
    @Override
    public void releaseResources() {
        if (this.cachedSourceRdd != null && this.cachedSourceRdd.isLeft()) {
            this.cachedSourceRdd.asLeft().unpersist();
        } else if (this.cachedSourceRdd != null && this.cachedSourceRdd.isRight()) {
            this.cachedSourceRdd.asRight().unpersist();
        }
    }

    /** Kinds of records a source can produce. */
    public static enum SourceType {
        JSON,
        AVRO,
        ROW,
        PROTO;

    }
}

