package org.apache.flink.connector.file.table.stream;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.EmptyMetaStoreFactory;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.connector.file.table.FileSystemFactory;
import org.apache.flink.connector.file.table.MetastoreCommitPolicy;
import org.apache.flink.connector.file.table.PartitionCommitPolicy;
import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
import org.apache.flink.connector.file.table.TableMetaStoreFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.utils.PartitionPathUtils;

@Internal
/* loaded from: input_file:org/apache/flink/connector/file/table/stream/PartitionCommitter.class */
public class PartitionCommitter extends AbstractStreamOperator<Void> implements OneInputStreamOperator<PartitionCommitInfo, Void> {
    private static final long serialVersionUID = 1;
    private final Configuration conf;
    private final Path locationPath;
    private final ObjectIdentifier tableIdentifier;
    private final List<String> partitionKeys;
    private final TableMetaStoreFactory metaStoreFactory;
    private final FileSystemFactory fsFactory;
    private transient PartitionCommitTrigger trigger;
    private transient TaskTracker taskTracker;
    private transient long currentWatermark;
    private transient List<PartitionCommitPolicy> policies;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/table/stream/PartitionCommitter$CommitPolicyContextImpl.class */
    public class CommitPolicyContextImpl implements PartitionCommitPolicy.Context {
        private final List<String> partitionValues;
        private final Path partitionPath;

        private CommitPolicyContextImpl(List<String> list, Path path) {
            this.partitionValues = list;
            this.partitionPath = path;
        }

        @Override // org.apache.flink.connector.file.table.PartitionCommitPolicy.Context
        public String catalogName() {
            return PartitionCommitter.this.tableIdentifier.getCatalogName();
        }

        @Override // org.apache.flink.connector.file.table.PartitionCommitPolicy.Context
        public String databaseName() {
            return PartitionCommitter.this.tableIdentifier.getDatabaseName();
        }

        @Override // org.apache.flink.connector.file.table.PartitionCommitPolicy.Context
        public String tableName() {
            return PartitionCommitter.this.tableIdentifier.getObjectName();
        }

        @Override // org.apache.flink.connector.file.table.PartitionCommitPolicy.Context
        public List<String> partitionKeys() {
            return PartitionCommitter.this.partitionKeys;
        }

        @Override // org.apache.flink.connector.file.table.PartitionCommitPolicy.Context
        public List<String> partitionValues() {
            return this.partitionValues;
        }

        @Override // org.apache.flink.connector.file.table.PartitionCommitPolicy.Context
        public Path partitionPath() {
            return this.partitionPath;
        }
    }

    public PartitionCommitter(Path path, ObjectIdentifier objectIdentifier, List<String> list, TableMetaStoreFactory tableMetaStoreFactory, FileSystemFactory fileSystemFactory, Configuration configuration) {
        this.locationPath = path;
        this.tableIdentifier = objectIdentifier;
        this.partitionKeys = list;
        this.metaStoreFactory = tableMetaStoreFactory;
        this.fsFactory = fileSystemFactory;
        this.conf = configuration;
        PartitionCommitPolicy.validatePolicyChain(tableMetaStoreFactory instanceof EmptyMetaStoreFactory, (String) configuration.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND));
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.currentWatermark = Long.MIN_VALUE;
        this.trigger = PartitionCommitTrigger.create(stateInitializationContext.isRestored(), stateInitializationContext.getOperatorStateStore(), this.conf, getUserCodeClassloader(), this.partitionKeys, getProcessingTimeService());
        this.policies = new PartitionCommitPolicyFactory((String) this.conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND), (String) this.conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS), (String) this.conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME)).createPolicyChain(getUserCodeClassloader(), () -> {
            try {
                return this.fsFactory.create(this.locationPath.toUri());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void processElement(StreamRecord<PartitionCommitInfo> streamRecord) throws Exception {
        PartitionCommitInfo partitionCommitInfo = (PartitionCommitInfo) streamRecord.getValue();
        Iterator<String> it = partitionCommitInfo.getPartitions().iterator();
        while (it.hasNext()) {
            this.trigger.addPartition(it.next());
        }
        if (this.taskTracker == null) {
            this.taskTracker = new TaskTracker(partitionCommitInfo.getNumberOfTasks());
        }
        if (this.taskTracker.add(partitionCommitInfo.getCheckpointId(), partitionCommitInfo.getTaskId())) {
            commitPartitions(partitionCommitInfo.getCheckpointId());
        }
    }

    private void commitPartitions(long j) throws Exception {
        List<String> endInput = j == Long.MAX_VALUE ? this.trigger.endInput() : this.trigger.committablePartitions(j);
        if (endInput.isEmpty()) {
            return;
        }
        TableMetaStoreFactory.TableMetaStore createTableMetaStore = this.metaStoreFactory.createTableMetaStore();
        Throwable th = null;
        try {
            try {
                Iterator<String> it = endInput.iterator();
                while (it.hasNext()) {
                    LinkedHashMap extractPartitionSpecFromPath = PartitionPathUtils.extractPartitionSpecFromPath(new Path(it.next()));
                    LOG.info("Partition {} of table {} is ready to be committed", extractPartitionSpecFromPath, this.tableIdentifier);
                    CommitPolicyContextImpl commitPolicyContextImpl = new CommitPolicyContextImpl(new ArrayList(extractPartitionSpecFromPath.values()), new Path(this.locationPath, PartitionPathUtils.generatePartitionPath(extractPartitionSpecFromPath)));
                    for (PartitionCommitPolicy partitionCommitPolicy : this.policies) {
                        if (partitionCommitPolicy instanceof MetastoreCommitPolicy) {
                            ((MetastoreCommitPolicy) partitionCommitPolicy).setMetastore(createTableMetaStore);
                        }
                        partitionCommitPolicy.commit(commitPolicyContextImpl);
                    }
                }
                if (createTableMetaStore != null) {
                    if (0 == 0) {
                        createTableMetaStore.close();
                        return;
                    }
                    try {
                        createTableMetaStore.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTableMetaStore != null) {
                if (th != null) {
                    try {
                        createTableMetaStore.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTableMetaStore.close();
                }
            }
            throw th4;
        }
    }

    public void processWatermark(Watermark watermark) throws Exception {
        super.processWatermark(watermark);
        this.currentWatermark = watermark.getTimestamp();
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.trigger.snapshotState(stateSnapshotContext.getCheckpointId(), this.currentWatermark);
    }
}
