/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.library.linkanalysis;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.asm.result.UnaryResultBase;
import org.apache.flink.graph.library.linkanalysis.Functions;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public class PageRank<K, VV, EV>
extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
    private static final String VERTEX_COUNT = "vertex count";
    private static final String SUM_OF_SCORES = "sum of scores";
    private static final String CHANGE_IN_SCORES = "change in scores";
    private final double dampingFactor;
    private int maxIterations;
    private double convergenceThreshold;
    private boolean includeZeroDegreeVertices = false;

    public PageRank(double dampingFactor, int iterations) {
        this(dampingFactor, iterations, Double.MAX_VALUE);
    }

    public PageRank(double dampingFactor, double convergenceThreshold) {
        this(dampingFactor, Integer.MAX_VALUE, convergenceThreshold);
    }

    public PageRank(double dampingFactor, int maxIterations, double convergenceThreshold) {
        Preconditions.checkArgument((0.0 < dampingFactor && dampingFactor < 1.0 ? 1 : 0) != 0, (Object)"Damping factor must be between zero and one");
        Preconditions.checkArgument((maxIterations > 0 ? 1 : 0) != 0, (Object)"Number of iterations must be greater than zero");
        Preconditions.checkArgument((convergenceThreshold > 0.0 ? 1 : 0) != 0, (Object)"Convergence threshold must be greater than zero");
        this.dampingFactor = dampingFactor;
        this.maxIterations = maxIterations;
        this.convergenceThreshold = convergenceThreshold;
    }

    public PageRank<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
        this.includeZeroDegreeVertices = includeZeroDegreeVertices;
        return this;
    }

    @Override
    protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
        if (!super.canMergeConfigurationWith(other)) {
            return false;
        }
        PageRank rhs = (PageRank)other;
        return this.dampingFactor == rhs.dampingFactor && this.includeZeroDegreeVertices == rhs.includeZeroDegreeVertices;
    }

    @Override
    protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
        super.mergeConfiguration(other);
        PageRank rhs = (PageRank)other;
        this.maxIterations = Math.max(this.maxIterations, rhs.maxIterations);
        this.convergenceThreshold = Math.min(this.convergenceThreshold, rhs.convergenceThreshold);
    }

    @Override
    public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) throws Exception {
        Operator passThrough;
        DataSet vertexDegree = (DataSet)input.run(new VertexDegrees().setIncludeZeroDegreeVertices(this.includeZeroDegreeVertices).setParallelism(this.parallelism));
        DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
        Operator edgeSourceDegree = ((MapOperator)((DataSet)input.run(new EdgeSourceDegrees().setParallelism(this.parallelism))).map(new ExtractSourceDegree()).setParallelism(this.parallelism)).name("Extract source degree");
        Operator sourceVertices = ((FlatMapOperator)vertexDegree.flatMap(new InitializeSourceVertices()).setParallelism(this.parallelism)).name("Initialize source vertex scores");
        Operator initialScores = ((MapOperator)((MapOperator)vertexDegree.map(new InitializeVertexScores()).withBroadcastSet(vertexCount, VERTEX_COUNT)).setParallelism(this.parallelism)).name("Initialize scores");
        IterativeDataSet iterative = (IterativeDataSet)initialScores.iterate(this.maxIterations).setParallelism(this.parallelism);
        Operator vertexScores = ((ReduceOperator)((CoGroupOperator)((CoGroupOperator)iterative.coGroup((DataSet)edgeSourceDegree).where(new int[]{0}).equalTo(new int[]{0}).with(new SendScore()).setParallelism(this.parallelism)).name("Send score")).groupBy(new int[]{0}).reduce(new Functions.SumScore()).setCombineHint(ReduceOperatorBase.CombineHint.HASH).setParallelism(this.parallelism)).name("Sum");
        Operator sumOfScores = ((ReduceOperator)vertexScores.reduce(new SumVertexScores()).setParallelism(this.parallelism)).name("Sum");
        Operator adjustedScores = ((MapOperator)((MapOperator)((MapOperator)((UnionOperator)vertexScores.union((DataSet)sourceVertices).name("Union with source vertices")).map(new AdjustScores(this.dampingFactor)).withBroadcastSet((DataSet)sumOfScores, SUM_OF_SCORES)).withBroadcastSet(vertexCount, VERTEX_COUNT)).setParallelism(this.parallelism)).name("Adjust scores");
        if (this.convergenceThreshold < Double.MAX_VALUE) {
            passThrough = ((JoinOperator)iterative.join((DataSet)adjustedScores).where(new int[]{0}).equalTo(new int[]{0}).with(new ChangeInScores()).setParallelism(this.parallelism)).name("Change in scores");
            iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, (Aggregator)new DoubleSumAggregator(), (ConvergenceCriterion)new ScoreConvergence(this.convergenceThreshold));
        } else {
            passThrough = adjustedScores;
        }
        return ((MapOperator)iterative.closeWith((DataSet)passThrough).map(new TranslateResult()).setParallelism(this.parallelism)).name("Map result");
    }

    public static class Result<T>
    extends UnaryResultBase<T>
    implements PrintableResult {
        private DoubleValue pageRankScore;
        public static final int HASH_SEED = 1074835241;
        private transient MurmurHash hasher;

        public DoubleValue getPageRankScore() {
            return this.pageRankScore;
        }

        public void setPageRankScore(DoubleValue pageRankScore) {
            this.pageRankScore = pageRankScore;
        }

        @Override
        public String toString() {
            return "(" + this.getVertexId0() + "," + this.pageRankScore + ")";
        }

        @Override
        public String toPrintableString() {
            return "Vertex ID: " + this.getVertexId0() + ", PageRank score: " + this.pageRankScore;
        }

        public int hashCode() {
            if (this.hasher == null) {
                this.hasher = new MurmurHash(1074835241);
            }
            return this.hasher.reset().hash(this.getVertexId0().hashCode()).hash(this.pageRankScore.getValue()).hash();
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0->vertexId0; 1->pageRankScore"})
    private static class TranslateResult<T>
    implements MapFunction<Tuple2<T, DoubleValue>, Result<T>> {
        private Result<T> output = new Result();

        private TranslateResult() {
        }

        public Result<T> map(Tuple2<T, DoubleValue> value) throws Exception {
            this.output.setVertexId0(value.f0);
            this.output.setPageRankScore((DoubleValue)value.f1);
            return this.output;
        }
    }

    private static class ScoreConvergence
    implements ConvergenceCriterion<DoubleValue> {
        private double convergenceThreshold;

        public ScoreConvergence(double convergenceThreshold) {
            this.convergenceThreshold = convergenceThreshold;
        }

        public boolean isConverged(int iteration, DoubleValue value) {
            double val = value.getValue();
            return val <= this.convergenceThreshold;
        }
    }

    @FunctionAnnotation.ForwardedFieldsFirst(value={"0"})
    @FunctionAnnotation.ForwardedFieldsSecond(value={"*"})
    private static class ChangeInScores<T>
    extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
        private double changeInScores;

        private ChangeInScores() {
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.changeInScores = 0.0;
        }

        public void close() throws Exception {
            super.close();
            DoubleSumAggregator agg = (DoubleSumAggregator)this.getIterationRuntimeContext().getIterationAggregator(PageRank.CHANGE_IN_SCORES);
            agg.aggregate(this.changeInScores);
        }

        public Tuple2<T, DoubleValue> join(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second) throws Exception {
            this.changeInScores += Math.abs(((DoubleValue)second.f1).getValue() - ((DoubleValue)first.f1).getValue());
            return second;
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0"})
    private static class AdjustScores<T>
    extends RichMapFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
        private double dampingFactor;
        private long vertexCount;
        private double uniformlyDistributedScore;

        public AdjustScores(double dampingFactor) {
            this.dampingFactor = dampingFactor;
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            List sumOfScores = this.getRuntimeContext().getBroadcastVariable(PageRank.SUM_OF_SCORES);
            Iterator sumOfScoresIterator = sumOfScores.iterator();
            double sumOfSinks = 1.0 - (sumOfScoresIterator.hasNext() ? ((DoubleValue)((Tuple2)sumOfScoresIterator.next()).f1).getValue() : 0.0);
            List vertexCount = this.getRuntimeContext().getBroadcastVariable(PageRank.VERTEX_COUNT);
            Iterator vertexCountIterator = vertexCount.iterator();
            this.vertexCount = vertexCountIterator.hasNext() ? ((LongValue)vertexCountIterator.next()).getValue() : 0L;
            this.uniformlyDistributedScore = (1.0 - this.dampingFactor + this.dampingFactor * sumOfSinks) / (double)this.vertexCount;
        }

        public Tuple2<T, DoubleValue> map(Tuple2<T, DoubleValue> value) throws Exception {
            ((DoubleValue)value.f1).setValue(this.uniformlyDistributedScore + this.dampingFactor * ((DoubleValue)value.f1).getValue());
            return value;
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0"})
    private static class SumVertexScores<T>
    implements ReduceFunction<Tuple2<T, DoubleValue>> {
        private SumVertexScores() {
        }

        public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second) throws Exception {
            ((DoubleValue)first.f1).setValue(((DoubleValue)first.f1).getValue() + ((DoubleValue)second.f1).getValue());
            return first;
        }
    }

    @FunctionAnnotation.ForwardedFieldsSecond(value={"1->0"})
    private static class SendScore<T>
    implements CoGroupFunction<Tuple2<T, DoubleValue>, Edge<T, LongValue>, Tuple2<T, DoubleValue>> {
        private Tuple2<T, DoubleValue> output = new Tuple2(null, (Object)new DoubleValue());

        private SendScore() {
        }

        public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Edge<T, LongValue>> edges, Collector<Tuple2<T, DoubleValue>> out) throws Exception {
            Iterator<Edge<T, LongValue>> edgeIterator = edges.iterator();
            if (edgeIterator.hasNext()) {
                Edge<T, LongValue> edge = edgeIterator.next();
                this.output.f0 = edge.f1;
                ((DoubleValue)this.output.f1).setValue(((DoubleValue)vertex.iterator().next().f1).getValue() / (double)((LongValue)edge.f2).getValue());
                out.collect(this.output);
                while (edgeIterator.hasNext()) {
                    edge = edgeIterator.next();
                    this.output.f0 = edge.f1;
                    out.collect(this.output);
                }
            }
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0"})
    private static class InitializeVertexScores<T>
    extends RichMapFunction<Vertex<T, VertexDegrees.Degrees>, Tuple2<T, DoubleValue>> {
        private Tuple2<T, DoubleValue> output = new Tuple2();

        private InitializeVertexScores() {
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            List vertexCount = this.getRuntimeContext().getBroadcastVariable(PageRank.VERTEX_COUNT);
            Iterator vertexCountIterator = vertexCount.iterator();
            this.output.f1 = new DoubleValue(vertexCountIterator.hasNext() ? 1.0 / (double)((LongValue)vertexCountIterator.next()).getValue() : Double.NaN);
        }

        public Tuple2<T, DoubleValue> map(Vertex<T, VertexDegrees.Degrees> vertex) throws Exception {
            this.output.f0 = vertex.f0;
            return this.output;
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0"})
    private static class InitializeSourceVertices<T>
    implements FlatMapFunction<Vertex<T, VertexDegrees.Degrees>, Tuple2<T, DoubleValue>> {
        private Tuple2<T, DoubleValue> output = new Tuple2(null, (Object)new DoubleValue(0.0));

        private InitializeSourceVertices() {
        }

        public void flatMap(Vertex<T, VertexDegrees.Degrees> vertex, Collector<Tuple2<T, DoubleValue>> out) throws Exception {
            if (((VertexDegrees.Degrees)((Object)vertex.f1)).getInDegree().getValue() == 0L) {
                this.output.f0 = vertex.f0;
                out.collect(this.output);
            }
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0; 1"})
    private static class ExtractSourceDegree<T, ET>
    implements MapFunction<Edge<T, Tuple2<ET, VertexDegrees.Degrees>>, Edge<T, LongValue>> {
        Edge<T, LongValue> output = new Edge();

        private ExtractSourceDegree() {
        }

        public Edge<T, LongValue> map(Edge<T, Tuple2<ET, VertexDegrees.Degrees>> edge) throws Exception {
            this.output.f0 = edge.f0;
            this.output.f1 = edge.f1;
            this.output.f2 = ((VertexDegrees.Degrees)((Object)((Tuple2)edge.f2).f1)).getOutDegree();
            return this.output;
        }
    }
}

