/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.clustering;

import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.metrics.FlinkClusteringMetrics;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusteringPlanOperator
extends AbstractStreamOperator<ClusteringPlanEvent>
implements OneInputStreamOperator<Object, ClusteringPlanEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ClusteringPlanOperator.class);
    private final Configuration conf;
    private transient HoodieFlinkTable table;
    private transient FlinkClusteringMetrics clusteringMetrics;

    public ClusteringPlanOperator(Configuration conf) {
        this.conf = conf;
    }

    public void open() throws Exception {
        super.open();
        this.table = FlinkTables.createTable(this.conf, (RuntimeContext)this.getRuntimeContext());
        this.registerMetrics();
        ClusteringUtil.rollbackClustering(this.table, FlinkWriteClients.createWriteClient(this.conf, (RuntimeContext)this.getRuntimeContext()));
    }

    public void processElement(StreamRecord<Object> streamRecord) {
    }

    public void notifyCheckpointComplete(long checkpointId) {
        try {
            this.table.getMetaClient().reloadActiveTimeline();
            this.scheduleClustering(this.table, checkpointId);
        }
        catch (Throwable throwable) {
            LOG.error("Error while scheduling clustering plan for checkpoint: " + checkpointId, throwable);
        }
    }

    private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {
        List pendingClusteringInstantTimes = ClusteringUtils.getPendingClusteringInstantTimes((HoodieTableMetaClient)table.getMetaClient());
        Option firstRequested = Option.fromJavaOptional(pendingClusteringInstantTimes.stream().filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());
        this.clusteringMetrics.setFirstPendingClusteringInstant((Option<HoodieInstant>)firstRequested);
        this.clusteringMetrics.setPendingClusteringCount(pendingClusteringInstantTimes.size());
        if (!firstRequested.isPresent()) {
            LOG.info("No clustering plan for checkpoint " + checkpointId);
            return;
        }
        String clusteringInstantTime = ((HoodieInstant)firstRequested.get()).requestedTime();
        HoodieInstant clusteringInstant = (HoodieInstant)firstRequested.get();
        Option clusteringPlanOption = ClusteringUtils.getClusteringPlan((HoodieTableMetaClient)table.getMetaClient(), (HoodieInstant)clusteringInstant);
        if (!clusteringPlanOption.isPresent()) {
            LOG.info("No clustering plan scheduled");
            return;
        }
        HoodieClusteringPlan clusteringPlan = (HoodieClusteringPlan)((Pair)clusteringPlanOption.get()).getRight();
        if (clusteringPlan == null || clusteringPlan.getInputGroups() == null || clusteringPlan.getInputGroups().isEmpty()) {
            LOG.info("Empty clustering plan for instant " + clusteringInstantTime);
        } else {
            ClusteringUtils.transitionClusteringOrReplaceRequestedToInflight((HoodieInstant)clusteringInstant, (Option)Option.empty(), (HoodieActiveTimeline)table.getActiveTimeline());
            table.getMetaClient().reloadActiveTimeline();
            HashMap<String, Integer> groupIndexMap = new HashMap<String, Integer>();
            int index = 0;
            for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
                int groupIndex;
                ClusteringGroupInfo groupInfo = ClusteringGroupInfo.create((HoodieClusteringGroup)clusteringGroup);
                String groupFileIds = groupInfo.getOperations().stream().map(ClusteringOperation::getFileId).collect(Collectors.joining());
                if (groupIndexMap.containsKey(groupFileIds)) {
                    groupIndex = (Integer)groupIndexMap.get(groupFileIds);
                } else {
                    groupIndex = index++;
                    groupIndexMap.put(groupFileIds, groupIndex);
                }
                LOG.info("Execute clustering plan for instant {} as {} file slices", (Object)clusteringInstantTime, (Object)clusteringGroup.getSlices().size());
                this.output.collect((Object)new StreamRecord((Object)new ClusteringPlanEvent(clusteringInstantTime, groupInfo, clusteringPlan.getStrategy().getStrategyParams(), groupIndex)));
            }
        }
    }

    @VisibleForTesting
    public void setOutput(Output<StreamRecord<ClusteringPlanEvent>> output) {
        this.output = output;
    }

    private void registerMetrics() {
        OperatorMetricGroup metrics = this.getRuntimeContext().getMetricGroup();
        this.clusteringMetrics = new FlinkClusteringMetrics((MetricGroup)metrics);
        this.clusteringMetrics.registerMetrics();
    }
}

