package org.apache.hudi.sink;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.Preconditions;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
import org.apache.hudi.util.StreamerUtil;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/StreamWriteOperatorCoordinator.class */
public class StreamWriteOperatorCoordinator implements OperatorCoordinator {
    private static final Logger LOG;
    private final Configuration conf;
    private transient HoodieFlinkWriteClient writeClient;
    private long inFlightCheckpoint = -1;
    private String instant = "";
    private transient BatchWriteSuccessEvent[] eventBuffer;
    private final int parallelism;
    private final boolean needsScheduleCompaction;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/hudi/sink/StreamWriteOperatorCoordinator$Provider.class */
    public static class Provider implements OperatorCoordinator.Provider {
        private final OperatorID operatorId;
        private final Configuration conf;

        public Provider(OperatorID operatorID, Configuration configuration) {
            this.operatorId = operatorID;
            this.conf = configuration;
        }

        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        public OperatorCoordinator create(OperatorCoordinator.Context context) {
            return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism());
        }
    }

    public StreamWriteOperatorCoordinator(Configuration configuration, int i) {
        this.conf = configuration;
        this.parallelism = i;
        this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(configuration);
    }

    public void start() throws Exception {
        reset();
        initWriteClient();
        StreamerUtil.initTableIfNotExists(this.conf);
        startInstant();
    }

    public void close() {
        if (this.writeClient != null) {
            this.writeClient.close();
        }
        this.eventBuffer = null;
    }

    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) {
        try {
            this.inFlightCheckpoint = j;
            completableFuture.complete(writeCheckpointBytes());
        } catch (Throwable th) {
            completableFuture.completeExceptionally(new CompletionException(String.format("Failed to checkpoint Instant %s for source %s", this.instant, getClass().getSimpleName()), th));
        }
    }

    public void checkpointComplete(long j) {
        checkAndCommitWithRetry();
        if (this.needsScheduleCompaction) {
            this.writeClient.scheduleCompaction(Option.empty());
        }
        startInstant();
    }

    private void startInstant() {
        this.instant = this.writeClient.startCommit();
        this.writeClient.transitionRequestedToInflight(this.conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
        LOG.info("Create instant [{}] for table [{}] with type [{}]", new Object[]{this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), this.conf.getString(FlinkOptions.TABLE_TYPE)});
    }

    public void notifyCheckpointAborted(long j) {
        Preconditions.checkState(this.inFlightCheckpoint == j, "The aborted checkpoint should always be the last checkpoint");
        checkAndForceCommit("The last checkpoint was aborted, roll back the last write and throw");
    }

    public void resetToCheckpoint(@Nullable byte[] bArr) throws Exception {
        if (bArr != null) {
            deserializeCheckpointAndRestore(bArr);
        }
    }

    public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
        Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent, "The coordinator can only handle BatchWriteSuccessEvent");
        BatchWriteSuccessEvent batchWriteSuccessEvent = (BatchWriteSuccessEvent) operatorEvent;
        Preconditions.checkState(batchWriteSuccessEvent.getInstantTime().equals(this.instant), String.format("Receive an unexpected event for instant %s from task %d", batchWriteSuccessEvent.getInstantTime(), Integer.valueOf(batchWriteSuccessEvent.getTaskID())));
        if (this.eventBuffer[batchWriteSuccessEvent.getTaskID()] != null) {
            this.eventBuffer[batchWriteSuccessEvent.getTaskID()].mergeWith(batchWriteSuccessEvent);
        } else {
            this.eventBuffer[batchWriteSuccessEvent.getTaskID()] = batchWriteSuccessEvent;
        }
        if (batchWriteSuccessEvent.isEndInput() && checkReady()) {
            doCommit();
        }
    }

    public void subtaskFailed(int i, @Nullable Throwable th) {
    }

    private void initWriteClient() {
        this.writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier((RuntimeContext) null)), StreamerUtil.getHoodieClientConfig(this.conf), true);
    }

    static byte[] readBytes(DataInputStream dataInputStream, int i) throws IOException {
        byte[] bArr = new byte[i];
        dataInputStream.readFully(bArr);
        return bArr;
    }

    private byte[] writeCheckpointBytes() throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Throwable th = null;
        try {
            DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(byteArrayOutputStream);
            Throwable th2 = null;
            try {
                try {
                    dataOutputViewStreamWrapper.writeLong(this.inFlightCheckpoint);
                    byte[] bytes = this.instant.getBytes();
                    dataOutputViewStreamWrapper.writeInt(bytes.length);
                    dataOutputViewStreamWrapper.write(bytes);
                    dataOutputViewStreamWrapper.flush();
                    byte[] byteArray = byteArrayOutputStream.toByteArray();
                    if (dataOutputViewStreamWrapper != null) {
                        if (0 != 0) {
                            try {
                                dataOutputViewStreamWrapper.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            dataOutputViewStreamWrapper.close();
                        }
                    }
                    return byteArray;
                } finally {
                }
            } catch (Throwable th4) {
                if (dataOutputViewStreamWrapper != null) {
                    if (th2 != null) {
                        try {
                            dataOutputViewStreamWrapper.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        dataOutputViewStreamWrapper.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (byteArrayOutputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayOutputStream.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    byteArrayOutputStream.close();
                }
            }
        }
    }

    private void deserializeCheckpointAndRestore(byte[] bArr) throws Exception {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
        Throwable th = null;
        try {
            DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(byteArrayInputStream);
            Throwable th2 = null;
            try {
                long readLong = dataInputViewStreamWrapper.readLong();
                byte[] readBytes = readBytes(dataInputViewStreamWrapper, dataInputViewStreamWrapper.readInt());
                this.inFlightCheckpoint = readLong;
                this.instant = new String(readBytes);
                if (dataInputViewStreamWrapper != null) {
                    if (0 != 0) {
                        try {
                            dataInputViewStreamWrapper.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        dataInputViewStreamWrapper.close();
                    }
                }
                if (byteArrayInputStream != null) {
                    if (0 == 0) {
                        byteArrayInputStream.close();
                        return;
                    }
                    try {
                        byteArrayInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                if (dataInputViewStreamWrapper != null) {
                    if (0 != 0) {
                        try {
                            dataInputViewStreamWrapper.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        dataInputViewStreamWrapper.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (byteArrayInputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayInputStream.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    byteArrayInputStream.close();
                }
            }
            throw th7;
        }
    }

    private void reset() {
        this.instant = "";
        this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
    }

    private void checkAndForceCommit(String str) {
        if (!checkReady()) {
            String inflightAndRequestedInstant = this.writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE));
            if (inflightAndRequestedInstant != null) {
                if (!$assertionsDisabled && !inflightAndRequestedInstant.equals(this.instant)) {
                    throw new AssertionError();
                }
                this.writeClient.rollback(this.instant);
                throw new HoodieException(str);
            }
            if (Arrays.stream(this.eventBuffer).allMatch((v0) -> {
                return Objects.isNull(v0);
            })) {
                return;
            }
        }
        doCommit();
    }

    private void checkAndCommitWithRetry() {
        int i;
        HoodieException hoodieException;
        int integer = this.conf.getInteger(FlinkOptions.RETRY_TIMES);
        if (integer < 0) {
            integer = 1;
        }
        long j = this.conf.getLong(FlinkOptions.RETRY_INTERVAL_MS);
        int i2 = 0;
        while (true) {
            int i3 = i2;
            i2++;
            if (i3 >= integer) {
                return;
            }
            try {
            } finally {
                if (i2 == i) {
                }
            }
            if (checkReady()) {
                doCommit();
                return;
            } else {
                if (i2 == integer) {
                    throw new HoodieException("Try " + integer + " to commit instant [" + this.instant + "] failed");
                }
                sleepFor(j);
            }
        }
    }

    private void sleepFor(long j) {
        try {
            TimeUnit.MILLISECONDS.sleep(j);
        } catch (InterruptedException e) {
            LOG.error("Thread interrupted while waiting to retry the instant commits");
            throw new HoodieException(e);
        }
    }

    private boolean checkReady() {
        return Arrays.stream(this.eventBuffer).allMatch(batchWriteSuccessEvent -> {
            return batchWriteSuccessEvent != null && batchWriteSuccessEvent.isReady(this.instant);
        });
    }

    private void doCommit() {
        List list = (List) Arrays.stream(this.eventBuffer).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map((v0) -> {
            return v0.getWriteStatuses();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
        if (list.size() == 0) {
            this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
            reset();
            return;
        }
        long longValue = ((Long) list.stream().map((v0) -> {
            return v0.getTotalErrorRecords();
        }).reduce((v0, v1) -> {
            return Long.sum(v0, v1);
        }).orElse(0L)).longValue();
        long longValue2 = ((Long) list.stream().map((v0) -> {
            return v0.getTotalRecords();
        }).reduce((v0, v1) -> {
            return Long.sum(v0, v1);
        }).orElse(0L)).longValue();
        boolean z = longValue > 0;
        if (z && !this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
            LOG.error("Error when writing. Errors/Total=" + longValue + "/" + longValue2);
            LOG.error("The first 100 error messages");
            list.stream().filter((v0) -> {
                return v0.hasErrors();
            }).limit(100L).forEach(writeStatus -> {
                LOG.error("Global error for partition path {} and fileID {}: {}", new Object[]{writeStatus.getGlobalError(), writeStatus.getPartitionPath(), writeStatus.getFileId()});
                if (writeStatus.getErrors().size() > 0) {
                    writeStatus.getErrors().forEach((hoodieKey, th) -> {
                        LOG.trace("Error for key:" + hoodieKey + " and value " + th);
                    });
                }
            });
            this.writeClient.rollback(this.instant);
            throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.instant));
        }
        HashMap hashMap = new HashMap();
        if (z) {
            LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total=" + longValue + "/" + longValue2);
        }
        if (!this.writeClient.commit(this.instant, list, Option.of(hashMap))) {
            throw new HoodieException(String.format("Commit instant [%s] failed!", this.instant));
        }
        reset();
        LOG.info("Commit instant [{}] success!", this.instant);
    }

    @VisibleForTesting
    public BatchWriteSuccessEvent[] getEventBuffer() {
        return this.eventBuffer;
    }

    @VisibleForTesting
    public String getInstant() {
        return this.instant;
    }

    @VisibleForTesting
    public HoodieFlinkWriteClient getWriteClient() {
        return this.writeClient;
    }

    static {
        $assertionsDisabled = !StreamWriteOperatorCoordinator.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(StreamWriteOperatorCoordinator.class);
    }
}
