/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.compact;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.FlinkCompactionMetrics;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactionCommitSink
extends CleanFunction<CompactionCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactionCommitSink.class);
    private final Configuration conf;
    private transient Map<String, Map<String, CompactionCommitEvent>> commitBuffer;
    private transient Map<String, HoodieCompactionPlan> compactionPlanCache;
    private transient HoodieFlinkTable<?> table;
    private transient FlinkCompactionMetrics compactionMetrics;

    public CompactionCommitSink(Configuration conf) {
        super(conf);
        this.conf = conf;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (this.writeClient == null) {
            this.writeClient = FlinkWriteClients.createWriteClient(this.conf, this.getRuntimeContext());
        }
        this.commitBuffer = new HashMap<String, Map<String, CompactionCommitEvent>>();
        this.compactionPlanCache = new HashMap<String, HoodieCompactionPlan>();
        this.table = this.writeClient.getHoodieTable();
        this.registerMetrics();
    }

    public void invoke(CompactionCommitEvent event, SinkFunction.Context context) throws Exception {
        String instant = event.getInstant();
        if (event.isFailed() || event.getWriteStatuses() != null && event.getWriteStatuses().stream().anyMatch(writeStatus -> writeStatus.getTotalErrorRecords() > 0L)) {
            LOG.warn("Receive abnormal CompactionCommitEvent of instant {}, task ID is {}, is failed: {}, error record count: {}", new Object[]{instant, event.getTaskID(), event.isFailed(), this.getNumErrorRecords(event)});
        }
        this.commitBuffer.computeIfAbsent(instant, k -> new HashMap()).put(event.getFileId(), event);
        this.commitIfNecessary(instant, this.commitBuffer.get(instant).values());
    }

    private long getNumErrorRecords(CompactionCommitEvent event) {
        if (event.getWriteStatuses() == null) {
            return -1L;
        }
        return event.getWriteStatuses().stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void commitIfNecessary(String instant, Collection<CompactionCommitEvent> events) throws IOException {
        boolean isReady;
        HoodieCompactionPlan compactionPlan = this.compactionPlanCache.computeIfAbsent(instant, k -> {
            try {
                return CompactionUtils.getCompactionPlan((HoodieTableMetaClient)this.writeClient.getHoodieTable().getMetaClient(), (String)instant);
            }
            catch (Exception e) {
                throw new HoodieException((Throwable)e);
            }
        });
        boolean bl = isReady = compactionPlan.getOperations().size() == events.size();
        if (!isReady) {
            return;
        }
        if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
            try {
                CompactionUtil.rollbackCompaction(this.table, instant);
            }
            finally {
                this.reset(instant);
                this.compactionMetrics.markCompactionRolledBack();
            }
            return;
        }
        try {
            this.doCommit(instant, events);
        }
        catch (Throwable throwable) {
            LOG.error("Error while committing compaction instant: " + instant, throwable);
            this.compactionMetrics.markCompactionRolledBack();
        }
        finally {
            this.reset(instant);
        }
    }

    private void doCommit(String instant, Collection<CompactionCommitEvent> events) throws IOException {
        List statuses = events.stream().map(CompactionCommitEvent::getWriteStatuses).flatMap(Collection::stream).collect(Collectors.toList());
        long numErrorRecords = statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
        if (numErrorRecords > 0L && !this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
            LOG.error("Got {} error records during compaction of instant {},\noption '{}' is configured as false,rolls back the compaction", new Object[]{numErrorRecords, instant, FlinkOptions.IGNORE_FAILED.key()});
            CompactionUtil.rollbackCompaction(this.table, instant);
            this.compactionMetrics.markCompactionRolledBack();
            return;
        }
        HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(this.table, instant, (HoodieData)HoodieListData.eager(statuses), this.writeClient.getConfig().getSchema());
        this.writeClient.commitCompaction(instant, metadata, Option.empty());
        this.compactionMetrics.updateCommitMetrics(instant, metadata);
        this.compactionMetrics.markCompactionCompleted();
        if (!this.conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !this.isCleaning) {
            this.writeClient.clean();
        }
    }

    private void reset(String instant) {
        this.commitBuffer.remove(instant);
        this.compactionPlanCache.remove(instant);
    }

    private void registerMetrics() {
        OperatorMetricGroup metrics = this.getRuntimeContext().getMetricGroup();
        this.compactionMetrics = new FlinkCompactionMetrics((MetricGroup)metrics);
        this.compactionMetrics.registerMetrics();
    }
}

