/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.hudi.adapter.OperatorCoordinatorAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.util.ClientIds;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamWriteOperatorCoordinator
implements OperatorCoordinatorAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(StreamWriteOperatorCoordinator.class);
    private final Configuration conf;
    private final StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf;
    private final OperatorCoordinator.Context context;
    private transient OperatorCoordinator.SubtaskGateway[] gateways;
    private transient HoodieFlinkWriteClient writeClient;
    private transient HoodieTableMetaClient metaClient;
    private volatile String instant = "";
    private transient WriteMetadataEvent[] eventBuffer;
    private final int parallelism;
    private NonThrownExecutor executor;
    private NonThrownExecutor hiveSyncExecutor;
    private HiveSyncContext hiveSyncContext;
    private transient TableState tableState;
    private CkpMetadata ckpMetadata;
    private ClientIds clientIds;

    public StreamWriteOperatorCoordinator(Configuration conf, OperatorCoordinator.Context context) {
        this.conf = conf;
        this.context = context;
        this.parallelism = context.currentParallelism();
        this.storageConf = HadoopFSUtils.getStorageConfWithCopy(HadoopConfigurations.getHiveConf(conf));
    }

    public void start() throws Exception {
        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
        this.reset();
        this.gateways = new OperatorCoordinator.SubtaskGateway[this.parallelism];
        this.metaClient = StreamerUtil.initTableIfNotExists(this.conf);
        this.writeClient = FlinkWriteClients.createWriteClient(this.conf);
        this.ckpMetadata = StreamWriteOperatorCoordinator.initCkpMetadata(this.writeClient.getConfig(), this.conf);
        StreamWriteOperatorCoordinator.initMetadataTable(this.writeClient);
        this.tableState = TableState.create(this.conf);
        this.executor = NonThrownExecutor.builder(LOG).exceptionHook((errMsg, t) -> this.context.failJob((Throwable)new HoodieException(errMsg, t))).waitForTasksFinish(true).build();
        if (this.tableState.syncHive) {
            this.initHiveSync();
        }
        if (OptionsResolver.isMultiWriter(this.conf)) {
            this.initClientIds(this.conf);
        }
    }

    public void close() throws Exception {
        if (this.executor != null) {
            this.executor.close();
        }
        if (this.hiveSyncExecutor != null) {
            this.hiveSyncExecutor.close();
        }
        if (this.writeClient != null) {
            this.writeClient.close();
        }
        this.eventBuffer = null;
        if (this.ckpMetadata != null) {
            this.ckpMetadata.close();
        }
        if (this.clientIds != null) {
            this.clientIds.close();
        }
    }

    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        this.executor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            try {
                result.complete(new byte[0]);
            }
            catch (Throwable throwable) {
                result.completeExceptionally(new CompletionException(String.format("Failed to checkpoint Instant %s for source %s", this.instant, this.getClass().getSimpleName()), throwable));
            }
        }), "taking checkpoint %d", checkpointId);
    }

    public void notifyCheckpointComplete(long checkpointId) {
        this.executor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
            boolean committed = this.commitInstant(this.instant, checkpointId);
            this.scheduleTableServices(committed);
            if (committed) {
                this.syncHiveAsync();
                this.startInstant();
            }
        }), "commits the instant %s", this.instant);
    }

    public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
    }

    @Override
    public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
        ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent, "The coordinator can only handle WriteMetaEvent");
        WriteMetadataEvent event = (WriteMetadataEvent)operatorEvent;
        if (event.isEndInput()) {
            this.executor.executeSync((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> this.handleEndInputEvent(event)), "handle end input event for instant %s", this.instant);
        } else {
            this.executor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
                if (event.isBootstrap()) {
                    this.handleBootstrapEvent(event);
                } else {
                    this.handleWriteMetaEvent(event);
                }
            }), "handle write metadata event for instant %s", this.instant);
        }
    }

    @Override
    public void subtaskFailed(int i, @Nullable Throwable throwable) {
    }

    public void subtaskReset(int i, long l) {
    }

    @Override
    public void subtaskReady(int i, OperatorCoordinator.SubtaskGateway subtaskGateway) {
        this.gateways[i] = subtaskGateway;
    }

    private void initHiveSync() {
        this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
        this.hiveSyncContext = HiveSyncContext.create(this.conf, this.storageConf);
    }

    private void syncHiveAsync() {
        if (this.tableState.syncHive) {
            this.hiveSyncExecutor.execute((ThrowingRunnable<Throwable>)((ThrowingRunnable)this::doSyncHive), "sync hive metadata for instant %s", this.instant);
        }
    }

    private void syncHive() {
        if (this.tableState.syncHive) {
            this.doSyncHive();
            LOG.info("Sync hive metadata for instant {} success!", (Object)this.instant);
        }
    }

    public void doSyncHive() {
        try (HiveSyncTool syncTool = this.hiveSyncContext.hiveSyncTool();){
            syncTool.syncHoodieTable();
        }
    }

    private static void initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
        writeClient.initMetadataTable();
    }

    private static CkpMetadata initCkpMetadata(HoodieWriteConfig writeConfig, Configuration conf) throws IOException {
        CkpMetadata ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeConfig, conf);
        ckpMetadata.bootstrap();
        return ckpMetadata;
    }

    private void initClientIds(Configuration conf) {
        this.clientIds = ClientIds.builder().conf(conf).build();
        this.clientIds.start();
    }

    private void reset() {
        this.eventBuffer = new WriteMetadataEvent[this.parallelism];
    }

    private boolean allEventsReceived() {
        return Arrays.stream(this.eventBuffer).allMatch(event -> event != null && event.isLastBatch());
    }

    private void addEventToBuffer(WriteMetadataEvent event) {
        if (this.eventBuffer[event.getTaskID()] != null && this.eventBuffer[event.getTaskID()].getInstantTime().equals(event.getInstantTime())) {
            this.eventBuffer[event.getTaskID()].mergeWith(event);
        } else {
            this.eventBuffer[event.getTaskID()] = event;
        }
    }

    private void startInstant() {
        this.metaClient.reloadActiveTimeline();
        this.writeClient.preTxn(this.tableState.operationType, this.metaClient);
        this.instant = this.writeClient.startCommit(this.tableState.commitAction, this.metaClient);
        this.metaClient.getActiveTimeline().transitionRequestedToInflight(this.tableState.commitAction, this.instant);
        this.writeClient.setWriteTimer(this.tableState.commitAction);
        this.ckpMetadata.startInstant(this.instant);
        LOG.info("Create instant [{}] for table [{}] with type [{}]", new Object[]{this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), this.conf.getString(FlinkOptions.TABLE_TYPE)});
    }

    private void initInstant(String instant) {
        HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
        if (instant.equals("") || completedTimeline.containsInstant(instant)) {
            this.reset();
        } else {
            LOG.info("Recommit instant {}", (Object)instant);
            if (this.writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
                this.writeClient.getHeartbeatClient().start(instant);
            }
            this.commitInstant(instant);
        }
        if (this.writeClient.getConfig().getFailedWritesCleanPolicy().isLazy() && !"".equals(this.instant)) {
            this.writeClient.getHeartbeatClient().stop(this.instant);
        }
        this.startInstant();
        this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
    }

    private void handleBootstrapEvent(WriteMetadataEvent event) {
        this.eventBuffer[event.getTaskID()] = event;
        if (Arrays.stream(this.eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) {
            String instant = Arrays.stream(this.eventBuffer).filter(evt -> evt.getWriteStatuses().size() > 0).findFirst().map(WriteMetadataEvent::getInstantTime).orElse("");
            if (this.metaClient.reloadActiveTimeline().filterInflightsAndRequested().containsInstant(this.instant) && instant.equals("") && this.tableState.operationType == WriteOperationType.INSERT) {
                LOG.warn("Reuse current pending Instant {} with {} operationType, ignoring empty bootstrap event.", (Object)this.instant, (Object)WriteOperationType.INSERT.value());
                this.reset();
                this.sendCommitAckEvents(-1L);
                return;
            }
            this.initInstant(instant);
        }
    }

    private void handleEndInputEvent(WriteMetadataEvent event) {
        boolean committed;
        this.addEventToBuffer(event);
        if (this.allEventsReceived() && (committed = this.commitInstant(this.instant))) {
            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
            this.syncHive();
            this.scheduleTableServices(true);
        }
    }

    private void scheduleTableServices(Boolean committed) {
        if (this.tableState.scheduleCompaction) {
            CompactionUtil.scheduleCompaction(this.writeClient, this.tableState.isDeltaTimeCompaction, committed);
        }
        if (this.tableState.scheduleClustering) {
            ClusteringUtil.scheduleClustering(this.conf, this.writeClient, committed);
        }
    }

    private void handleWriteMetaEvent(WriteMetadataEvent event) {
        ValidationUtils.checkState(InstantComparison.compareTimestamps(this.instant, InstantComparison.GREATER_THAN_OR_EQUALS, event.getInstantTime()), String.format("Receive an unexpected event for instant %s from task %d", event.getInstantTime(), event.getTaskID()));
        this.addEventToBuffer(event);
    }

    private void sendCommitAckEvents(long checkpointId) {
        CompletableFuture[] futures = (CompletableFuture[])Arrays.stream(this.gateways).filter(Objects::nonNull).map(gw -> gw.sendEvent((OperatorEvent)CommitAckEvent.getInstance(checkpointId))).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
            if (!StreamWriteOperatorCoordinator.sendToFinishedTasks(error)) {
                throw new HoodieException("Error while waiting for the commit ack events to finish sending", (Throwable)error);
            }
        });
    }

    private static boolean sendToFinishedTasks(Throwable throwable) {
        return throwable.getCause() instanceof TaskNotRunningException || throwable.getCause().getMessage().contains("running");
    }

    private boolean commitInstant(String instant) {
        return this.commitInstant(instant, -1L);
    }

    private boolean commitInstant(String instant, long checkpointId) {
        if (Arrays.stream(this.eventBuffer).allMatch(Objects::isNull)) {
            return false;
        }
        List<WriteStatus> writeResults = Arrays.stream(this.eventBuffer).filter(Objects::nonNull).map(WriteMetadataEvent::getWriteStatuses).flatMap(Collection::stream).collect(Collectors.toList());
        if (writeResults.size() == 0 && !OptionsResolver.allowCommitOnEmptyBatch(this.conf)) {
            this.reset();
            if (checkpointId != -1L) {
                this.sendCommitAckEvents(checkpointId);
            }
            return false;
        }
        this.doCommit(instant, writeResults);
        return true;
    }

    private void doCommit(String instant, List<WriteStatus> writeResults) {
        boolean hasErrors;
        long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
        long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
        boolean bl = hasErrors = totalErrorRecords > 0L;
        if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
            HashMap checkpointCommitMetadata = new HashMap();
            if (hasErrors) {
                LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
            }
            Map<String, List<String>> partitionToReplacedFileIds = this.tableState.isOverwrite ? this.writeClient.getPartitionToReplacedFileIds(this.tableState.operationType, writeResults) : Collections.emptyMap();
            boolean success = this.writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata), this.tableState.commitAction, partitionToReplacedFileIds);
            if (!success) {
                throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
            }
        } else {
            LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
            LOG.error("The first 10 files with write errors:");
            writeResults.stream().filter(WriteStatus::hasErrors).limit(10L).forEach(ws -> {
                if (ws.getGlobalError() != null) {
                    LOG.error("Global error for partition path {} and fileID {}: {}", new Object[]{ws.getPartitionPath(), ws.getFileId(), ws.getGlobalError()});
                }
                if (ws.getErrors().size() > 0) {
                    LOG.error("The first 100 records-level errors for partition path {} and fileID {}:", (Object)ws.getPartitionPath(), (Object)ws.getFileId());
                    ws.getErrors().entrySet().stream().limit(100L).forEach(entry -> LOG.error("Error for key: " + entry.getKey() + " and Exception: " + ((Throwable)entry.getValue()).getMessage()));
                }
            });
            this.writeClient.rollback(instant);
            throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
        }
        this.reset();
        this.ckpMetadata.commitInstant(instant);
        LOG.info("Commit instant [{}] success!", (Object)instant);
    }

    @VisibleForTesting
    public WriteMetadataEvent[] getEventBuffer() {
        return this.eventBuffer;
    }

    @VisibleForTesting
    public String getInstant() {
        return this.instant;
    }

    @VisibleForTesting
    public OperatorCoordinator.Context getContext() {
        return this.context;
    }

    @VisibleForTesting
    public HoodieFlinkWriteClient getWriteClient() {
        return this.writeClient;
    }

    @VisibleForTesting
    public void setExecutor(NonThrownExecutor executor) throws Exception {
        if (this.executor != null) {
            this.executor.close();
        }
        this.executor = executor;
    }

    private static class TableState
    implements Serializable {
        private static final long serialVersionUID = 1L;
        final WriteOperationType operationType;
        final String commitAction;
        final boolean isOverwrite;
        final boolean scheduleCompaction;
        final boolean scheduleClustering;
        final boolean syncHive;
        final boolean syncMetadata;
        final boolean isDeltaTimeCompaction;

        private TableState(Configuration conf) {
            this.operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
            this.commitAction = CommitUtils.getCommitActionType(this.operationType, HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
            this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
            this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf);
            this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf);
            this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
            this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
            this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
        }

        public static TableState create(Configuration conf) {
            return new TableState(conf);
        }
    }

    public static class Provider
    implements OperatorCoordinator.Provider {
        private final OperatorID operatorId;
        private final Configuration conf;

        public Provider(OperatorID operatorId, Configuration conf) {
            this.operatorId = operatorId;
            this.conf = conf;
        }

        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        public OperatorCoordinator create(OperatorCoordinator.Context context) {
            return new StreamWriteOperatorCoordinator(this.conf, context);
        }
    }
}

