/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.timeline.service.handlers;

import io.hops.hudi.com.fasterxml.jackson.core.JsonProcessingException;
import io.hops.hudi.com.fasterxml.jackson.databind.ObjectMapper;
import io.javalin.http.Context;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hudi.common.conflict.detection.TimelineServerBasedDetectionStrategy;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.timeline.service.RequestHandler;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.timeline.service.handlers.Handler;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture;
import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarkerHandler
extends Handler {
    private static final Logger LOG = LoggerFactory.getLogger(MarkerHandler.class);
    private final Registry metricsRegistry;
    private final ScheduledExecutorService dispatchingExecutorService;
    private final ExecutorService batchingExecutorService;
    private final int parallelism;
    private final Map<String, MarkerDirState> markerDirStateMap = new ConcurrentHashMap<String, MarkerDirState>();
    private final MarkerCreationDispatchingRunnable markerCreationDispatchingRunnable;
    private final Object firstCreationRequestSeenLock = new Object();
    private final Object earlyConflictDetectionLock = new Object();
    private transient HoodieEngineContext hoodieEngineContext;
    private ScheduledFuture<?> dispatchingThreadFuture;
    private boolean firstCreationRequestSeen;
    private String currentMarkerDir = null;
    private TimelineServerBasedDetectionStrategy earlyConflictDetectionStrategy;

    public MarkerHandler(StorageConfiguration<?> conf, TimelineService.Config timelineServiceConfig, HoodieEngineContext hoodieEngineContext, FileSystemViewManager viewManager, Registry metricsRegistry) {
        super(conf, timelineServiceConfig, viewManager);
        LOG.debug("MarkerHandler batching params: batchNumThreads=" + timelineServiceConfig.markerBatchNumThreads + " batchIntervalMs=" + timelineServiceConfig.markerBatchIntervalMs + "ms");
        this.hoodieEngineContext = hoodieEngineContext;
        this.metricsRegistry = metricsRegistry;
        this.parallelism = timelineServiceConfig.markerParallelism;
        this.dispatchingExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.batchingExecutorService = Executors.newFixedThreadPool(timelineServiceConfig.markerBatchNumThreads);
        this.markerCreationDispatchingRunnable = new MarkerCreationDispatchingRunnable(this.markerDirStateMap, this.batchingExecutorService);
        this.firstCreationRequestSeen = false;
    }

    public void stop() {
        if (this.dispatchingThreadFuture != null) {
            this.dispatchingThreadFuture.cancel(true);
        }
        this.dispatchingExecutorService.shutdownNow();
        this.batchingExecutorService.shutdownNow();
        if (this.earlyConflictDetectionStrategy != null) {
            this.earlyConflictDetectionStrategy.stop();
        }
    }

    public Set<String> getAllMarkers(String markerDir) {
        MarkerDirState markerDirState = this.getMarkerDirState(markerDir);
        return markerDirState.getAllMarkers();
    }

    public Set<String> getPendingMarkersToProcess(String markerDir) {
        if (this.markerDirStateMap.containsKey(markerDir)) {
            MarkerDirState markerDirState = this.getMarkerDirState(markerDir);
            return markerDirState.getPendingMarkerCreationRequests(false).stream().map(MarkerCreationFuture::getMarkerName).collect(Collectors.toSet());
        }
        return Collections.emptySet();
    }

    public Set<String> getCreateAndMergeMarkers(String markerDir) {
        return this.getAllMarkers(markerDir).stream().filter(markerName -> !markerName.endsWith(IOType.APPEND.name())).collect(Collectors.toSet());
    }

    public Set<String> getAppendMarkers(String markerDir) {
        return this.getAllMarkers(markerDir).stream().filter(markerName -> markerName.endsWith(IOType.APPEND.name())).collect(Collectors.toSet());
    }

    public boolean doesMarkerDirExist(String markerDir) {
        MarkerDirState markerDirState = this.getMarkerDirState(markerDir);
        return markerDirState.exists();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName, String basePath) {
        if (this.timelineServiceConfig.earlyConflictDetectionEnable.booleanValue()) {
            try {
                Object object = this.earlyConflictDetectionLock;
                synchronized (object) {
                    if (this.earlyConflictDetectionStrategy == null) {
                        String strategyClassName = this.timelineServiceConfig.earlyConflictDetectionStrategy;
                        if (!ReflectionUtils.isSubClass(strategyClassName, TimelineServerBasedDetectionStrategy.class)) {
                            LOG.warn("Cannot use " + strategyClassName + " for timeline-server-based markers.");
                            strategyClassName = "org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy";
                            LOG.warn("Falling back to " + strategyClassName);
                        }
                        this.earlyConflictDetectionStrategy = (TimelineServerBasedDetectionStrategy)ReflectionUtils.loadClass(strategyClassName, basePath, markerDir, markerName, this.timelineServiceConfig.checkCommitConflict);
                    }
                    if (!markerDir.equalsIgnoreCase(this.currentMarkerDir)) {
                        this.currentMarkerDir = markerDir;
                        Set<String> actions = CollectionUtils.createSet("commit", "deltacommit", "replacecommit");
                        HashSet<HoodieInstant> completedCommits = new HashSet<HoodieInstant>(this.viewManager.getFileSystemView(basePath).getTimeline().filterCompletedInstants().filter(instant -> actions.contains(instant.getAction())).getInstants());
                        this.earlyConflictDetectionStrategy.startAsyncDetection(this.timelineServiceConfig.asyncConflictDetectorInitialDelayMs, this.timelineServiceConfig.asyncConflictDetectorPeriodMs, markerDir, basePath, this.timelineServiceConfig.maxAllowableHeartbeatIntervalInMs, this.getStorage(basePath), this, completedCommits);
                    }
                }
                this.earlyConflictDetectionStrategy.detectAndResolveConflictIfNecessary();
            }
            catch (HoodieEarlyConflictDetectionException he) {
                LOG.warn("Detected the write conflict due to a concurrent writer, failing the marker creation as the early conflict detection is enabled", (Throwable)he);
                return this.finishCreateMarkerFuture(context, markerDir, markerName);
            }
            catch (Exception e) {
                LOG.warn("Failed to execute early conflict detection." + e.getMessage());
                return this.addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName);
            }
        }
        return this.addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private MarkerCreationFuture addMarkerCreationRequestForAsyncProcessing(Context context, String markerDir, String markerName) {
        LOG.debug("Request: create marker: {}", (Object)markerName);
        MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName);
        MarkerDirState markerDirState = this.getMarkerDirState(markerDir);
        markerDirState.addMarkerCreationFuture(future);
        if (!this.firstCreationRequestSeen) {
            Object object = this.firstCreationRequestSeenLock;
            synchronized (object) {
                if (!this.firstCreationRequestSeen) {
                    this.dispatchingThreadFuture = this.dispatchingExecutorService.scheduleAtFixedRate(this.markerCreationDispatchingRunnable, this.timelineServiceConfig.markerBatchIntervalMs, this.timelineServiceConfig.markerBatchIntervalMs, TimeUnit.MILLISECONDS);
                    this.firstCreationRequestSeen = true;
                }
            }
        }
        return future;
    }

    private CompletableFuture<String> finishCreateMarkerFuture(Context context, String markerDir, String markerName) {
        MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName);
        try {
            future.complete(RequestHandler.jsonifyResult(future.getContext(), future.isSuccessful(), this.metricsRegistry, new ObjectMapper(), LOG));
        }
        catch (JsonProcessingException e) {
            throw new HoodieException("Failed to JSON encode the value", e);
        }
        return future;
    }

    public Boolean deleteMarkers(String markerDir) {
        boolean result = this.getMarkerDirState(markerDir).deleteAllMarkers();
        this.markerDirStateMap.remove(markerDir);
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private MarkerDirState getMarkerDirState(String markerDir) {
        MarkerDirState markerDirState = this.markerDirStateMap.get(markerDir);
        if (markerDirState == null) {
            Map<String, MarkerDirState> map = this.markerDirStateMap;
            synchronized (map) {
                if (this.markerDirStateMap.get(markerDir) == null) {
                    Option<TimelineServerBasedDetectionStrategy> strategy = this.timelineServiceConfig.earlyConflictDetectionEnable != false && this.earlyConflictDetectionStrategy != null ? Option.of(this.earlyConflictDetectionStrategy) : Option.empty();
                    markerDirState = new MarkerDirState(markerDir, this.timelineServiceConfig.markerBatchNumThreads, strategy, this.getStorage(markerDir), this.metricsRegistry, this.hoodieEngineContext, this.parallelism);
                    this.markerDirStateMap.put(markerDir, markerDirState);
                } else {
                    markerDirState = this.markerDirStateMap.get(markerDir);
                }
            }
        }
        return markerDirState;
    }
}

