/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.connect.writers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.writers.AbstractConnectWriter;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class BufferedConnectWriter
extends AbstractConnectWriter {
    private static final Logger LOG = LogManager.getLogger(BufferedConnectWriter.class);
    private final HoodieEngineContext context;
    private final HoodieJavaWriteClient writeClient;
    private final HoodieWriteConfig config;
    private ExternalSpillableMap<String, HoodieRecord<?>> bufferedRecords;

    public BufferedConnectWriter(HoodieEngineContext context, HoodieJavaWriteClient writeClient, String instantTime, KafkaConnectConfigs connectConfigs, HoodieWriteConfig config, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
        super(connectConfigs, keyGenerator, schemaProvider, instantTime);
        this.context = context;
        this.writeClient = writeClient;
        this.config = config;
        this.init();
    }

    private void init() {
        try {
            long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(this.context.getTaskContextSupplier(), this.config);
            LOG.info((Object)("MaxMemoryPerPartitionMerge => " + memoryForMerge));
            this.bufferedRecords = new ExternalSpillableMap(memoryForMerge, this.config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(new Schema.Parser().parse(this.config.getSchema())), this.config.getCommonConfig().getSpillableDiskMapType(), this.config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
        }
        catch (IOException io) {
            throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
        }
    }

    @Override
    public void writeHudiRecord(HoodieRecord<?> record) {
        this.bufferedRecords.put(record.getRecordKey(), record);
    }

    @Override
    public List<WriteStatus> flushRecords() {
        try {
            LOG.info((Object)("Number of entries in MemoryBasedMap => " + this.bufferedRecords.getInMemoryMapNumEntries() + ", Total size in bytes of MemoryBasedMap => " + this.bufferedRecords.getCurrentInMemoryMapSize() + ", Number of entries in BitCaskDiskMap => " + this.bufferedRecords.getDiskBasedMapNumEntries() + ", Size of file spilled to disk => " + this.bufferedRecords.getSizeOfFileOnDiskInBytes()));
            List<Object> writeStatuses = new ArrayList();
            boolean isMorTable = Option.ofNullable(this.connectConfigs.getString(HoodieTableConfig.TYPE)).map(t -> t.equals(HoodieTableType.MERGE_ON_READ.name())).orElse(false);
            if (!this.bufferedRecords.isEmpty()) {
                writeStatuses = isMorTable ? this.writeClient.upsertPreppedRecords(new LinkedList(this.bufferedRecords.values()), this.instantTime) : this.writeClient.bulkInsertPreppedRecords(new LinkedList(this.bufferedRecords.values()), this.instantTime, Option.empty());
            }
            this.bufferedRecords.close();
            LOG.info((Object)("Flushed hudi records and got writeStatuses: " + writeStatuses));
            return writeStatuses;
        }
        catch (Exception e) {
            throw new HoodieIOException("Write records failed", new IOException(e));
        }
    }
}

