/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.IOException;
import org.apache.hudi.client.AbstractCompactor;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;

public class HoodieSparkCompactor<T extends HoodieRecordPayload>
extends AbstractCompactor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
    private static final Logger LOG = LogManager.getLogger(HoodieSparkCompactor.class);
    private transient HoodieEngineContext context;

    public HoodieSparkCompactor(AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> compactionClient, HoodieEngineContext context) {
        super(compactionClient);
        this.context = context;
    }

    @Override
    public void compact(HoodieInstant instant) throws IOException {
        LOG.info((Object)("Compactor executing compaction " + instant));
        SparkRDDWriteClient writeClient = (SparkRDDWriteClient)this.compactionClient;
        JavaRDD res = (JavaRDD)writeClient.compact(instant.getTimestamp());
        this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status");
        long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count();
        if (numWriteErrors != 0L) {
            LOG.error((Object)("Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors));
            throw new HoodieException("Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
        }
        writeClient.commitCompaction(instant.getTimestamp(), (JavaRDD<WriteStatus>)res, Option.empty());
    }
}

