/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.connect.writers;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.serialization.CustomSerializer;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.writers.AbstractConnectWriter;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferedConnectWriter
extends AbstractConnectWriter {
    private static final Logger LOG = LoggerFactory.getLogger(BufferedConnectWriter.class);
    private final HoodieEngineContext context;
    private final HoodieJavaWriteClient writeClient;
    private final HoodieWriteConfig config;
    private ExternalSpillableMap<String, HoodieRecord<?>> bufferedRecords;

    public BufferedConnectWriter(HoodieEngineContext context, HoodieJavaWriteClient writeClient, String instantTime, KafkaConnectConfigs connectConfigs, HoodieWriteConfig config, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
        super(connectConfigs, keyGenerator, schemaProvider, instantTime);
        this.context = context;
        this.writeClient = writeClient;
        this.config = config;
        this.init();
    }

    private void init() {
        try {
            long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge((TaskContextSupplier)this.context.getTaskContextSupplier(), (HoodieConfig)this.config);
            LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
            this.bufferedRecords = new ExternalSpillableMap(memoryForMerge, this.config.getSpillableMapBasePath(), (SizeEstimator)new DefaultSizeEstimator(), (SizeEstimator)new HoodieRecordSizeEstimator(new Schema.Parser().parse(this.config.getSchema())), this.config.getCommonConfig().getSpillableDiskMapType(), (CustomSerializer)new DefaultSerializer(), this.config.getCommonConfig().isBitCaskDiskMapCompressionEnabled(), this.getClass().getSimpleName());
        }
        catch (IOException io) {
            throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
        }
    }

    @Override
    public void writeHudiRecord(HoodieRecord<?> record) {
        this.bufferedRecords.put((Serializable)((Object)record.getRecordKey()), record);
    }

    @Override
    public List<WriteStatus> flushRecords() {
        try {
            LOG.info("Number of entries in MemoryBasedMap => " + this.bufferedRecords.getInMemoryMapNumEntries() + ", Total size in bytes of MemoryBasedMap => " + this.bufferedRecords.getCurrentInMemoryMapSize() + ", Number of entries in BitCaskDiskMap => " + this.bufferedRecords.getDiskBasedMapNumEntries() + ", Size of file spilled to disk => " + this.bufferedRecords.getSizeOfFileOnDiskInBytes());
            List writeStatuses = new ArrayList();
            boolean isMorTable = (Boolean)Option.ofNullable((Object)this.connectConfigs.getString(HoodieTableConfig.TYPE)).map(t -> t.equals(HoodieTableType.MERGE_ON_READ.name())).orElse((Object)false);
            if (!this.bufferedRecords.isEmpty()) {
                writeStatuses = isMorTable ? this.writeClient.upsertPreppedRecords(new LinkedList(this.bufferedRecords.values()), this.instantTime) : this.writeClient.bulkInsertPreppedRecords(new LinkedList(this.bufferedRecords.values()), this.instantTime, Option.empty());
            }
            this.bufferedRecords.close();
            LOG.info("Flushed hudi records and got writeStatuses: " + writeStatuses);
            return writeStatuses;
        }
        catch (Exception e) {
            throw new HoodieIOException("Write records failed", new IOException(e));
        }
    }
}

