/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.util.Iterator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.AbstractMergeHelper;
import scala.collection.immutable.List;

public class FlinkMergeHelper<T extends HoodieRecordPayload>
extends AbstractMergeHelper<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
    private FlinkMergeHelper() {
    }

    public static FlinkMergeHelper newInstance() {
        return MergeHelperHolder.FLINK_MERGE_HELPER;
    }

    @Override
    public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> upsertHandle) throws IOException {
        GenericDatumReader gReader;
        GenericDatumWriter gWriter;
        Schema readSchema;
        boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
        Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
        HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle = upsertHandle;
        HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
        if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
            readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
            gWriter = new GenericDatumWriter(readSchema);
            gReader = new GenericDatumReader(readSchema, mergeHandle.getWriterSchemaWithMetafields());
        } else {
            gReader = null;
            gWriter = null;
            readSchema = mergeHandle.getWriterSchemaWithMetafields();
        }
        BoundedInMemoryExecutor wrapper = null;
        HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
        try {
            Iterator<Object> readerIterator = baseFile.getBootstrapBaseFile().isPresent() ? this.getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation) : reader.getRecordIterator(readSchema);
            ThreadLocal encoderCache = new ThreadLocal();
            ThreadLocal decoderCache = new ThreadLocal();
            wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer(readerIterator), Option.of(new AbstractMergeHelper.UpdateHandler(mergeHandle)), record -> {
                if (!externalSchemaTransformation) {
                    return record;
                }
                return this.transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord)record);
            });
            wrapper.execute();
        }
        catch (Exception e) {
            throw new HoodieException(e);
        }
        finally {
            if (reader != null) {
                reader.close();
            }
            mergeHandle.close();
            if (null != wrapper) {
                wrapper.shutdownNow();
            }
        }
    }

    private static class MergeHelperHolder {
        private static final FlinkMergeHelper FLINK_MERGE_HELPER = new FlinkMergeHelper();

        private MergeHelperHolder() {
        }
    }
}

