/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.clustering;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink source function that turns one clustering plan into a stream of
 * {@link ClusteringPlanEvent}s, one event per input group of the plan.
 *
 * <p>The instant time, plan and configuration are handed in through the
 * constructor; {@link #run} only emits events when the instant is still
 * reported as a pending clustering instant by the table's active timeline.
 */
public class ClusteringPlanSourceFunction
        extends AbstractRichFunction
        implements SourceFunction<ClusteringPlanEvent> {
    protected static final Logger LOG = LoggerFactory.getLogger(ClusteringPlanSourceFunction.class);

    /** The clustering plan whose input groups are emitted. */
    private final HoodieClusteringPlan clusteringPlan;

    /** Instant time of the clustering plan. */
    private final String clusteringInstantTime;

    /** Configuration used to create the meta client in {@link #run}. */
    private final Configuration conf;

    /**
     * Creates the source function.
     *
     * @param clusteringInstantTime instant time of the clustering plan
     * @param clusteringPlan        plan whose input groups will be emitted
     * @param conf                  configuration passed to {@code StreamerUtil.createMetaClient}
     */
    public ClusteringPlanSourceFunction(String clusteringInstantTime, HoodieClusteringPlan clusteringPlan, Configuration conf) {
        this.clusteringInstantTime = clusteringInstantTime;
        this.clusteringPlan = clusteringPlan;
        this.conf = conf;
    }

    /**
     * No-op: this function needs no initialization.
     *
     * @param parameters unused
     */
    public void open(Configuration parameters) throws Exception {
        // no operation
    }

    /**
     * Emits one {@link ClusteringPlanEvent} per input group of the plan if the
     * instant is a pending clustering instant; otherwise logs a warning and
     * emits nothing.
     *
     * @param sourceContext context the events are collected into
     * @throws Exception if the timeline lookup or event emission fails
     */
    public void run(SourceFunction.SourceContext<ClusteringPlanEvent> sourceContext) throws Exception {
        boolean isPending = StreamerUtil.createMetaClient(this.conf)
                .getActiveTimeline()
                .isPendingClusteringInstant(this.clusteringInstantTime);
        if (!isPending) {
            LOG.warn("{} not found in pending clustering instants.", this.clusteringInstantTime);
            return;
        }
        for (HoodieClusteringGroup clusteringGroup : this.clusteringPlan.getInputGroups()) {
            LOG.info("Execute clustering plan for instant {} as {} file slices",
                    this.clusteringInstantTime, clusteringGroup.getSlices().size());
            // The strategy params are looked up per group (not hoisted) so that a plan
            // without input groups never touches getStrategy().
            sourceContext.collect(new ClusteringPlanEvent(
                    this.clusteringInstantTime,
                    ClusteringGroupInfo.create(clusteringGroup),
                    this.clusteringPlan.getStrategy().getStrategyParams()));
        }
    }

    /** No-op: this function holds no resources to release. */
    public void close() throws Exception {
        // no operation
    }

    /**
     * No-op: {@link #run} only iterates once over the plan's input groups and
     * then returns, so there is no long-running loop to stop here.
     */
    public void cancel() {
        // no operation
    }
}

