/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Record reader that overlays log (delta) records on the rows produced by a wrapped
 * base-file reader.
 *
 * <p>On construction the delta log files of the split are scanned into a map keyed by
 * record key. {@link #next(NullWritable, ArrayWritable)} then walks the wrapped reader and,
 * for every base row whose key is present in that map, replaces the row's values with the
 * log (or merged) record. Once the wrapped reader is exhausted, the log records whose keys
 * were never seen in the base file are emitted.
 */
public class RealtimeCompactedRecordReader
        extends AbstractRealtimeRecordReader
        implements RecordReader<NullWritable, ArrayWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(RealtimeCompactedRecordReader.class);

    /**
     * Column index used to read the record key from a base row when the split carries no
     * virtual key info.
     * NOTE(review): presumably the position of the Hudi record-key meta column - confirm
     * against the constant used by the input-format utilities.
     */
    private static final int DEFAULT_RECORD_KEY_INDEX = 2;

    /** Wrapped reader that supplies the base-file rows. */
    protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
    /** Log records produced by the scanner, keyed by record key. */
    private final Map<String, HoodieRecord> deltaRecordMap;
    /** Keys of log records that have not yet been matched against a base row. */
    private final Set<String> deltaRecordKeys;
    private final HoodieMergedLogRecordScanner mergedLogRecordScanner;
    /** Position of the record key inside the values of a base row. */
    private final int recordKeyIndex;
    /** Lazily created iterator over the keys left in {@link #deltaRecordKeys}. */
    private Iterator<String> deltaItr;

    /**
     * Creates the reader and eagerly scans the split's delta log files.
     *
     * @param split      realtime split describing the base file and its delta log files
     * @param job        job configuration
     * @param realReader reader over the base file; closed by {@link #close()}
     * @throws IOException if the log record scanner cannot be built
     */
    public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job, RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
        super(split, job);
        this.parquetReader = realReader;
        this.mergedLogRecordScanner = this.getMergedLogRecordScanner();
        this.deltaRecordMap = this.mergedLogRecordScanner.getRecords();
        // Independent copy of the key set: keys are removed from it as base rows are matched.
        this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
        this.recordKeyIndex = split.getVirtualKeyInfo()
                .map(HoodieVirtualKeyInfo::getRecordKeyFieldIndex)
                .orElse(DEFAULT_RECORD_KEY_INDEX);
    }

    /**
     * Builds the scanner over the split's delta log files, taking memory, buffer and
     * spillable-map settings from the job configuration.
     */
    private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOException {
        HoodieMergedLogRecordScanner.Builder builder = (HoodieMergedLogRecordScanner.Builder)HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(HoodieStorageUtils.getStorage(this.split.getPath().toString(), HadoopFSUtils.getStorageConf((Configuration)this.jobConf)))
                .withBasePath(this.split.getBasePath())
                .withLogFilePaths((List)this.split.getDeltaLogPaths());
        return builder
                .withReaderSchema(this.getLogScannerReaderSchema())
                .withLatestInstantTime(this.split.getMaxCommitTime())
                .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(this.jobConf))
                .withReverseReader(false)
                // Fallback buffer size is 0x100000 bytes (1 MiB) when the key is not configured.
                .withBufferSize(this.jobConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 0x100000))
                .withSpillableMapBasePath(this.jobConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath()))
                .withDiskMapType((ExternalSpillableMap.DiskMapType)this.jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), (Enum)HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
                .withBitCaskDiskMapCompressionEnabled(this.jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().booleanValue()))
                .withOptimizedLogBlocksScan(this.jobConf.getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue())))
                .withInternalSchema(this.schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
                .build();
    }

    /**
     * Converts a log record to an indexed Avro record, using the writer schema when a custom
     * payload is in use and the reader schema otherwise.
     */
    private Option<HoodieAvroIndexedRecord> buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {
        if (this.usesCustomPayload) {
            return record.toIndexedRecord(this.getWriterSchema(), this.payloadProps);
        }
        return record.toIndexedRecord(this.getReaderSchema(), this.payloadProps);
    }

    /**
     * Advances to the next row.
     *
     * <p>Base rows come first. A base row whose key has a log record is overwritten with the
     * merged record (when {@code supportPayload} is set) or with the log record itself; if
     * that yields no record the base row is skipped. After the wrapped reader is exhausted,
     * the log records whose keys never matched a base row are returned.
     *
     * @param aVoid         key holder passed through to the wrapped reader
     * @param arrayWritable value holder that receives the row
     * @return {@code true} if {@code arrayWritable} holds a row, {@code false} at end of input
     * @throws IOException if reading or converting a record fails
     */
    @Override
    public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
        while (this.parquetReader.next(aVoid, arrayWritable)) {
            if (this.deltaRecordMap.isEmpty()) {
                return true;
            }
            String baseKey = arrayWritable.get()[this.recordKeyIndex].toString();
            if (!this.deltaRecordMap.containsKey(baseKey)) {
                // No log record for this key: hand the base row back untouched.
                return true;
            }
            // Mark the key as handled so it is not emitted again in the log-only phase.
            this.deltaRecordKeys.remove(baseKey);
            HoodieRecord deltaRecord = this.deltaRecordMap.get(baseKey);
            Option<HoodieAvroIndexedRecord> merged = this.supportPayload
                    ? this.mergeRecord(deltaRecord, arrayWritable)
                    : this.buildGenericRecordwithCustomPayload(deltaRecord);
            if (!merged.isPresent()) {
                // Nothing to emit for this key; move on to the next base row.
                continue;
            }
            this.setUpWritable(merged, arrayWritable, baseKey);
            return true;
        }
        if (this.deltaItr == null) {
            this.deltaItr = this.deltaRecordKeys.iterator();
        }
        while (this.deltaItr.hasNext()) {
            String logOnlyKey = this.deltaItr.next();
            Option<HoodieAvroIndexedRecord> logRecord = this.buildGenericRecordwithCustomPayload(this.deltaRecordMap.get(logOnlyKey));
            if (!logRecord.isPresent()) {
                continue;
            }
            this.setUpWritable(logRecord, arrayWritable, logOnlyKey);
            return true;
        }
        return false;
    }

    /**
     * Copies the values of {@code rec} into {@code arrayWritable}, overwriting as many leading
     * positions as both arrays have.
     *
     * @throws RuntimeException wrapping any runtime failure of the copy, with both rows
     *                          rendered into the message
     */
    private void setUpWritable(Option<HoodieAvroIndexedRecord> rec, ArrayWritable arrayWritable, String key) {
        GenericRecord recordToReturn = (GenericRecord)rec.get().getData();
        if (this.usesCustomPayload) {
            // With a custom payload the record was built with the writer schema; rewrite it
            // to the reader schema before conversion.
            recordToReturn = HoodieAvroUtils.rewriteRecord((GenericRecord)rec.get().getData(), this.getReaderSchema());
        }
        ArrayWritable aWritable = (ArrayWritable)HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, this.getHiveSchema(), this.isSupportTimestamp());
        Writable[] replaceValue = aWritable.get();
        // Guard kept on purpose: rendering both rows to strings is only worth doing at debug level.
        if (LOG.isDebugEnabled()) {
            LOG.debug("key {}, base values: {}, log values: {}", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable), HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
        }
        Writable[] originalValue = arrayWritable.get();
        try {
            System.arraycopy(replaceValue, 0, originalValue, 0, Math.min(originalValue.length, replaceValue.length));
            arrayWritable.set(originalValue);
        }
        catch (RuntimeException re) {
            String baseRecord = HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable);
            String logRecord = HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable);
            LOG.error("Got exception when doing array copy", re);
            LOG.error("Base record :{}", baseRecord);
            LOG.error("Log record :{}", logRecord);
            String errMsg = "Base-record :" + baseRecord + " ,Log-record :" + logRecord + " ,Error :" + re.getMessage();
            throw new RuntimeException(errMsg, re);
        }
    }

    /**
     * Merges a log record with the base row it matches.
     *
     * @param newRecord           log record for the key
     * @param writableFromParquet base row read from the wrapped reader
     * @return the merge result, or an empty option when the merger returns none
     */
    private Option<HoodieAvroIndexedRecord> mergeRecord(HoodieRecord<?> newRecord, ArrayWritable writableFromParquet) throws IOException {
        GenericRecord oldRecord = this.convertArrayWritableToHoodieRecord(writableFromParquet);
        // Bring the base row to the log scanner's reader schema so both sides of the merge agree.
        GenericRecord genericRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, this.getLogScannerReaderSchema());
        HoodieAvroIndexedRecord record = new HoodieAvroIndexedRecord(genericRecord);
        Option<Pair<HoodieRecord, Schema>> mergeResult = HoodieAvroRecordMerger.INSTANCE.merge(record, genericRecord.getSchema(), newRecord, this.getLogScannerReaderSchema(), this.payloadProps);
        return mergeResult.map(p -> (HoodieAvroIndexedRecord)p.getLeft());
    }

    /** Serializes a base row into an Avro record using the Hive schema. */
    private GenericRecord convertArrayWritableToHoodieRecord(ArrayWritable arrayWritable) {
        return this.serializer.serialize(arrayWritable, this.getHiveSchema());
    }

    /** Delegates to the wrapped reader. */
    @Override
    public NullWritable createKey() {
        return this.parquetReader.createKey();
    }

    /** Delegates to the wrapped reader. */
    @Override
    public ArrayWritable createValue() {
        return this.parquetReader.createValue();
    }

    /** Delegates to the wrapped reader. */
    @Override
    public long getPos() throws IOException {
        return this.parquetReader.getPos();
    }

    /**
     * Closes the wrapped reader and the log record scanner. The scanner is closed in a
     * {@code finally} block so it is released even when closing the wrapped reader throws.
     */
    @Override
    public void close() throws IOException {
        try {
            this.parquetReader.close();
        }
        finally {
            this.mergedLogRecordScanner.close();
        }
    }

    /** Delegates to the wrapped reader. */
    @Override
    public float getProgress() throws IOException {
        return this.parquetReader.getProgress();
    }
}

