package org.apache.hudi.connect;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hudi.connect.kafka.KafkaConnectControlAgent;
import org.apache.hudi.connect.transaction.ConnectTransactionCoordinator;
import org.apache.hudi.connect.transaction.ConnectTransactionParticipant;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/connect/HoodieSinkTask.class */
public class HoodieSinkTask extends SinkTask {
    public static final String TASK_ID_CONFIG_NAME = "task.id";
    private static final Logger LOG = LoggerFactory.getLogger(HoodieSinkTask.class);
    private final Map<TopicPartition, TransactionCoordinator> transactionCoordinators = new HashMap();
    private final Map<TopicPartition, TransactionParticipant> transactionParticipants = new HashMap();
    private KafkaConnectControlAgent controlKafkaClient;
    private KafkaConnectConfigs connectConfigs;
    private String taskId;
    private String connectorName;

    public String version() {
        return HoodieSinkConnector.VERSION;
    }

    public void start(Map<String, String> map) {
        this.connectorName = map.get("name");
        this.taskId = map.get(TASK_ID_CONFIG_NAME);
        LOG.info(String.format("Starting Hudi Sink Task for %s connector %s with id %s with assignments %s", map, this.connectorName, this.taskId, this.context.assignment()));
        try {
            this.connectConfigs = KafkaConnectConfigs.newBuilder().withProperties(map).build();
            this.controlKafkaClient = KafkaConnectControlAgent.createKafkaControlManager(this.connectConfigs.getBootstrapServers(), this.connectConfigs.getControlTopicName());
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e);
        } catch (ConnectException e2) {
            LOG.error("Couldn't start HudiSinkConnector:", e2);
            LOG.info("Shutting down HudiSinkConnector.");
            cleanup();
            throw e2;
        }
    }

    public void put(Collection<SinkRecord> collection) {
        for (SinkRecord sinkRecord : collection) {
            TransactionParticipant transactionParticipant = this.transactionParticipants.get(new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue()));
            if (transactionParticipant != null) {
                transactionParticipant.buffer(sinkRecord);
            }
        }
        for (TopicPartition topicPartition : this.context.assignment()) {
            if (this.transactionParticipants.get(topicPartition) == null) {
                throw new RetriableException("TransactionParticipant should be created for each assigned partition, but has not been created for the topic/partition: " + topicPartition.topic() + ":" + topicPartition.partition());
            }
            try {
                this.transactionParticipants.get(topicPartition).processRecords();
            } catch (HoodieIOException e) {
                throw new RetriableException("Intermittent write errors for Hudi  for the topic/partition: " + topicPartition.topic() + ":" + topicPartition.partition() + " , ensuring kafka connect will retry ", e);
            }
        }
    }

    public void stop() {
        cleanup();
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : this.context.assignment()) {
            TransactionParticipant transactionParticipant = this.transactionParticipants.get(topicPartition);
            if (transactionParticipant != null && transactionParticipant.getLastKafkaCommittedOffset() >= 0) {
                hashMap.put(topicPartition, new OffsetAndMetadata(transactionParticipant.getLastKafkaCommittedOffset()));
            }
        }
        return hashMap;
    }

    public void open(Collection<TopicPartition> collection) {
        LOG.info("New partitions added " + collection.toString());
        bootstrap(collection);
    }

    public void close(Collection<TopicPartition> collection) {
        LOG.info("Existing partitions deleted " + collection.toString());
        for (TopicPartition topicPartition : collection) {
            if (topicPartition.partition() == 0 && this.transactionCoordinators.containsKey(topicPartition)) {
                this.transactionCoordinators.get(topicPartition).stop();
                this.transactionCoordinators.remove(topicPartition);
            }
            TransactionParticipant remove = this.transactionParticipants.remove(topicPartition);
            if (remove != null) {
                try {
                    LOG.debug("Closing data writer due to task start failure.");
                    remove.stop();
                } catch (Throwable th) {
                    LOG.debug(String.format("Error closing and stopping data writer: %s", th.getMessage()), th);
                }
            }
        }
    }

    private void bootstrap(Collection<TopicPartition> collection) {
        LOG.info(String.format("Bootstrap task for connector %s with id %s with assignments %s part %s", this.connectorName, this.taskId, this.context.assignment(), collection));
        for (TopicPartition topicPartition : collection) {
            try {
                if (topicPartition.partition() == 0) {
                    ConnectTransactionCoordinator connectTransactionCoordinator = new ConnectTransactionCoordinator(this.connectConfigs, topicPartition, this.controlKafkaClient);
                    connectTransactionCoordinator.start();
                    this.transactionCoordinators.put(topicPartition, connectTransactionCoordinator);
                }
                ConnectTransactionParticipant connectTransactionParticipant = new ConnectTransactionParticipant(this.connectConfigs, topicPartition, this.controlKafkaClient, this.context);
                this.transactionParticipants.put(topicPartition, connectTransactionParticipant);
                connectTransactionParticipant.start();
            } catch (HoodieException e) {
                LOG.error(String.format("Fatal error initializing task %s for partition %s", this.taskId, Integer.valueOf(topicPartition.partition())), e);
            }
        }
    }

    private void cleanup() {
        Iterator it = this.context.assignment().iterator();
        while (it.hasNext()) {
            TransactionParticipant transactionParticipant = this.transactionParticipants.get((TopicPartition) it.next());
            if (transactionParticipant != null) {
                try {
                    LOG.debug("Closing data writer due to task start failure.");
                    transactionParticipant.stop();
                } catch (Throwable th) {
                    LOG.debug("Error closing and stopping data writer", th);
                }
            }
        }
        this.transactionParticipants.clear();
        this.transactionCoordinators.forEach((topicPartition, transactionCoordinator) -> {
            transactionCoordinator.stop();
        });
        this.transactionCoordinators.clear();
    }
}
