/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io;

import io.hops.hudi.org.apache.avro.Schema;
import io.hops.hudi.org.apache.avro.generic.GenericData;
import io.hops.hudi.org.apache.avro.generic.GenericRecord;
import io.hops.hudi.org.apache.avro.generic.IndexedRecord;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

public class HoodieCDCLogger
implements Closeable {
    private final String commitTime;
    private final String keyField;
    private final String partitionPath;
    private final HoodieStorage storage;
    private final Schema dataSchema;
    private final HoodieLogFormat.Writer cdcWriter;
    private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode;
    private final Schema cdcSchema;
    private final ExternalSpillableMap<String, HoodieAvroPayload> cdcData;
    private final Map<HoodieLogBlock.HeaderMetadataType, String> cdcDataBlockHeader;
    private final CDCTransformer transformer;
    private final long maxBlockSize;
    private long averageCDCRecordSize = 0L;
    private AtomicInteger numOfCDCRecordsInMemory = new AtomicInteger();
    private final SizeEstimator<HoodieAvroPayload> sizeEstimator;
    private final List<StoragePath> cdcAbsPaths;

    public HoodieCDCLogger(String commitTime, HoodieWriteConfig config, HoodieTableConfig tableConfig, String partitionPath, HoodieStorage storage, Schema schema, HoodieLogFormat.Writer cdcWriter, long maxInMemorySizeInBytes) {
        try {
            this.commitTime = commitTime;
            this.keyField = config.populateMetaFields() ? HoodieRecord.RECORD_KEY_METADATA_FIELD : tableConfig.getRecordKeyFieldProp();
            this.partitionPath = partitionPath;
            this.storage = storage;
            this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
            this.cdcWriter = cdcWriter;
            this.cdcSupplementalLoggingMode = tableConfig.cdcSupplementalLoggingMode();
            this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(this.cdcSupplementalLoggingMode, this.dataSchema);
            this.cdcDataBlockHeader = new HashMap<HoodieLogBlock.HeaderMetadataType, String>();
            this.cdcDataBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
            this.cdcDataBlockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, this.cdcSchema.toString());
            this.sizeEstimator = new DefaultSizeEstimator<HoodieAvroPayload>();
            this.cdcData = new ExternalSpillableMap(maxInMemorySizeInBytes, config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new DefaultSizeEstimator(), config.getCommonConfig().getSpillableDiskMapType(), new DefaultSerializer(), config.getCommonConfig().isBitCaskDiskMapCompressionEnabled(), this.getClass().getSimpleName());
            this.transformer = this.getTransformer();
            this.maxBlockSize = config.getLogFileDataBlockMaxSize();
            this.cdcAbsPaths = new ArrayList<StoragePath>();
        }
        catch (IOException e) {
            throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e);
        }
    }

    public void put(HoodieRecord hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> newRecord) {
        GenericData.Record cdcRecord;
        String recordKey = hoodieRecord.getRecordKey();
        if (newRecord.isPresent()) {
            GenericRecord record = (GenericRecord)newRecord.get();
            cdcRecord = oldRecord == null ? this.transformer.transform(HoodieCDCOperation.INSERT, recordKey, null, record) : this.transformer.transform(HoodieCDCOperation.UPDATE, recordKey, oldRecord, record);
        } else {
            cdcRecord = this.transformer.transform(HoodieCDCOperation.DELETE, recordKey, oldRecord, null);
        }
        this.flushIfNeeded(false);
        HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(cdcRecord));
        if (this.cdcData.isEmpty()) {
            this.averageCDCRecordSize = this.sizeEstimator.sizeEstimate(payload);
        }
        this.cdcData.put(recordKey, payload);
        this.numOfCDCRecordsInMemory.incrementAndGet();
    }

    private void flushIfNeeded(Boolean force) {
        if (force.booleanValue() || (long)this.numOfCDCRecordsInMemory.get() * this.averageCDCRecordSize >= this.maxBlockSize) {
            try {
                ArrayList<HoodieRecord> records = new ArrayList<HoodieRecord>();
                for (HoodieAvroPayload record : this.cdcData) {
                    try {
                        records.add(new HoodieAvroIndexedRecord(record.getInsertValue(this.cdcSchema).get()));
                    }
                    catch (IOException e) {
                        throw new HoodieIOException("Failed to get cdc record", e);
                    }
                }
                HoodieCDCDataBlock block = new HoodieCDCDataBlock(records, this.cdcDataBlockHeader, this.keyField);
                AppendResult result = this.cdcWriter.appendBlocks(Collections.singletonList(block));
                StoragePath cdcAbsPath = result.logFile().getPath();
                if (!this.cdcAbsPaths.contains(cdcAbsPath)) {
                    this.cdcAbsPaths.add(cdcAbsPath);
                }
                this.cdcData.clear();
                this.numOfCDCRecordsInMemory = new AtomicInteger();
            }
            catch (Exception e) {
                throw new HoodieException("Failed to write the cdc data to " + this.cdcWriter.getLogFile().getPath(), e);
            }
        }
    }

    public Map<String, Long> getCDCWriteStats() {
        HashMap<String, Long> stats = new HashMap<String, Long>();
        try {
            for (StoragePath cdcAbsPath : this.cdcAbsPaths) {
                String cdcFileName = cdcAbsPath.getName();
                String cdcPath = StringUtils.isNullOrEmpty(this.partitionPath) ? cdcFileName : this.partitionPath + "/" + cdcFileName;
                stats.put(cdcPath, this.storage.getPathInfo(cdcAbsPath).getLength());
            }
        }
        catch (IOException e) {
            throw new HoodieUpsertException("Failed to get cdc write stat", e);
        }
        return stats;
    }

    @Override
    public void close() {
        try {
            this.flushIfNeeded(true);
            if (this.cdcWriter != null) {
                this.cdcWriter.close();
            }
        }
        catch (IOException e) {
            throw new HoodieIOException("Failed to close HoodieCDCLogger", e);
        }
        finally {
            this.cdcData.close();
        }
    }

    private CDCTransformer getTransformer() {
        if (this.cdcSupplementalLoggingMode == HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER) {
            return (operation, recordKey, oldRecord, newRecord) -> HoodieCDCUtils.cdcRecord(this.cdcSchema, operation.getValue(), this.commitTime, this.removeCommitMetadata(oldRecord), this.removeCommitMetadata(newRecord));
        }
        if (this.cdcSupplementalLoggingMode == HoodieCDCSupplementalLoggingMode.DATA_BEFORE) {
            return (operation, recordKey, oldRecord, newRecord) -> HoodieCDCUtils.cdcRecord(this.cdcSchema, operation.getValue(), recordKey, this.removeCommitMetadata(oldRecord));
        }
        return (operation, recordKey, oldRecord, newRecord) -> HoodieCDCUtils.cdcRecord(this.cdcSchema, operation.getValue(), recordKey);
    }

    private GenericRecord removeCommitMetadata(GenericRecord record) {
        return record == null ? null : HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.dataSchema, Collections.emptyMap());
    }

    private static interface CDCTransformer {
        public GenericData.Record transform(HoodieCDCOperation var1, String var2, GenericRecord var3, GenericRecord var4);
    }
}

