/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.asm.dataset;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.AnalyticHelper;
import org.apache.flink.graph.asm.dataset.DataSetAnalyticBase;

/**
 * Analytic that gathers the elements of a {@link DataSet} into a {@link List}.
 *
 * <p>{@link #run(DataSet)} attaches a {@link CollectHelper} sink to the input; the sink
 * serializes every record it is handed into a {@link SerializedListAccumulator}.
 * {@link #getResult()} then fetches the accumulated data and deserializes it with the
 * same {@link TypeSerializer} that was used to write it.
 *
 * @param <T> element type of the collected data set
 */
public class Collect<T> extends DataSetAnalyticBase<T, List<T>> {
    /** Name under which the helper registers, and later looks up, its accumulator. */
    private static final String COLLECT = "collect";

    /** Sink attached to the input; assigned in {@link #run(DataSet)}. */
    private CollectHelper<T> collectHelper;

    /** Serializer for the input's element type; assigned in {@link #run(DataSet)}. */
    private TypeSerializer<T> serializer;

    /**
     * Attaches the collecting sink to {@code input}.
     *
     * @param input data set whose elements are to be collected
     * @return this analytic
     * @throws Exception if the superclass {@code run} fails
     */
    @Override
    public Collect<T> run(DataSet<T> input) throws Exception {
        super.run(input);
        this.serializer = input.getType().createSerializer(this.env.getConfig());
        this.collectHelper = new CollectHelper<T>(this.serializer);
        input.output(this.collectHelper).name("Collect");
        return this;
    }

    /**
     * Returns the collected elements.
     *
     * <p>{@link #run(DataSet)} must have been called first, since that is where the helper
     * and serializer are created. NOTE(review): presumably the job must also have been
     * executed before the accumulator result is available - confirm against
     * {@code AnalyticHelper.getAccumulator}.
     *
     * @return the deserialized elements
     * @throws RuntimeException if no accumulator result is available, or if the
     *     accumulated data cannot be deserialized
     */
    @Override
    public List<T> getResult() {
        // The accumulator result is handed to deserializeList as a list of serialized
        // records; the cast is unchecked because the lookup's static type carries no
        // element information.
        @SuppressWarnings("unchecked")
        ArrayList<byte[]> accResult =
                (ArrayList<byte[]>) this.collectHelper.getAccumulator(this.env, COLLECT);
        if (accResult == null) {
            throw new RuntimeException("Unable to retrieve the DataSet");
        }
        try {
            return SerializedListAccumulator.deserializeList(accResult, this.serializer);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Cannot find type class of collected data type", e);
        } catch (IOException e) {
            throw new RuntimeException("Serialization error while deserializing collected data", e);
        }
    }

    /**
     * Sink that adds every record it receives to a {@link SerializedListAccumulator}.
     *
     * @param <U> record type
     */
    private static class CollectHelper<U> extends AnalyticHelper<U> {
        /** Accumulator receiving the records; created in {@link #open(int, int)}. */
        private SerializedListAccumulator<U> accumulator;

        /** Serializer passed to the accumulator with each record. */
        private final TypeSerializer<U> serializer;

        /**
         * @param serializer serializer for records of type {@code U}
         */
        public CollectHelper(TypeSerializer<U> serializer) {
            this.serializer = serializer;
        }

        /** Creates a fresh, empty accumulator; both arguments are unused. */
        @Override
        public void open(int taskNumber, int numTasks) {
            this.accumulator = new SerializedListAccumulator<U>();
        }

        /**
         * Adds {@code record} to the accumulator using this helper's serializer.
         *
         * @throws IOException if the accumulator fails to add the record
         */
        @Override
        public void writeRecord(U record) throws IOException {
            this.accumulator.add(record, this.serializer);
        }

        /**
         * Registers the accumulator under the {@code COLLECT} name.
         *
         * @throws IOException declared by the method signature
         */
        @Override
        public void close() throws IOException {
            this.addAccumulator(Collect.COLLECT, this.accumulator);
        }
    }
}

