/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.format;

import io.hops.hudi.org.apache.avro.Schema;
import io.hops.hudi.org.apache.avro.generic.GenericRecord;
import io.hops.hudi.org.apache.avro.generic.GenericRecordBuilder;
import io.hops.hudi.org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.PrimitiveIterator;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;

public class FormatUtils {
    /** Private constructor: the methods of this class are static, so it is never instantiated. */
    private FormatUtils() {
    }

    /**
     * Sets the row kind of {@code rowData} from the operation value held by {@code record}.
     *
     * <p>When {@code index} is {@code -1} the row is left untouched.
     *
     * @param rowData row whose kind is updated
     * @param record  record holding the operation value
     * @param index   position of the operation value inside {@code record}, or {@code -1}
     */
    public static void setRowKind(RowData rowData, IndexedRecord record, int index) {
        if (index != -1) {
            final RowKind kind = FormatUtils.getRowKind(record, index);
            rowData.setRowKind(kind);
        }
    }

    /**
     * Translates the operation value stored at {@code index} of {@code record} into a {@link RowKind}.
     *
     * @param record record holding the operation value
     * @param index  position of the operation value inside {@code record}
     * @return {@link RowKind#INSERT} when the value is {@code null}; otherwise the row kind
     *         matching the parsed {@link HoodieOperation}
     * @throws AssertionError if the parsed operation matches none of the four handled kinds
     */
    private static RowKind getRowKind(IndexedRecord record, int index) {
        final Object rawOperation = record.get(index);
        if (rawOperation != null) {
            final HoodieOperation parsed = HoodieOperation.fromName(rawOperation.toString());
            final RowKind kind;
            if (HoodieOperation.isInsert(parsed)) {
                kind = RowKind.INSERT;
            } else if (HoodieOperation.isUpdateBefore(parsed)) {
                kind = RowKind.UPDATE_BEFORE;
            } else if (HoodieOperation.isUpdateAfter(parsed)) {
                kind = RowKind.UPDATE_AFTER;
            } else if (HoodieOperation.isDelete(parsed)) {
                kind = RowKind.DELETE;
            } else {
                throw new AssertionError();
            }
            return kind;
        }
        // A missing operation value is treated as an insert.
        return RowKind.INSERT;
    }

    /**
     * Variant of the row-kind lookup that tolerates a missing operation position.
     *
     * @param record record holding the operation value
     * @param index  position of the operation value inside {@code record}, or {@code -1}
     * @return {@link RowKind#INSERT} when {@code index} is {@code -1}; otherwise the row kind
     *         derived from the value at {@code index}
     */
    public static RowKind getRowKindSafely(IndexedRecord record, int index) {
        return index == -1 ? RowKind.INSERT : FormatUtils.getRowKind(record, index);
    }

    /**
     * Builds a record of {@code requiredSchema} by copying selected positions out of {@code record}.
     *
     * <p>The i-th field of {@code requiredSchema} receives the value found at position
     * {@code requiredPos[i]} of {@code record}; a position of {@code -1} yields {@code null}.
     *
     * @param record         source record
     * @param requiredSchema schema whose fields are populated, in order
     * @param requiredPos    source position for each field of {@code requiredSchema}
     * @param recordBuilder  builder that receives the field values and produces the result
     * @return the record produced by {@code recordBuilder}
     */
    public static GenericRecord buildAvroRecordBySchema(IndexedRecord record, Schema requiredSchema, int[] requiredPos, GenericRecordBuilder recordBuilder) {
        final List<Schema.Field> fields = requiredSchema.getFields();
        assert fields.size() == requiredPos.length;
        // Walk the positions in lock-step with the schema fields.
        final PrimitiveIterator.OfInt positions = Arrays.stream(requiredPos).iterator();
        for (Schema.Field field : fields) {
            final int sourcePos = positions.nextInt();
            recordBuilder.set(field, FormatUtils.getVal(record, sourcePos));
        }
        return recordBuilder.build();
    }

    /**
     * Reads the value at {@code pos} of {@code record}.
     *
     * @param record record to read from
     * @param pos    position to read, or {@code -1} for "no such field"
     * @return {@code null} when {@code pos} is {@code -1}; otherwise {@code record.get(pos)}
     */
    private static Object getVal(IndexedRecord record, int pos) {
        if (pos == -1) {
            return null;
        }
        return record.get(pos);
    }

    /**
     * Creates an {@link ExternalSpillableMap} keyed by {@code String} with {@code byte[]} values,
     * configured from {@code writeConfig}.
     *
     * @param writeConfig                 supplies the spillable base path, disk map type and
     *                                    BitCask compression flag
     * @param maxCompactionMemoryInBytes  first constructor argument of the map
     *                                    (presumably its in-memory budget in bytes — confirm)
     * @param loggingContext              passed through to the map as its last constructor argument
     * @return the newly created map
     * @throws HoodieIOException if constructing the map raises an {@link IOException}
     */
    public static ExternalSpillableMap<String, byte[]> spillableMap(HoodieWriteConfig writeConfig, long maxCompactionMemoryInBytes, String loggingContext) {
        final ExternalSpillableMap<String, byte[]> map;
        try {
            map = new ExternalSpillableMap<String, byte[]>(
                    maxCompactionMemoryInBytes,
                    writeConfig.getSpillableMapBasePath(),
                    new DefaultSizeEstimator(),
                    new DefaultSizeEstimator(),
                    writeConfig.getCommonConfig().getSpillableDiskMapType(),
                    new DefaultSerializer(),
                    writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled(),
                    loggingContext);
        } catch (IOException ioe) {
            // Surface the failure as Hudi's unchecked I/O exception, keeping the cause.
            throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + writeConfig.getSpillableMapBasePath(), ioe);
        }
        return map;
    }

    /**
     * Builds a {@link HoodieMergedLogRecordScanner} over the log files of a merge-on-read split.
     *
     * <p>Buffer size, disk map type, BitCask compression, spillable base path and record merger
     * come from the write config derived from {@code flinkConf}; base path, log paths, latest
     * commit, memory limit and instant range come from {@code split}.
     *
     * <p>NOTE(review): {@code split.getLogPaths().get()} is called unconditionally — presumably
     * callers only pass splits that carry log paths; confirm.
     *
     * @param split          input split describing the log files to scan
     * @param logSchema      reader schema handed to the scanner
     * @param internalSchema internal schema handed to the scanner
     * @param flinkConf      Flink configuration; also decides whether the operation field is enabled
     * @param hadoopConf     Hadoop configuration used to obtain the storage
     * @return the built scanner
     */
    public static HoodieMergedLogRecordScanner logScanner(MergeOnReadInputSplit split, Schema logSchema, InternalSchema internalSchema, Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) {
        HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf);
        HoodieStorage storage = HoodieStorageUtils.getStorage(split.getTablePath(), HadoopFSUtils.getStorageConf(hadoopConf));

        // Stage 1: where to read from.
        HoodieMergedLogRecordScanner.Builder sourceStage = (HoodieMergedLogRecordScanner.Builder) HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(storage)
                .withBasePath(split.getTablePath())
                .withLogFilePaths((List) split.getLogPaths().get());

        // Stage 2: how to read and buffer.
        HoodieMergedLogRecordScanner.Builder tunedStage = (HoodieMergedLogRecordScanner.Builder) sourceStage
                .withReaderSchema(logSchema)
                .withInternalSchema(internalSchema)
                .withLatestInstantTime(split.getLatestCommit())
                .withReverseReader(false)
                .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
                .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
                .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
                .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
                .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
                .withInstantRange((Option) split.getInstantRange());

        // Stage 3: merge-specific settings, then build.
        return tunedStage
                .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
                .withRecordMerger(writeConfig.getRecordMerger())
                .build();
    }

    /**
     * Builds a {@link HoodieMergedLogRecordScanner} over an explicit list of log file paths.
     *
     * <p>The base path, buffer size, memory limit, spillable map settings and record merger are
     * all read from {@code writeConfig}.
     *
     * @param logPaths          log file paths to scan
     * @param logSchema         reader schema handed to the scanner
     * @param latestInstantTime latest instant time handed to the scanner
     * @param writeConfig       write config supplying the base path and scanner tuning values
     * @param hadoopConf        Hadoop configuration used to obtain the storage
     * @return the built scanner
     */
    public static HoodieMergedLogRecordScanner logScanner(List<String> logPaths, Schema logSchema, String latestInstantTime, HoodieWriteConfig writeConfig, org.apache.hadoop.conf.Configuration hadoopConf) {
        String basePath = writeConfig.getBasePath();

        // Stage 1: where to read from.
        HoodieMergedLogRecordScanner.Builder sourceStage = (HoodieMergedLogRecordScanner.Builder) HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(HoodieStorageUtils.getStorage(basePath, HadoopFSUtils.getStorageConf(hadoopConf)))
                .withBasePath(basePath)
                .withLogFilePaths((List) logPaths);

        // Stage 2: how to read, buffer and merge, then build.
        return sourceStage
                .withReaderSchema(logSchema)
                .withLatestInstantTime(latestInstantTime)
                .withReverseReader(false)
                .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
                .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
                .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
                .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
                .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
                .withRecordMerger(writeConfig.getRecordMerger())
                .build();
    }

    /**
     * Looks up the raw string value of {@code configProperty} in {@code flinkConf}.
     *
     * <p>The property's own key is checked first; if it is absent, its alternative keys are
     * checked in the order they are returned and the first one present wins.
     *
     * @param flinkConf      configuration to search
     * @param configProperty property whose key and alternatives are searched
     * @return the value of the first matching key, or an empty option when none is present
     */
    public static Option<String> getRawValueWithAltKeys(Configuration flinkConf, ConfigProperty<?> configProperty) {
        final String primaryKey = configProperty.key();
        if (!flinkConf.containsKey(primaryKey)) {
            for (String altKey : configProperty.getAlternatives()) {
                if (flinkConf.containsKey(altKey)) {
                    return Option.ofNullable(flinkConf.getString(altKey, ""));
                }
            }
            return Option.empty();
        }
        return Option.ofNullable(flinkConf.getString(primaryKey, ""));
    }

    /**
     * Resolves {@code configProperty} to a boolean, honouring its alternative keys.
     *
     * @param conf           configuration to search
     * @param configProperty property to resolve
     * @return the parsed value of the first key found; otherwise the property's default parsed
     *         as a boolean, or {@code false} when the property has no default
     */
    public static boolean getBooleanWithAltKeys(Configuration conf, ConfigProperty<?> configProperty) {
        final Option<String> configured = FormatUtils.getRawValueWithAltKeys(conf, configProperty);
        boolean fallback = false;
        if (configProperty.hasDefaultValue()) {
            fallback = Boolean.parseBoolean(configProperty.defaultValue().toString());
        }
        return configured.map(Boolean::parseBoolean).orElse(fallback);
    }

    /**
     * Case-insensitively checks whether {@code s} spells {@code "true"}.
     *
     * @param s text to test; must not be {@code null}
     * @return {@link Boolean#TRUE} if {@code s}, lower-cased with {@link Locale#ROOT}, equals
     *         {@code "true"}; {@link Boolean#FALSE} otherwise
     * @throws NullPointerException if {@code s} is {@code null}
     */
    private static Boolean string2Boolean(String s) {
        final String normalized = s.toLowerCase(Locale.ROOT);
        if (normalized.equals("true")) {
            return Boolean.TRUE;
        }
        return Boolean.FALSE;
    }

    /**
     * Streams the log records of a merge-on-read split through a {@link BoundedInMemoryExecutor}.
     *
     * <p>The constructor wires an un-merged log scanner as the single producer of the executor,
     * obtains the executor's record iterator and then calls {@code startProducingAsync()}.
     * Call {@link #close()} when done to shut the executor down.
     */
    public static class BoundedMemoryRecords {
        /** Executor that runs the log-scanning producer. */
        private final BoundedInMemoryExecutor<HoodieRecord<?>, HoodieRecord<?>, ?> executor;
        /** Iterator obtained from the executor; handed out by {@link #getRecordsIterator()}. */
        private final Iterator<HoodieRecord<?>> iterator;

        /**
         * Creates the scanner-backed executor and starts producing records.
         *
         * <p>NOTE(review): {@code split.getLogPaths().get()} is called unconditionally —
         * presumably callers only pass splits that carry log paths; confirm.
         *
         * @param split          input split describing the log files to scan
         * @param logSchema      reader schema handed to the scanner
         * @param internalSchema internal schema handed to the scanner
         * @param hadoopConf     Hadoop configuration used to obtain the storage
         * @param flinkConf      Flink configuration supplying merger implementations, merger
         *                       strategy id, stream buffer size and the executor memory limit
         */
        public BoundedMemoryRecords(MergeOnReadInputSplit split, Schema logSchema, InternalSchema internalSchema, org.apache.hadoop.conf.Configuration hadoopConf, Configuration flinkConf) {
            // The merger implementations option is a comma-separated list; trim and de-duplicate it.
            List<String> mergers = Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(","))
                    .map(String::trim)
                    .distinct()
                    .collect(Collectors.toList());
            HoodieRecordMerger merger = HoodieRecordUtils.createRecordMerger(
                    split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY_ID));

            // Stage 1: where to read from.
            HoodieUnMergedLogRecordScanner.Builder sourceStage = (HoodieUnMergedLogRecordScanner.Builder) HoodieUnMergedLogRecordScanner.newBuilder()
                    .withStorage(HoodieStorageUtils.getStorage(split.getTablePath(), HadoopFSUtils.getStorageConf(hadoopConf)))
                    .withBasePath(split.getTablePath())
                    .withLogFilePaths((List) split.getLogPaths().get());

            // Stage 2: how to read; the buffer size falls back to 0x100000 (1 MiB) when unset.
            HoodieUnMergedLogRecordScanner.Builder tunedStage = (HoodieUnMergedLogRecordScanner.Builder) sourceStage
                    .withReaderSchema(logSchema)
                    .withInternalSchema(internalSchema)
                    .withLatestInstantTime(split.getLatestCommit())
                    .withReverseReader(false)
                    .withBufferSize(flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 0x100000))
                    .withInstantRange((Option) split.getInstantRange());

            HoodieUnMergedLogRecordScanner.Builder scannerBuilder = tunedStage.withRecordMerger(merger);

            // Type arguments are inferred from the field declaration instead of using raw types.
            this.executor = new BoundedInMemoryExecutor<>(
                    StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf),
                    this.getParallelProducers(scannerBuilder),
                    Option.empty(),
                    Function.identity(),
                    new DefaultSizeEstimator<>(),
                    Functions.noop());
            this.iterator = this.executor.getRecordIterator();
            this.executor.startProducingAsync();
        }

        /**
         * Returns the iterator obtained from the executor at construction time.
         *
         * @return the same iterator instance on every call
         */
        public Iterator<HoodieRecord<?>> getRecordsIterator() {
            return this.iterator;
        }

        /**
         * Builds the producer list for the executor: a single producer that builds the scanner
         * with the queue's {@code insertRecord} as its record callback and then runs the scan.
         *
         * @param scannerBuilder pre-configured scanner builder; the callback is set here
         * @return a list holding the single scanning producer
         */
        private List<HoodieProducer<HoodieRecord<?>>> getParallelProducers(HoodieUnMergedLogRecordScanner.Builder scannerBuilder) {
            List<HoodieProducer<HoodieRecord<?>>> producers = new ArrayList<>();
            // The diamond gives `queue` a concrete type, so `queue::insertRecord` can be resolved.
            producers.add(new FunctionBasedQueueProducer<>(queue -> {
                HoodieUnMergedLogRecordScanner scanner = scannerBuilder.withLogRecordScannerCallback(queue::insertRecord).build();
                scanner.scan();
                return null;
            }));
            return producers;
        }

        /** Shuts the executor down via {@code shutdownNow()}. */
        public void close() {
            this.executor.shutdownNow();
        }
    }
}

