/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.hadoopcompatibility.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reporter;

@Public
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>,
Serializable {
    private static final long serialVersionUID = 1L;
    private transient Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper;
    private transient JobConf jobConf;
    private transient HadoopOutputCollector<KEYOUT, VALUEOUT> outputCollector;
    private transient Reporter reporter;

    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
        this(hadoopMapper, new JobConf());
    }

    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
        if (hadoopMapper == null) {
            throw new NullPointerException("Mapper may not be null.");
        }
        if (conf == null) {
            throw new NullPointerException("JobConf may not be null.");
        }
        this.mapper = hadoopMapper;
        this.jobConf = conf;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.mapper.configure(this.jobConf);
        this.reporter = new HadoopDummyReporter();
        this.outputCollector = new HadoopOutputCollector();
    }

    @Override
    public void flatMap(Tuple2<KEYIN, VALUEIN> value, Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception {
        this.outputCollector.setFlinkCollector(out);
        this.mapper.map(value.f0, value.f1, this.outputCollector, this.reporter);
    }

    @Override
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType() {
        Class outKeyClass = (Class)TypeExtractor.getParameterType(Mapper.class, this.mapper.getClass(), 2);
        Class outValClass = (Class)TypeExtractor.getParameterType(Mapper.class, this.mapper.getClass(), 3);
        TypeInformation keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
        TypeInformation valueTypleInfo = TypeExtractor.getForClass(outValClass);
        return new TupleTypeInfo<Tuple2<KEYOUT, VALUEOUT>>(keyTypeInfo, valueTypleInfo);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(this.mapper.getClass());
        this.jobConf.write((DataOutput)out);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        Class mapperClass = (Class)in.readObject();
        this.mapper = (Mapper)InstantiationUtil.instantiate(mapperClass);
        this.jobConf = new JobConf();
        this.jobConf.readFields((DataInput)in);
    }
}

