/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.io;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
import org.apache.hudi.io.HoodieCDCLogger;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link FlinkMergeAndReplaceHandle} that, in addition to the regular merge write,
 * reports every successfully written insert/update to a {@link HoodieCDCLogger}.
 *
 * <p>The CDC logger is backed by a log writer created with the {@code ".cdc"} suffix and is
 * closed together with this handle; its write stats are attached to the first
 * {@link WriteStatus} returned by {@link #close()}.
 *
 * <p>The type parameters are forwarded unchanged to the superclass and to {@link HoodieTable}.
 */
public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
extends FlinkMergeAndReplaceHandle<T, I, K, O> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkMergeAndReplaceHandleWithChangeLog.class);

    /** Receives the change records produced by this handle; closed in {@link #close()}. */
    private final HoodieCDCLogger cdcLogger;

    /**
     * Creates the handle and its CDC logger.
     *
     * <p>All arguments are passed straight through to the superclass constructor. The CDC logger
     * is then built from the instant time, write config, the table config of
     * {@code hoodieTable}'s meta client, the partition path, this handle's file system and writer
     * schema, a log writer created for {@code instantTime} with the {@code ".cdc"} suffix, and the
     * value of {@link IOUtils#getMaxMemoryPerPartitionMerge}.
     *
     * @param config              write configuration
     * @param instantTime         instant time of the write; also used to create the CDC log writer
     * @param hoodieTable         table being written; supplies the table config for the CDC logger
     * @param recordItr           incoming records, forwarded to the superclass
     * @param partitionPath       partition being written
     * @param fileId              file id, forwarded to the superclass
     * @param taskContextSupplier task context, also used to size the CDC logger's memory budget
     * @param basePath            base path, forwarded to the superclass
     */
    public FlinkMergeAndReplaceHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Path basePath) {
        super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, basePath);
        this.cdcLogger = new HoodieCDCLogger(instantTime, config, hoodieTable.getMetaClient().getTableConfig(), partitionPath, this.getFileSystem(), this.getWriterSchema(), this.createLogWriter(instantTime, ".cdc"), IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
    }

    /**
     * Delegates the update to the superclass and, when it reports success, logs the change.
     *
     * <p>The CDC entry carries {@code newRecord}, the data of {@code oldRecord} as the previous
     * image, and as the new image either nothing (when {@code newRecord}'s operation is a delete)
     * or the Avro form of the combined record.
     *
     * @param newRecord        incoming record
     * @param oldRecord        record currently stored; its data is cast to {@link GenericRecord}
     *                         (assumes the stored record holds Avro data - TODO confirm)
     * @param combineRecordOpt result of combining old and new record, if any
     * @param writerSchema     schema used both for the write and for the CDC conversion
     * @return whatever the superclass returned
     * @throws IOException if the superclass write or the CDC logger fails
     */
    @Override
    protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
        // Copy before delegating so the CDC entry is built from an instance the superclass call
        // never sees (presumably because the superclass may alter the record it writes - verify).
        Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
        boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
        if (result) {
            boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
            // The conversion lives inside the conditional on purpose: its type is inferred from
            // the put(...) parameter (no raw Option local), and it is skipped for deletes, whose
            // new image is always empty.
            this.cdcLogger.put(newRecord, (GenericRecord)oldRecord.getData(), isDelete
                    ? Option.empty()
                    : savedCombineRecordOp.flatMap(r -> FlinkMergeAndReplaceHandleWithChangeLog.toAvroRecord(r, writerSchema, this.config.getPayloadConfig().getProps())));
        }
        return result;
    }

    /**
     * Delegates the insert to the superclass and, unless the record's operation is a delete,
     * logs it to the CDC logger with no previous image, then deflates {@code newRecord}.
     *
     * @param newRecord record to insert
     * @throws IOException if the superclass write, the record conversion or the CDC logger fails
     */
    @Override
    protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
        // Same schema choice the flag implies for the write itself: with or without meta fields.
        Schema schema = this.useWriterSchemaForCompaction ? this.writeSchemaWithMetaFields : this.writeSchema;
        // Copy taken before the superclass call; the CDC entry is converted from this copy.
        HoodieRecord<T> savedRecord = newRecord.newInstance();
        super.writeInsertRecord(newRecord);
        if (!HoodieOperation.isDelete(newRecord.getOperation())) {
            this.cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, this.config.getPayloadConfig().getProps()).map(HoodieRecord::getData));
            newRecord.deflate();
        }
    }

    /**
     * Closes the underlying handle first, then the CDC logger, and attaches the CDC write stats
     * to the stat of the first returned {@link WriteStatus}.
     *
     * <p>NOTE(review): this relies on the superclass returning at least one write status, and the
     * CDC logger is not closed if {@code super.close()} throws - confirm both are acceptable.
     *
     * @return the write statuses produced by the superclass, with CDC stats set on the first one
     */
    @Override
    public List<WriteStatus> close() {
        List<WriteStatus> writeStatuses = super.close();
        this.cdcLogger.close();
        HoodieWriteStat stat = writeStatuses.get(0).getStat();
        stat.setCdcStats(this.cdcLogger.getCDCWriteStats());
        return writeStatuses;
    }
}

