/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.lookup;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.table.lookup.HoodieLookupTableReader;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Table function that answers lookup-join requests from an in-memory cache of table rows.
 *
 * <p>The cache maps a lookup key (the values of the configured key fields, in order) to every
 * row that carries this key. It is populated lazily on the first {@link #eval(Object...)} call
 * and re-validated once {@code reloadInterval} has elapsed. A reload is skipped when the latest
 * commit instant of the table is the one the cache was built from.
 */
public class HoodieLookupFunction extends TableFunction<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieLookupFunction.class);
    /** Number of retries after the first failed load attempt before giving up. */
    private static final int MAX_RETRIES = 3;
    /** Base back-off; the n-th retry waits n times this interval. */
    private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10L);
    private final HoodieLookupTableReader partitionReader;
    private final RowData.FieldGetter[] lookupFieldGetters;
    private final Duration reloadInterval;
    private final TypeSerializer<RowData> serializer;
    private final RowType rowType;
    /** Lookup key -> all cached rows with that key; created in {@link #open(FunctionContext)}. */
    private transient Map<RowData, List<RowData>> cache;
    /** Epoch millis before which the cache is served without any check; -1 until the first load. */
    private transient long nextLoadTime;
    private transient HoodieTableMetaClient metaClient;
    /** Commit instant that was the latest one when the cache was last loaded; null before that. */
    private transient HoodieInstant currentCommit;
    private final Configuration conf;

    /**
     * Creates the lookup function.
     *
     * @param partitionReader reader used to scan the table rows into the cache
     * @param rowType row type of the rows produced by the reader and emitted by this function
     * @param lookupKeys positions (within {@code rowType}) of the fields that form the lookup key
     * @param reloadInterval time after a load during which the cache is served without re-checking
     * @param conf Flink configuration used to build the Hadoop configuration and the meta client
     */
    public HoodieLookupFunction(HoodieLookupTableReader partitionReader, RowType rowType, int[] lookupKeys, Duration reloadInterval, Configuration conf) {
        this.partitionReader = partitionReader;
        this.rowType = rowType;
        this.lookupFieldGetters = new RowData.FieldGetter[lookupKeys.length];
        for (int i = 0; i < lookupKeys.length; ++i) {
            this.lookupFieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(lookupKeys[i]), lookupKeys[i]);
        }
        this.reloadInterval = reloadInterval;
        this.serializer = InternalSerializers.create(rowType);
        this.conf = conf;
    }

    /**
     * Initializes the transient runtime state: an empty cache, an "unloaded" marker for the
     * next load time, and the meta client used to inspect the table timeline.
     */
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        this.cache = new HashMap<>();
        this.nextLoadTime = -1L;
        org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
        this.metaClient = StreamerUtil.metaClientForReader(this.conf, hadoopConf);
    }

    /** Returns the type information of the emitted rows, derived from the configured row type. */
    public TypeInformation<RowData> getResultType() {
        return InternalTypeInfo.of(this.rowType);
    }

    /**
     * Emits every cached row whose lookup key equals the given values; emits nothing on a miss.
     *
     * @param values2 lookup key values, in the same order as the {@code lookupKeys} passed to the
     *     constructor
     */
    public void eval(Object ... values2) {
        this.checkCacheReload();
        GenericRowData lookupKey = GenericRowData.of(values2);
        List<RowData> matchedRows = this.cache.get(lookupKey);
        if (matchedRows != null) {
            for (RowData matchedRow : matchedRows) {
                this.collect(matchedRow);
            }
        }
    }

    /**
     * Reloads the cache when it has expired (or was never loaded) and the table has a commit
     * instant that differs from the one the cache was built from.
     */
    private void checkCacheReload() {
        if (this.nextLoadTime > System.currentTimeMillis()) {
            return;
        }
        if (this.nextLoadTime > 0L) {
            LOG.info("Lookup join cache has expired after {} minute(s), reloading", this.reloadInterval.toMinutes());
        } else {
            LOG.info("Populating lookup join cache");
        }
        HoodieActiveTimeline activeTimeline = this.metaClient.reloadActiveTimeline();
        Option<HoodieInstant> latestCommitInstant = activeTimeline.getCommitsTimeline().lastInstant();
        // Test the option itself: the active timeline may be non-empty while the commits
        // timeline has no instant, in which case get() below would throw.
        if (!latestCommitInstant.isPresent()) {
            // nextLoadTime is left untouched, so the next eval call checks again.
            LOG.info("No commit instant found currently.");
            return;
        }
        HoodieInstant latestInstant = latestCommitInstant.get();
        if (latestInstant.equals(this.currentCommit)) {
            LOG.info("Ignore loading data because the commit instant {} has not changed.", this.currentCommit);
            // The cache is still valid for this commit; wait a full interval before the next
            // check instead of reloading the timeline on every eval call.
            this.nextLoadTime = System.currentTimeMillis() + this.reloadInterval.toMillis();
            return;
        }
        this.reloadCacheWithRetries();
        // Remember the instant only after a successful load so a later check can skip the reload.
        this.currentCommit = latestInstant;
    }

    /**
     * Clears and refills the cache, retrying up to {@link #MAX_RETRIES} times with a linearly
     * growing back-off. On success the next load time is pushed out by {@code reloadInterval}.
     *
     * @throws FlinkRuntimeException if all attempts fail or the thread is interrupted while
     *     waiting for a retry
     */
    private void reloadCacheWithRetries() {
        int numRetry = 0;
        while (true) {
            // Drop stale rows as well as any partial result of a previous failed attempt.
            this.cache.clear();
            try {
                long count = this.loadAllRows();
                this.nextLoadTime = System.currentTimeMillis() + this.reloadInterval.toMillis();
                LOG.info("Loaded {} row(s) into lookup join cache", count);
                return;
            } catch (Exception e) {
                if (numRetry >= MAX_RETRIES) {
                    throw new FlinkRuntimeException(String.format("Failed to load table into cache after %d retries", numRetry), e);
                }
                long toSleep = (long) (++numRetry) * RETRY_INTERVAL.toMillis();
                LOG.warn(String.format("Failed to load table into cache, will retry in %d seconds", toSleep / 1000L), e);
                try {
                    Thread.sleep(toSleep);
                } catch (InterruptedException ex) {
                    // Restore the interrupt status so callers further up can observe it.
                    Thread.currentThread().interrupt();
                    LOG.warn("Interrupted while waiting to retry failed cache load, aborting");
                    throw new FlinkRuntimeException(ex);
                }
            }
        }
    }

    /**
     * Scans the reader once and adds a copy of every row to the cache under its lookup key.
     * The reader is closed on both the success and the failure path.
     *
     * @return the number of rows read
     * @throws Exception if opening, reading, or closing the reader fails
     */
    private long loadAllRows() throws Exception {
        GenericRowData reuse = new GenericRowData(this.rowType.getFieldCount());
        long count = 0L;
        this.partitionReader.open();
        try {
            RowData row;
            while ((row = this.partitionReader.read(reuse)) != null) {
                ++count;
                // Copy before caching: the row returned by the reader is not retained as-is.
                RowData copy = this.serializer.copy(row);
                RowData key = this.extractLookupKey(copy);
                this.cache.computeIfAbsent(key, k -> new ArrayList<>()).add(copy);
            }
        } catch (Exception e) {
            // Release the reader before the caller retries; keep the read failure as primary.
            try {
                this.partitionReader.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.partitionReader.close();
        return count;
    }

    /** Builds the lookup key of a row by reading the configured key fields in order. */
    private RowData extractLookupKey(RowData row) {
        GenericRowData key = new GenericRowData(this.lookupFieldGetters.length);
        for (int i = 0; i < this.lookupFieldGetters.length; ++i) {
            key.setField(i, this.lookupFieldGetters[i].getFieldOrNull(row));
        }
        return key;
    }

    /** No operation: the reader is closed at the end of every load attempt. */
    public void close() throws Exception {
    }

    /** Returns the configured cache reload interval. */
    @VisibleForTesting
    public Duration getReloadInterval() {
        return this.reloadInterval;
    }
}

