package org.apache.hudi.sink.compact;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/compact/CompactionCommitSink.class */
public class CompactionCommitSink extends RichSinkFunction<CompactionCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactionCommitSink.class);
    private final Configuration conf;
    private transient HoodieFlinkWriteClient writeClient;
    private transient List<CompactionCommitEvent> commitBuffer;
    private String compactionInstantTime;

    public CompactionCommitSink(Configuration configuration) {
        this.conf = configuration;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        initWriteClient();
        this.commitBuffer = new ArrayList();
    }

    public void invoke(CompactionCommitEvent compactionCommitEvent, SinkFunction.Context context) throws Exception {
        if (this.compactionInstantTime == null) {
            this.compactionInstantTime = compactionCommitEvent.getInstant();
        } else if (!compactionCommitEvent.getInstant().equals(this.compactionInstantTime)) {
            this.writeClient.rollbackInflightCompaction(HoodieTimeline.getCompactionInflightInstant(this.compactionInstantTime));
            this.compactionInstantTime = compactionCommitEvent.getInstant();
        }
        this.commitBuffer.add(compactionCommitEvent);
        commitIfNecessary();
    }

    private void commitIfNecessary() throws IOException {
        if (CompactionUtils.getCompactionPlan(this.writeClient.getHoodieTable().getMetaClient(), this.compactionInstantTime).getOperations().size() == this.commitBuffer.size() && this.commitBuffer.stream().allMatch(compactionCommitEvent -> {
            return compactionCommitEvent != null && Objects.equals(compactionCommitEvent.getInstant(), this.compactionInstantTime);
        })) {
            List list = (List) this.commitBuffer.stream().map((v0) -> {
                return v0.getWriteStatuses();
            }).flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList());
            if (this.writeClient.getConfig().shouldAutoCommit().booleanValue()) {
                List<HoodieWriteStat> list2 = (List) list.stream().map((v0) -> {
                    return v0.getStat();
                }).collect(Collectors.toList());
                HoodieCommitMetadata hoodieCommitMetadata = new HoodieCommitMetadata(true);
                for (HoodieWriteStat hoodieWriteStat : list2) {
                    hoodieCommitMetadata.addWriteStat(hoodieWriteStat.getPartitionPath(), hoodieWriteStat);
                }
                hoodieCommitMetadata.addMetadata("schema", this.writeClient.getConfig().getSchema());
                this.writeClient.completeCompaction(hoodieCommitMetadata, list, this.writeClient.getHoodieTable(), this.compactionInstantTime);
            }
            this.writeClient.commitCompaction(this.compactionInstantTime, list, Option.empty());
            reset();
        }
    }

    private void reset() {
        this.commitBuffer.clear();
        this.compactionInstantTime = null;
    }

    private void initWriteClient() {
        this.writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(new SerializableConfiguration(StreamerUtil.getHadoopConf()), new FlinkTaskContextSupplier(getRuntimeContext())), StreamerUtil.getHoodieClientConfig(this.conf));
    }
}
