/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink;

import io.hops.hudi.org.apache.avro.Schema;
import io.hops.hudi.org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.function.BiFunction;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamWriteFunction
extends AbstractStreamWriteFunction<HoodieFlinkInternalRow> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
    /** Buffered records grouped by bucket ID (see {@code getBucketID}); created in {@code initBuffer()}. */
    private transient Map<String, DataBucket> buckets;
    /** Writes a batch of records under an instant time; chosen in {@code initWriteFunction()} from the configured operation. */
    protected transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
    /** Merger handed to the deduplication step in {@code deduplicateRecordsIfNeeded}. */
    private transient HoodieRecordMerger recordMerger;
    /** Row type of the incoming data; used to build the RowData-to-Avro converter. */
    protected final RowType rowType;
    /** Avro schema obtained from {@code StreamerUtil.getSourceSchema(config)} in {@code preparePayload()}. */
    protected transient Schema avroSchema;
    /** Converts a {@link RowData} into an Avro record using {@link #avroSchema}. */
    protected transient RowDataToAvroConverters.RowDataToAvroConverter converter;
    /** Creates the record payload from the converted Avro record. */
    protected transient PayloadCreation payloadCreation;
    /** Tracks the estimated total size of all buffered records of this task. */
    private transient TotalSizeTracer tracer;
    /** Write metrics registered on the operator metric group in {@code registerMetrics()}. */
    protected transient FlinkStreamWriteMetrics writeMetrics;

    /**
     * Creates the write function.
     *
     * @param config  the Flink configuration, passed on to the parent constructor
     * @param rowType the row type of the incoming records, kept for building the Avro converter
     */
    public StreamWriteFunction(Configuration config, RowType rowType) {
        super(config);
        this.rowType = rowType;
    }

    /**
     * Sets up the per-task state: the total-size tracer, the bucket buffer, the write
     * function, the record merger, the metrics and the payload helpers, in that order.
     *
     * @param parameters not read by this method
     * @throws IOException declared by the signature
     */
    public void open(Configuration parameters) throws IOException {
        tracer = new TotalSizeTracer(config);
        initBuffer();
        initWriteFunction();
        initMergeClass();
        registerMetrics();
        preparePayload();
    }

    /**
     * Flushes everything that is still buffered; the resulting event is sent
     * with {@code endInput} set to {@code false}.
     */
    @Override
    public void snapshotState() {
        flushRemaining(false);
    }

    /**
     * Buffers the incoming record. Neither {@code ctx} nor {@code out} is used here,
     * so nothing is emitted downstream by this method.
     *
     * @param record the record to buffer
     * @param ctx    unused
     * @param out    unused
     * @throws Exception declared by the signature
     */
    public void processElement(HoodieFlinkInternalRow record, ProcessFunction.Context ctx, Collector<Object> out) throws Exception {
        bufferRecord(record);
    }

    /**
     * Closes the write client, if one exists.
     */
    public void close() {
        if (writeClient == null) {
            return;
        }
        writeClient.close();
    }

    /**
     * Runs the parent's end-of-input handling, flushes the remaining buffered records
     * with {@code endInput} set to {@code true}, then cleans the write handles and
     * clears the collected write statuses.
     */
    @Override
    public void endInput() {
        super.endInput();
        flushRemaining(true);
        writeClient.cleanHandles();
        writeStatuses.clear();
    }

    /**
     * Builds a fresh map from bucket ID to the buffered records of that bucket,
     * converted to {@link HoodieRecord}s.
     *
     * @return a new map; the records are converted on every call
     */
    @VisibleForTesting
    public Map<String, List<HoodieRecord>> getDataBuffer() {
        final Map<String, List<HoodieRecord>> snapshot = new HashMap<>();
        buckets.forEach((bucketID, bucket) ->
            snapshot.put(bucketID, convertToHoodieRecords(bucket.getRecords())));
        return snapshot;
    }

    /** Creates the empty, insertion-ordered bucket buffer. */
    private void initBuffer() {
        buckets = new LinkedHashMap<>();
    }

    /**
     * Picks the write-client call that matches the configured write operation.
     *
     * <p>UPSERT, DELETE and DELETE_PREPPED all go through {@code upsert}.
     *
     * @throws RuntimeException if the configured operation is none of the handled ones
     */
    private void initWriteFunction() {
        final String writeOperation = (String) config.get(FlinkOptions.OPERATION);
        final WriteOperationType operationType = WriteOperationType.fromValue(writeOperation);
        switch (operationType) {
            case INSERT:
                writeFunction = (records, instantTime) -> writeClient.insert(records, instantTime);
                break;
            case UPSERT:
            case DELETE:
            case DELETE_PREPPED:
                writeFunction = (records, instantTime) -> writeClient.upsert(records, instantTime);
                break;
            case INSERT_OVERWRITE:
                writeFunction = (records, instantTime) -> writeClient.insertOverwrite(records, instantTime);
                break;
            case INSERT_OVERWRITE_TABLE:
                writeFunction = (records, instantTime) -> writeClient.insertOverwriteTable(records, instantTime);
                break;
            default:
                throw new RuntimeException("Unsupported write operation : " + writeOperation);
        }
    }

    /**
     * Derives the record merger from the write client's configured merger via
     * {@code HoodieRecordUtils.mergerToPreCombineMode} and logs its class name.
     */
    private void initMergeClass() {
        recordMerger = HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger());
        final String mergerClassName = recordMerger.getClass().getName();
        LOG.info("init hoodie merge with class [{}]", mergerClassName);
    }

    /**
     * Prepares everything needed to turn a row into a record payload: the Avro
     * schema, the row-to-Avro converter and the payload factory.
     *
     * @throws HoodieException if the payload factory cannot be created
     */
    protected void preparePayload() {
        avroSchema = StreamerUtil.getSourceSchema(config);
        final boolean utcTimezone = config.getBoolean(FlinkOptions.WRITE_UTC_TIMEZONE);
        converter = RowDataToAvroConverters.createConverter((LogicalType) rowType, utcTimezone);
        try {
            payloadCreation = PayloadCreation.instance(config);
        } catch (Exception ex) {
            throw new HoodieException("Failed payload creation in StreamWriteFunction", ex);
        }
    }

    /**
     * Returns the key under which records of the given partition path and file ID
     * are buffered, as produced by {@code StreamerUtil.generateBucketKey}.
     *
     * @param partitionPath the partition path of the record
     * @param fileId        the file ID of the record
     * @return the bucket key
     */
    private String getBucketID(String partitionPath, String fileId) {
        return StreamerUtil.generateBucketKey(partitionPath, fileId);
    }

    /**
     * Adds the record to its bucket and flushes when a size threshold is exceeded.
     *
     * <p>If the record's own bucket is over its batch size, that bucket is flushed.
     * Otherwise, if the total buffered size is over the task limit, the bucket with
     * the largest tracked size is flushed. A bucket is reset, and its size taken off
     * the total, only when the flush succeeded.
     *
     * @param record the record to buffer
     */
    protected void bufferRecord(HoodieFlinkInternalRow record) {
        writeMetrics.markRecordIn();

        final String bucketID = getBucketID(record.getPartitionPath(), record.getFileId());
        final DataBucket bucket = buckets.computeIfAbsent(
            bucketID, key -> new DataBucket(config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
        bucket.records.add(record);

        final boolean bucketIsFull = bucket.detector.detect(record);
        final boolean bufferIsFull = tracer.trace(bucket.detector.lastRecordSize);
        writeMetrics.setWriteBufferedSize(tracer.bufferSize);

        if (bucketIsFull) {
            if (flushBucket(bucket)) {
                tracer.countDown(bucket.detector.totalSize);
                bucket.reset();
            }
            return;
        }
        if (!bufferIsFull) {
            return;
        }

        // The whole buffer is over its limit: evict the bucket with the largest tracked size.
        final DataBucket largest = buckets.values().stream()
            .max(Comparator.comparingLong((DataBucket candidate) -> candidate.detector.totalSize))
            .orElseThrow(NoSuchElementException::new);
        if (flushBucket(largest)) {
            tracer.countDown(largest.detector.totalSize);
            largest.reset();
        } else {
            LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", (Object) tracer.maxBufferSize);
        }
    }

    /**
     * Tells whether at least one bucket currently holds a record.
     *
     * @return {@code true} if any bucket is non-empty, {@code false} otherwise
     */
    private boolean hasData() {
        for (DataBucket bucket : buckets.values()) {
            if (!bucket.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Writes the records of one bucket and reports the write statuses to the
     * coordinator as a non-final batch.
     *
     * <p>The bucket itself is not reset here; that is left to the caller.
     *
     * @param bucket the bucket to flush; must not be empty
     * @return {@code false} if there is no instant to write to, {@code true} once
     *         the records were written and the event was sent
     */
    private boolean flushBucket(DataBucket bucket) {
        final String instant = instantToWrite(true);
        if (instant == null) {
            LOG.info("No inflight instant when flushing data, skip.");
            return false;
        }
        ValidationUtils.checkState(!bucket.isEmpty(), "Data bucket to flush has no buffering records");

        final List<WriteStatus> statuses = writeRecords(instant, bucket.getRecords());
        final WriteMetadataEvent event = WriteMetadataEvent.builder()
            .taskID(taskID)
            .instantTime(instant)
            .writeStatus(statuses)
            .lastBatch(false)
            .endInput(false)
            .build();
        eventGateway.sendEventToCoordinator((OperatorEvent) event);
        writeStatuses.addAll(statuses);
        return true;
    }

    /**
     * Writes every non-empty bucket under the current instant and reports the result
     * to the coordinator as the last batch of that instant.
     *
     * <p>Afterwards the buckets are dropped, the size tracer is reset, the write
     * handles are cleaned and the function is marked as confirming.
     *
     * @param endInput value forwarded to the event's {@code endInput} flag
     * @throws HoodieException if there is no instant to write to
     */
    private void flushRemaining(boolean endInput) {
        this.writeMetrics.startDataFlush();
        this.currentInstant = this.instantToWrite(this.hasData());
        if (this.currentInstant == null) {
            throw new HoodieException("No inflight instant when flushing data!");
        }
        // Typed as List, not ArrayList: Collections.emptyList() returns a List, so an
        // ArrayList-typed variable cannot hold it. Assigned exactly once per branch,
        // which keeps it capturable by the lambda below.
        final List<WriteStatus> writeStatus;
        if (!this.buckets.isEmpty()) {
            writeStatus = new ArrayList<>();
            this.buckets.values().forEach(bucket -> {
                if (!bucket.isEmpty()) {
                    writeStatus.addAll(this.writeRecords(this.currentInstant, bucket.getRecords()));
                    bucket.reset();
                }
            });
        } else {
            LOG.info("No data to write in subtask [{}] for instant [{}]", (Object)this.taskID, (Object)this.currentInstant);
            writeStatus = Collections.emptyList();
        }
        WriteMetadataEvent event = WriteMetadataEvent.builder().taskID(this.taskID).instantTime(this.currentInstant).writeStatus(writeStatus).lastBatch(true).endInput(endInput).build();
        this.eventGateway.sendEventToCoordinator((OperatorEvent)event);
        this.buckets.clear();
        this.tracer.reset();
        this.writeClient.cleanHandles();
        this.writeStatuses.addAll(writeStatus);
        this.confirming = true;
        this.writeMetrics.endDataFlush();
        this.writeMetrics.resetAfterCommit();
    }

    /** Creates the write metrics on the operator's metric group and registers them. */
    private void registerMetrics() {
        writeMetrics = new FlinkStreamWriteMetrics((MetricGroup) getRuntimeContext().getMetricGroup());
        writeMetrics.registerMetrics();
    }

    /**
     * Converts the buffered rows to records, deduplicates them if configured, and
     * writes them with the selected write function. The file-flush metrics are
     * updated around the write.
     *
     * @param instant the instant time to write under
     * @param records the buffered rows of one bucket
     * @return the write statuses returned by the write function
     */
    protected List<WriteStatus> writeRecords(String instant, List<HoodieFlinkInternalRow> records) {
        writeMetrics.startFileFlush();
        final List<HoodieRecord> hoodieRecords = convertToHoodieRecords(records);
        final List<HoodieRecord> recordsToWrite = deduplicateRecordsIfNeeded(hoodieRecords);
        final List<WriteStatus> statuses = writeFunction.apply(recordsToWrite, instant);
        writeMetrics.endFileFlush();
        writeMetrics.increaseNumOfFilesWritten();
        return statuses;
    }

    /**
     * Converts each buffered row into a {@link HoodieRecord}: the row data becomes an
     * Avro record, which becomes the payload; the key, operation and current location
     * are taken from the row itself.
     *
     * @param records the buffered rows
     * @return a fixed-size list with one record per input row, in input order
     * @throws RuntimeException wrapping any exception raised while creating a payload
     */
    protected List<HoodieRecord> convertToHoodieRecords(List<HoodieFlinkInternalRow> records) {
        final HoodieRecord[] converted = new HoodieRecord[records.size()];
        int index = 0;
        for (HoodieFlinkInternalRow record : records) {
            final RowData row = record.getRowData();
            final GenericRecord avroRecord = (GenericRecord) converter.convert(avroSchema, row);

            final HoodieRecordPayload<?> payload;
            try {
                payload = payloadCreation.createPayload(avroRecord);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }

            final HoodieKey key = new HoodieKey(record.getRecordKey(), record.getPartitionPath());
            final HoodieAvroRecord hoodieRecord =
                new HoodieAvroRecord(key, payload, HoodieOperation.fromName(record.getOperationType()));
            // The location is set between unseal() and seal().
            hoodieRecord.unseal();
            hoodieRecord.setCurrentLocation(new HoodieRecordLocation(record.getInstantTime(), record.getFileId()));
            hoodieRecord.seal();

            converted[index++] = hoodieRecord;
        }
        return Arrays.asList(converted);
    }

    /**
     * Deduplicates the records with the record merger when the pre-combine option is
     * enabled; otherwise returns the input list unchanged.
     *
     * @param records the records about to be written
     * @return the deduplicated records, or {@code records} itself when pre-combine is off
     */
    protected List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> records) {
        if (!config.getBoolean(FlinkOptions.PRE_COMBINE)) {
            return records;
        }
        return FlinkWriteHelper.newInstance().deduplicateRecords(
            records,
            (HoodieIndex<?, ?>) null,
            -1,
            writeClient.getConfig().getSchema(),
            writeClient.getConfig().getProps(),
            recordMerger);
    }

    /**
     * Keeps a running total of the estimated buffered bytes of the task and compares
     * it against a fixed limit computed from the configuration.
     */
    private static class TotalSizeTracer {
        /** Estimated number of bytes currently buffered. */
        private long bufferSize = 0L;
        /** Limit in bytes above which the buffer counts as full. */
        private final double maxBufferSize;

        /**
         * Computes the limit as the task max size minus a constant 100 for the merge
         * reader and minus the merge max memory, multiplied by 1024 * 1024.
         *
         * @param conf the configuration the two options are read from
         */
        TotalSizeTracer(Configuration conf) {
            final long mergeReaderMem = 100L;
            final long mergeMapMaxMem = conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
            final double availableMb =
                conf.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE) - (double) mergeReaderMem - (double) mergeMapMaxMem;
            maxBufferSize = availableMb * 1024.0 * 1024.0;
            final String errMsg = String.format(
                "'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)",
                FlinkOptions.WRITE_TASK_MAX_SIZE.key(),
                FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
            ValidationUtils.checkState(maxBufferSize > 0.0, errMsg);
        }

        /**
         * Adds one record's size to the total.
         *
         * @param recordSize the size to add, in bytes
         * @return {@code true} if the total is now strictly above the limit
         */
        boolean trace(long recordSize) {
            bufferSize += recordSize;
            return bufferSize > maxBufferSize;
        }

        /**
         * Subtracts the given size from the total.
         *
         * @param size the size to subtract, in bytes
         */
        void countDown(long size) {
            bufferSize -= size;
        }

        /** Sets the total back to zero. */
        public void reset() {
            bufferSize = 0L;
        }
    }

    /**
     * Estimates the accumulated size of the records of one bucket. A record is only
     * measured when no size is known yet or when it is picked by sampling; otherwise
     * the last measured size is reused for it.
     */
    private static class BufferSizeDetector {
        /** Fixed seed, so the sampling sequence is the same for every detector. */
        private final Random random = new Random(47L);
        /** Upper bound passed to {@code nextInt} when sampling. */
        private static final int DENOMINATOR = 100;
        /** Batch size threshold in bytes. */
        private final double batchSizeBytes;
        /** Size of the most recently measured record; {@code -1} while none was measured. */
        private long lastRecordSize = -1L;
        /** Sum of the sizes attributed to the records seen since the last reset. */
        private long totalSize = 0L;

        /**
         * @param batchSizeMb the batch size threshold; multiplied by 1024 * 1024 to get bytes
         */
        BufferSizeDetector(double batchSizeMb) {
            batchSizeBytes = batchSizeMb * 1024.0 * 1024.0;
        }

        /**
         * Accounts for one more record.
         *
         * @param record the record that was added
         * @return {@code true} if the accumulated size is now strictly above the threshold
         */
        boolean detect(Object record) {
            // sampling() is only consulted once a size has been measured.
            final boolean needsMeasurement = lastRecordSize == -1L || sampling();
            if (needsMeasurement) {
                lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
            }
            totalSize += lastRecordSize;
            return totalSize > batchSizeBytes;
        }

        /**
         * @return {@code true} when the next random draw in {@code [0, DENOMINATOR)} equals 1
         */
        boolean sampling() {
            return random.nextInt(DENOMINATOR) == 1;
        }

        /** Forgets the last measured size and the accumulated total. */
        void reset() {
            lastRecordSize = -1L;
            totalSize = 0L;
        }
    }

    /**
     * The buffered rows of one bucket together with the detector that estimates
     * their accumulated size.
     */
    protected static class DataBucket {
        private final List<HoodieFlinkInternalRow> records = new ArrayList<>();
        private final BufferSizeDetector detector;

        /**
         * @param batchSize the batch size handed to the size detector
         */
        private DataBucket(Double batchSize) {
            detector = new BufferSizeDetector(batchSize);
        }

        /**
         * @return the internal list of buffered rows (not a copy)
         */
        public List<HoodieFlinkInternalRow> getRecords() {
            return records;
        }

        /**
         * @return {@code true} if no row is buffered
         */
        public boolean isEmpty() {
            return records.isEmpty();
        }

        /** Drops all buffered rows and resets the size detector. */
        public void reset() {
            records.clear();
            detector.reset();
        }
    }
}

