/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.hadoopcompatibility.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

@Public
public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>,
Serializable {
    private static final long serialVersionUID = 1L;
    private transient Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;
    private transient JobConf jobConf;
    private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
    private transient HadoopOutputCollector<KEYOUT, VALUEOUT> reduceCollector;
    private transient Reporter reporter;

    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
        this(hadoopReducer, new JobConf());
    }

    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
        if (hadoopReducer == null) {
            throw new NullPointerException("Reducer may not be null.");
        }
        if (conf == null) {
            throw new NullPointerException("JobConf may not be null.");
        }
        this.reducer = hadoopReducer;
        this.jobConf = conf;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.reducer.configure(this.jobConf);
        this.reporter = new HadoopDummyReporter();
        this.reduceCollector = new HadoopOutputCollector();
        Class inKeyClass = (Class)TypeExtractor.getParameterType(Reducer.class, this.reducer.getClass(), 0);
        TypeSerializer keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(this.getRuntimeContext().getExecutionConfig());
        this.valueIterator = new HadoopTupleUnwrappingIterator(keySerializer);
    }

    @Override
    public void reduce(Iterable<Tuple2<KEYIN, VALUEIN>> values2, Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception {
        this.reduceCollector.setFlinkCollector(out);
        this.valueIterator.set(values2.iterator());
        this.reducer.reduce(this.valueIterator.getCurrentKey(), this.valueIterator, this.reduceCollector, this.reporter);
    }

    @Override
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
        Class outKeyClass = (Class)TypeExtractor.getParameterType(Reducer.class, this.reducer.getClass(), 2);
        Class outValClass = (Class)TypeExtractor.getParameterType(Reducer.class, this.reducer.getClass(), 3);
        TypeInformation keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
        TypeInformation valueTypleInfo = TypeExtractor.getForClass(outValClass);
        return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypleInfo);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(this.reducer.getClass());
        this.jobConf.write((DataOutput)out);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        Class reducerClass = (Class)in.readObject();
        this.reducer = (Reducer)InstantiationUtil.instantiate(reducerClass);
        this.jobConf = new JobConf();
        this.jobConf.readFields((DataInput)in);
    }
}

