/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.connect;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hudi.connect.kafka.KafkaConnectControlAgent;
import org.apache.hudi.connect.transaction.ConnectTransactionCoordinator;
import org.apache.hudi.connect.transaction.ConnectTransactionParticipant;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class HoodieSinkTask
extends SinkTask {
    public static final String TASK_ID_CONFIG_NAME = "task.id";
    private static final Logger LOG = LogManager.getLogger(HoodieSinkTask.class);
    private final Map<TopicPartition, TransactionCoordinator> transactionCoordinators = new HashMap<TopicPartition, TransactionCoordinator>();
    private final Map<TopicPartition, TransactionParticipant> transactionParticipants = new HashMap<TopicPartition, TransactionParticipant>();
    private KafkaConnectControlAgent controlKafkaClient;
    private KafkaConnectConfigs connectConfigs;
    private String taskId;
    private String connectorName;

    public String version() {
        return "0.1.0";
    }

    public void start(Map<String, String> props) {
        this.connectorName = props.get("name");
        this.taskId = props.get(TASK_ID_CONFIG_NAME);
        LOG.info((Object)String.format("Starting Hudi Sink Task for %s connector %s with id %s with assignments %s", props, this.connectorName, this.taskId, this.context.assignment()));
        try {
            this.connectConfigs = KafkaConnectConfigs.newBuilder().withProperties(props).build();
            this.controlKafkaClient = KafkaConnectControlAgent.createKafkaControlManager(this.connectConfigs.getBootstrapServers(), this.connectConfigs.getControlTopicName());
        }
        catch (ConfigException e) {
            throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", (Throwable)e);
        }
        catch (ConnectException e) {
            LOG.error((Object)"Couldn't start HudiSinkConnector:", (Throwable)e);
            LOG.info((Object)"Shutting down HudiSinkConnector.");
            this.cleanup();
            throw e;
        }
    }

    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            int partition;
            String topic = record.topic();
            TopicPartition tp = new TopicPartition(topic, partition = record.kafkaPartition().intValue());
            TransactionParticipant transactionParticipant = this.transactionParticipants.get(tp);
            if (transactionParticipant == null) continue;
            transactionParticipant.buffer(record);
        }
        for (TopicPartition partition : this.context.assignment()) {
            if (this.transactionParticipants.get(partition) == null) {
                throw new RetriableException("TransactionParticipant should be created for each assigned partition, but has not been created for the topic/partition: " + partition.topic() + ":" + partition.partition());
            }
            try {
                this.transactionParticipants.get(partition).processRecords();
            }
            catch (HoodieIOException exception) {
                throw new RetriableException("Intermittent write errors for Hudi  for the topic/partition: " + partition.topic() + ":" + partition.partition() + " , ensuring kafka connect will retry ", (Throwable)exception);
            }
        }
    }

    public void stop() {
        this.cleanup();
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    }

    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        HashMap<TopicPartition, OffsetAndMetadata> result = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (TopicPartition partition : this.context.assignment()) {
            TransactionParticipant worker = this.transactionParticipants.get(partition);
            if (worker == null || worker.getLastKafkaCommittedOffset() < 0L) continue;
            result.put(partition, new OffsetAndMetadata(worker.getLastKafkaCommittedOffset()));
        }
        return result;
    }

    public void open(Collection<TopicPartition> partitions) {
        LOG.info((Object)("New partitions added " + partitions.toString()));
        this.bootstrap(partitions);
    }

    public void close(Collection<TopicPartition> partitions) {
        LOG.info((Object)("Existing partitions deleted " + partitions.toString()));
        for (TopicPartition partition : partitions) {
            TransactionParticipant worker;
            if (partition.partition() == 0 && this.transactionCoordinators.containsKey(partition)) {
                this.transactionCoordinators.get(partition).stop();
                this.transactionCoordinators.remove(partition);
            }
            if ((worker = this.transactionParticipants.remove(partition)) == null) continue;
            try {
                LOG.debug((Object)"Closing data writer due to task start failure.");
                worker.stop();
            }
            catch (Throwable t) {
                LOG.debug((Object)String.format("Error closing and stopping data writer: %s", t.getMessage()), t);
            }
        }
    }

    private void bootstrap(Collection<TopicPartition> partitions) {
        LOG.info((Object)String.format("Bootstrap task for connector %s with id %s with assignments %s part %s", this.connectorName, this.taskId, this.context.assignment(), partitions));
        for (TopicPartition partition : partitions) {
            try {
                if (partition.partition() == 0) {
                    ConnectTransactionCoordinator coordinator = new ConnectTransactionCoordinator(this.connectConfigs, partition, this.controlKafkaClient);
                    coordinator.start();
                    this.transactionCoordinators.put(partition, coordinator);
                }
                ConnectTransactionParticipant worker = new ConnectTransactionParticipant(this.connectConfigs, partition, this.controlKafkaClient, this.context);
                this.transactionParticipants.put(partition, worker);
                worker.start();
            }
            catch (HoodieException exception) {
                LOG.error((Object)String.format("Fatal error initializing task %s for partition %s", this.taskId, partition.partition()), (Throwable)exception);
            }
        }
    }

    private void cleanup() {
        for (TopicPartition partition : this.context.assignment()) {
            TransactionParticipant worker = this.transactionParticipants.get(partition);
            if (worker == null) continue;
            try {
                LOG.debug((Object)"Closing data writer due to task start failure.");
                worker.stop();
            }
            catch (Throwable t) {
                LOG.debug((Object)"Error closing and stopping data writer", t);
            }
        }
        this.transactionParticipants.clear();
        this.transactionCoordinators.forEach((topic, transactionCoordinator) -> transactionCoordinator.stop());
        this.transactionCoordinators.clear();
    }
}

