/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.segment.realtime.appenderator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Joiner;
import org.apache.hive.druid.com.google.common.base.Stopwatch;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.util.concurrent.FutureCallback;
import org.apache.hive.druid.com.google.common.util.concurrent.Futures;
import org.apache.hive.druid.io.druid.common.guava.ThreadRenamingCallable;
import org.apache.hive.druid.io.druid.data.input.Committer;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.StringUtils;
import org.apache.hive.druid.io.druid.java.util.common.concurrent.Execs;
import org.apache.hive.druid.io.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.hive.druid.io.druid.java.util.common.granularity.Granularity;
import org.apache.hive.druid.io.druid.java.util.common.guava.Sequence;
import org.apache.hive.druid.io.druid.java.util.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.query.Query;
import org.apache.hive.druid.io.druid.query.QueryPlus;
import org.apache.hive.druid.io.druid.query.QueryRunner;
import org.apache.hive.druid.io.druid.segment.incremental.IndexSizeExceededException;
import org.apache.hive.druid.io.druid.segment.indexing.DataSchema;
import org.apache.hive.druid.io.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.hive.druid.io.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.hive.druid.io.druid.segment.realtime.SegmentPublisher;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.Appenderator;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentIdentifier;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentNotWritableException;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Committers;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Plumber;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.RejectionPolicy;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.VersioningPolicy;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

public class AppenderatorPlumber
implements Plumber {
    private static final EmittingLogger log = new EmittingLogger(AppenderatorPlumber.class);
    private static final int WARN_DELAY = 1000;
    private final DataSchema schema;
    private final RealtimeTuningConfig config;
    private final RejectionPolicy rejectionPolicy;
    private final FireDepartmentMetrics metrics;
    private final DataSegmentAnnouncer segmentAnnouncer;
    private final SegmentPublisher segmentPublisher;
    private final SegmentHandoffNotifier handoffNotifier;
    private final Object handoffCondition = new Object();
    private final Map<Long, SegmentIdentifier> segments = new ConcurrentHashMap<Long, SegmentIdentifier>();
    private final Appenderator appenderator;
    private volatile boolean shuttingDown = false;
    private volatile boolean stopped = false;
    private volatile boolean cleanShutdown = true;
    private volatile ScheduledExecutorService scheduledExecutor = null;
    private volatile Supplier<Committer> lastCommitterSupplier = null;

    public AppenderatorPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics, DataSegmentAnnouncer segmentAnnouncer, SegmentPublisher segmentPublisher, SegmentHandoffNotifier handoffNotifier, Appenderator appenderator) {
        this.schema = schema;
        this.config = config;
        this.rejectionPolicy = config.getRejectionPolicyFactory().create(config.getWindowPeriod());
        this.metrics = metrics;
        this.segmentAnnouncer = segmentAnnouncer;
        this.segmentPublisher = segmentPublisher;
        this.handoffNotifier = handoffNotifier;
        this.appenderator = appenderator;
        log.info("Creating plumber using rejectionPolicy[%s]", this.getRejectionPolicy());
    }

    public Map<Long, SegmentIdentifier> getSegmentsView() {
        return ImmutableMap.copyOf(this.segments);
    }

    public DataSchema getSchema() {
        return this.schema;
    }

    public RealtimeTuningConfig getConfig() {
        return this.config;
    }

    public RejectionPolicy getRejectionPolicy() {
        return this.rejectionPolicy;
    }

    @Override
    public Object startJob() {
        this.handoffNotifier.start();
        Object retVal = this.appenderator.startJob();
        this.initializeExecutors();
        this.startPersistThread();
        this.mergeAndPush();
        return retVal;
    }

    @Override
    public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException {
        SegmentIdentifier identifier = this.getSegmentIdentifier(row.getTimestampFromEpoch());
        if (identifier == null) {
            return -1;
        }
        try {
            int numRows = this.appenderator.add(identifier, row, committerSupplier).getNumRowsInSegment();
            this.lastCommitterSupplier = committerSupplier;
            return numRows;
        }
        catch (SegmentNotWritableException e) {
            return -1;
        }
    }

    @Override
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        return new QueryRunner<T>(){

            @Override
            public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext) {
                return queryPlus.run(AppenderatorPlumber.this.appenderator, responseContext);
            }
        };
    }

    @Override
    public void persist(Committer committer) {
        Stopwatch runExecStopwatch = Stopwatch.createStarted();
        this.appenderator.persistAll(committer);
        long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
        this.metrics.incrementPersistBackPressureMillis(startDelay);
        if (startDelay > 1000L) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
        }
        runExecStopwatch.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void finishJob() {
        log.info("Shutting down...", new Object[0]);
        this.shuttingDown = true;
        List<SegmentIdentifier> pending = this.appenderator.getSegments();
        if (pending.isEmpty()) {
            log.info("No segments to hand off.", new Object[0]);
        } else {
            log.info("Pushing segments: %s", Joiner.on(", ").join(pending));
        }
        try {
            if (this.lastCommitterSupplier != null) {
                this.mergeAndPush();
            }
            Object object = this.handoffCondition;
            synchronized (object) {
                while (!this.segments.isEmpty()) {
                    log.info("Waiting to hand off: %s", Joiner.on(", ").join(pending));
                    this.handoffCondition.wait();
                    pending = this.appenderator.getSegments();
                }
            }
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
        finally {
            this.stopped = true;
            this.handoffNotifier.close();
            this.shutdownExecutors();
            this.appenderator.close();
        }
        if (!this.cleanShutdown) {
            throw new ISE("Exception occurred during persist and merge.", new Object[0]);
        }
    }

    private SegmentIdentifier getSegmentIdentifier(long timestamp) {
        if (!this.rejectionPolicy.accept(timestamp)) {
            return null;
        }
        Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        VersioningPolicy versioningPolicy = this.config.getVersioningPolicy();
        DateTime truncatedDateTime = segmentGranularity.bucketStart(DateTimes.utc(timestamp));
        long truncatedTime = truncatedDateTime.getMillis();
        SegmentIdentifier retVal = this.segments.get(truncatedTime);
        if (retVal == null) {
            Interval interval = new Interval((ReadableInstant)truncatedDateTime, (ReadableInstant)segmentGranularity.increment(truncatedDateTime));
            retVal = new SegmentIdentifier(this.schema.getDataSource(), interval, versioningPolicy.getVersion(interval), this.config.getShardSpec());
            this.addSegment(retVal);
        }
        return retVal;
    }

    protected void initializeExecutors() {
        if (this.scheduledExecutor == null) {
            this.scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
        }
    }

    protected void shutdownExecutors() {
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdown();
        }
    }

    private void addSegment(SegmentIdentifier identifier) {
        this.segments.put(identifier.getInterval().getStartMillis(), identifier);
        try {
            this.segmentAnnouncer.announceSegment(new DataSegment(identifier.getDataSource(), identifier.getInterval(), identifier.getVersion(), ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), identifier.getShardSpec(), null, 0L));
        }
        catch (IOException e) {
            log.makeAlert(e, "Failed to announce new segment[%s]", identifier.getDataSource()).addData("interval", identifier.getInterval()).emit();
        }
    }

    public void dropSegment(final SegmentIdentifier identifier) {
        log.info("Dropping segment: %s", identifier);
        this.segments.remove(identifier.getInterval().getStartMillis());
        Futures.addCallback(this.appenderator.drop(identifier), new FutureCallback<Object>(){

            @Override
            public void onSuccess(Object result) {
                log.info("Dropped segment: %s", identifier);
            }

            @Override
            public void onFailure(Throwable e) {
                log.warn(e, "Failed to drop segment: %s", identifier);
            }
        });
    }

    private void startPersistThread() {
        Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        Period windowPeriod = this.config.getWindowPeriod();
        DateTime truncatedNow = segmentGranularity.bucketStart(DateTimes.nowUtc());
        long windowMillis = windowPeriod.toStandardDuration().getMillis();
        log.info("Expect to run at [%s]", DateTimes.nowUtc().plus((ReadableDuration)new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow).getMillis() + windowMillis)));
        String threadName = StringUtils.format("%s-overseer-%d", this.schema.getDataSource(), this.config.getShardSpec().getPartitionNum());
        ThreadRenamingCallable<ScheduledExecutors.Signal> threadRenamingCallable = new ThreadRenamingCallable<ScheduledExecutors.Signal>(threadName){

            @Override
            public ScheduledExecutors.Signal doCall() {
                if (AppenderatorPlumber.this.stopped) {
                    log.info("Stopping merge-n-push overseer thread", new Object[0]);
                    return ScheduledExecutors.Signal.STOP;
                }
                AppenderatorPlumber.this.mergeAndPush();
                if (AppenderatorPlumber.this.stopped) {
                    log.info("Stopping merge-n-push overseer thread", new Object[0]);
                    return ScheduledExecutors.Signal.STOP;
                }
                return ScheduledExecutors.Signal.REPEAT;
            }
        };
        Duration initialDelay = new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow).getMillis() + windowMillis);
        Duration rate = new Duration((ReadableInstant)truncatedNow, (ReadableInstant)segmentGranularity.increment(truncatedNow));
        ScheduledExecutors.scheduleAtFixedRate(this.scheduledExecutor, initialDelay, rate, (Callable<ScheduledExecutors.Signal>)threadRenamingCallable);
    }

    private void mergeAndPush() {
        Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        Period windowPeriod = this.config.getWindowPeriod();
        long windowMillis = windowPeriod.toStandardDuration().getMillis();
        log.info("Starting merge and push.", new Object[0]);
        DateTime minTimestampAsDate = segmentGranularity.bucketStart(DateTimes.utc(Math.max(windowMillis, this.rejectionPolicy.getCurrMaxTime().getMillis()) - windowMillis));
        long minTimestamp = minTimestampAsDate.getMillis();
        List<SegmentIdentifier> appenderatorSegments = this.appenderator.getSegments();
        final ArrayList<SegmentIdentifier> segmentsToPush = Lists.newArrayList();
        if (this.shuttingDown) {
            log.info("Found [%,d] segments. Attempting to hand off all of them.", appenderatorSegments.size());
            segmentsToPush.addAll(appenderatorSegments);
        } else {
            log.info("Found [%,d] segments. Attempting to hand off segments that start before [%s].", appenderatorSegments.size(), minTimestampAsDate);
            for (SegmentIdentifier segment : appenderatorSegments) {
                Long intervalStart = segment.getInterval().getStartMillis();
                if (intervalStart < minTimestamp) {
                    log.info("Adding entry [%s] for merge and push.", segment);
                    segmentsToPush.add(segment);
                    continue;
                }
                log.info("Skipping persist and merge for entry [%s] : Start time [%s] >= [%s] min timestamp required in this run. Segment will be picked up in a future run.", segment, DateTimes.utc(intervalStart), minTimestampAsDate);
            }
        }
        log.info("Found [%,d] segments to persist and merge", segmentsToPush.size());
        final Function<Throwable, Void> errorHandler = new Function<Throwable, Void>(){

            @Override
            public Void apply(Throwable throwable) {
                List<String> segmentIdentifierStrings = Lists.transform(segmentsToPush, new Function<SegmentIdentifier, String>(){

                    @Override
                    public String apply(SegmentIdentifier input) {
                        return input.getIdentifierAsString();
                    }
                });
                log.makeAlert(throwable, "Failed to publish merged indexes[%s]", AppenderatorPlumber.this.schema.getDataSource()).addData("segments", segmentIdentifierStrings).emit();
                if (AppenderatorPlumber.this.shuttingDown) {
                    AppenderatorPlumber.this.cleanShutdown = false;
                    for (SegmentIdentifier identifier : segmentsToPush) {
                        AppenderatorPlumber.this.dropSegment(identifier);
                    }
                }
                return null;
            }
        };
        Futures.addCallback(this.appenderator.push(segmentsToPush, Committers.nil()), new FutureCallback<SegmentsAndMetadata>(){

            @Override
            public void onSuccess(SegmentsAndMetadata result) {
                for (DataSegment pushedSegment : result.getSegments()) {
                    try {
                        AppenderatorPlumber.this.segmentPublisher.publishSegment(pushedSegment);
                    }
                    catch (Exception e) {
                        errorHandler.apply(e);
                    }
                }
                log.info("Published [%,d] sinks.", segmentsToPush.size());
            }

            @Override
            public void onFailure(Throwable e) {
                log.warn(e, "Failed to push [%,d] segments.", segmentsToPush.size());
                errorHandler.apply(e);
            }
        });
    }
}

