/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.connect.writers;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectTransactionServices;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnectTransactionServices
implements ConnectTransactionServices {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectTransactionServices.class);
    private final KafkaConnectConfigs connectConfigs;
    private final Option<HoodieTableMetaClient> tableMetaClient;
    private final StorageConfiguration<Configuration> storageConf;
    private final HoodieWriteConfig writeConfig;
    private final String tableBasePath;
    private final String tableName;
    private final HoodieEngineContext context;
    private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;

    public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
        this.connectConfigs = connectConfigs;
        this.writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.JAVA).withProperties(connectConfigs.getProps()).build();
        this.tableBasePath = this.writeConfig.getBasePath();
        this.tableName = this.writeConfig.getTableName();
        this.storageConf = KafkaConnectUtils.getDefaultStorageConf(connectConfigs);
        this.context = new HoodieJavaEngineContext(this.storageConf);
        try {
            KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createAvroKeyGeneratorByType(connectConfigs.getProps());
            String recordKeyFields = KafkaConnectUtils.getRecordKeyColumns(keyGenerator);
            String partitionColumns = KafkaConnectUtils.getPartitionColumnsForKeyGenerator(keyGenerator, connectConfigs.getProps());
            LOG.info(String.format("Setting record key %s and partition fields %s for table %s", recordKeyFields, partitionColumns, this.tableBasePath + this.tableName));
            this.tableMetaClient = Option.of(HoodieTableMetaClient.newTableBuilder().setTableType(HoodieTableType.COPY_ON_WRITE.name()).setTableName(this.tableName).setPayloadClassName(HoodieAvroPayload.class.getName()).setRecordKeyFields(recordKeyFields).setPartitionFields(partitionColumns).setTableVersion(this.writeConfig.getWriteVersion()).setKeyGeneratorClassProp(this.writeConfig.getKeyGeneratorClass()).fromProperties(connectConfigs.getProps()).initTable(this.storageConf.newInstance(), this.tableBasePath));
            this.javaClient = new HoodieJavaWriteClient(this.context, this.writeConfig);
        }
        catch (Exception exception) {
            throw new HoodieException("Fatal error instantiating Hudi Transaction Services ", exception);
        }
    }

    @Override
    public String startCommit() {
        String newCommitTime = this.javaClient.startCommit();
        this.javaClient.transitionInflight(newCommitTime);
        LOG.info("Starting Hudi commit " + newCommitTime);
        return newCommitTime;
    }

    @Override
    public boolean endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
        boolean success = this.javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
        if (success) {
            LOG.info("Ending Hudi commit " + commitTime);
            if (this.writeConfig.isAsyncClusteringEnabled()) {
                this.javaClient.scheduleClustering(Option.empty()).ifPresent(instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
            }
            if (this.isAsyncCompactionEnabled()) {
                this.javaClient.scheduleCompaction(Option.empty()).ifPresent(instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
            }
            this.syncMeta();
        }
        return success;
    }

    @Override
    public Map<String, String> fetchLatestExtraCommitMetadata() {
        if (this.tableMetaClient.isPresent()) {
            Option<HoodieCommitMetadata> metadata2 = KafkaConnectUtils.getCommitMetadataForLatestInstant(this.tableMetaClient.get());
            if (metadata2.isPresent()) {
                return metadata2.get().getExtraMetadata();
            }
            LOG.info("Hoodie Extra Metadata from latest commit is absent");
            return Collections.emptyMap();
        }
        throw new HoodieException("Fatal error retrieving Hoodie Extra Metadata since Table Meta Client is absent");
    }

    private boolean isAsyncCompactionEnabled() {
        return this.tableMetaClient.isPresent() && HoodieTableType.MERGE_ON_READ.equals((Object)this.tableMetaClient.get().getTableType()) && this.connectConfigs.isAsyncCompactEnabled() != false;
    }

    private void syncMeta() {
        if (this.connectConfigs.isMetaSyncEnabled().booleanValue()) {
            HashSet<String> syncClientToolClasses = new HashSet<String>(Arrays.asList(this.connectConfigs.getMetaSyncClasses().split(",")));
            FileSystem fs = HadoopFSUtils.getFs(this.tableBasePath, new Configuration());
            for (String impl : syncClientToolClasses) {
                String baseFileFormat = this.connectConfigs.getStringOrDefault(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT);
                SyncUtilHelpers.runHoodieMetaSync(impl.trim(), this.connectConfigs.getProps(), this.storageConf.unwrap(), fs, this.tableBasePath, baseFileFormat);
            }
        }
    }
}

