/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.compact;

import com.beust.jcommander.JCommander;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.client.deployment.application.ApplicationExecutionException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieFlinkCompactor {
    protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
    private static final String NO_EXECUTE_KEYWORD = "no execute";
    private final AsyncCompactionService compactionScheduleService;

    public HoodieFlinkCompactor(AsyncCompactionService service) {
        this.compactionScheduleService = service;
    }

    public static void main(String[] args) throws Exception {
        FlinkCompactionConfig cfg = HoodieFlinkCompactor.getFlinkCompactionConfig(args);
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
        AsyncCompactionService service = new AsyncCompactionService(cfg, conf);
        new HoodieFlinkCompactor(service).start(cfg.serviceMode);
    }

    public void start(boolean serviceMode) throws Exception {
        block14: {
            if (serviceMode) {
                this.compactionScheduleService.start(null);
                try {
                    this.compactionScheduleService.waitForShutdown();
                }
                catch (Exception e) {
                    throw new HoodieException(e.getMessage(), (Throwable)e);
                }
                finally {
                    LOG.info("Shut down hoodie flink compactor");
                }
            }
            LOG.info("Hoodie Flink Compactor running only single round");
            try {
                this.compactionScheduleService.compact();
            }
            catch (ApplicationExecutionException aee) {
                if (aee.getMessage().contains(NO_EXECUTE_KEYWORD)) {
                    LOG.info("Compaction is not performed");
                    break block14;
                }
                throw aee;
            }
            catch (Exception e) {
                LOG.error("Got error running delta sync once. Shutting down", (Throwable)e);
                throw e;
            }
            finally {
                LOG.info("Shut down hoodie flink compactor");
            }
        }
    }

    public static FlinkCompactionConfig getFlinkCompactionConfig(String[] args) {
        FlinkCompactionConfig cfg = new FlinkCompactionConfig();
        JCommander cmd = new JCommander((Object)cfg, null, args);
        if (cfg.help.booleanValue() || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }
        return cfg;
    }

    private static boolean validCompactionPlan(HoodieCompactionPlan plan) {
        return plan != null && plan.getOperations() != null && plan.getOperations().size() > 0;
    }

    public static class AsyncCompactionService
    extends HoodieAsyncTableService {
        private static final long serialVersionUID = 1L;
        private final FlinkCompactionConfig cfg;
        private final Configuration conf;
        private final HoodieTableMetaClient metaClient;
        private final HoodieFlinkWriteClient<?> writeClient;
        private final HoodieFlinkTable<?> table;
        private final ExecutorService executor;

        public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf) throws Exception {
            this.cfg = cfg;
            this.conf = conf;
            this.executor = Executors.newFixedThreadPool(1);
            this.metaClient = StreamerUtil.createMetaClient(conf);
            conf.setString(FlinkOptions.TABLE_NAME, this.metaClient.getTableConfig().getTableName());
            CompactionUtil.setAvroSchema(conf, this.metaClient);
            CompactionUtil.setPreCombineField(conf, this.metaClient);
            CompactionUtil.inferChangelogMode(conf, this.metaClient);
            CompactionUtil.inferMetadataConf(conf, this.metaClient);
            this.writeClient = FlinkWriteClients.createWriteClientV2(conf);
            this.writeConfig = this.writeClient.getConfig();
            this.table = this.writeClient.getHoodieTable();
        }

        protected Pair<CompletableFuture, ExecutorService> startService() {
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                boolean error = false;
                try {
                    while (!this.isShutdownRequested()) {
                        try {
                            this.compact();
                            Thread.sleep(this.cfg.minCompactionIntervalSeconds * 1000);
                        }
                        catch (ApplicationExecutionException aee) {
                            if (!aee.getMessage().contains(HoodieFlinkCompactor.NO_EXECUTE_KEYWORD)) throw new HoodieException(aee.getMessage(), (Throwable)aee);
                            LOG.info("Compaction is not performed.");
                        }
                        catch (Exception e) {
                            LOG.error("Shutting down compaction service due to exception", (Throwable)e);
                            error = true;
                            throw new HoodieException(e.getMessage(), (Throwable)e);
                            return true;
                        }
                    }
                }
                finally {
                    this.shutdownAsyncService(error);
                }
            }, this.executor), (Object)this.executor);
        }

        private void compact() throws Exception {
            this.table.getMetaClient().reloadActiveTimeline();
            if (this.cfg.schedule.booleanValue()) {
                boolean scheduled = this.writeClient.scheduleCompaction(Option.empty()).isPresent();
                if (!scheduled) {
                    LOG.info("No compaction plan for this job ");
                    return;
                }
                this.table.getMetaClient().reloadActiveTimeline();
            }
            HoodieTimeline pendingCompactionTimeline = this.table.getActiveTimeline().filterPendingCompactionTimeline();
            List<HoodieInstant> requested = CompactionPlanStrategies.getStrategy(this.cfg).select(pendingCompactionTimeline);
            if (requested.isEmpty()) {
                LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
                return;
            }
            List<String> compactionInstantTimes = requested.stream().map(HoodieInstant::requestedTime).collect(Collectors.toList());
            compactionInstantTimes.forEach(timestamp -> {
                HoodieInstant inflightInstant = this.table.getInstantGenerator().getCompactionInflightInstant(timestamp);
                if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
                    LOG.info("Rollback inflight compaction instant: [" + timestamp + "]");
                    this.table.rollbackInflightCompaction(inflightInstant);
                    this.table.getMetaClient().reloadActiveTimeline();
                }
            });
            List<Pair<String, HoodieCompactionPlan>> compactionPlans = compactionInstantTimes.stream().map(timestamp -> {
                try {
                    return Pair.of((Object)timestamp, (Object)CompactionUtils.getCompactionPlan((HoodieTableMetaClient)this.table.getMetaClient(), (String)timestamp));
                }
                catch (Exception e) {
                    throw new HoodieException("Get compaction plan at instant " + timestamp + " error", (Throwable)e);
                }
            }).filter(pair -> HoodieFlinkCompactor.validCompactionPlan((HoodieCompactionPlan)pair.getRight())).collect(Collectors.toList());
            if (compactionPlans.isEmpty()) {
                LOG.info("No compaction plan for instant " + String.join((CharSequence)",", compactionInstantTimes));
                return;
            }
            InstantGenerator instantGenerator = this.table.getInstantGenerator();
            List instants = compactionInstantTimes.stream().map(arg_0 -> ((InstantGenerator)instantGenerator).getCompactionRequestedInstant(arg_0)).collect(Collectors.toList());
            int totalOperations = Math.toIntExact(compactionPlans.stream().mapToLong(pair -> ((HoodieCompactionPlan)pair.getRight()).getOperations().size()).sum());
            int compactionParallelism = this.conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 ? totalOperations : Math.min(this.conf.getInteger(FlinkOptions.COMPACTION_TASKS), totalOperations);
            LOG.info("Start to compaction for instant " + compactionInstantTimes);
            for (HoodieInstant instant : instants) {
                this.table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
            }
            this.table.getMetaClient().reloadActiveTimeline();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource((SourceFunction)new CompactionPlanSourceFunction(compactionPlans, this.conf)).name("compaction_source").uid("uid_compaction_source").rebalance().transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), (OneInputStreamOperator)new CompactOperator(this.conf)).setParallelism(compactionParallelism).addSink((SinkFunction)new CompactionCommitSink(this.conf)).name("compaction_commit").uid("uid_compaction_commit").setParallelism(1).getTransformation().setMaxParallelism(1);
            env.execute("flink_hudi_compaction_" + String.join((CharSequence)",", compactionInstantTimes));
        }

        public void shutdownAsyncService(boolean error) {
            LOG.info("Gracefully shutting down compactor. Error ?" + error);
            this.executor.shutdown();
            this.writeClient.close();
        }

        @VisibleForTesting
        public void shutDown() {
            this.shutdownAsyncService(false);
        }
    }
}

