/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cep.nfa;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.NFAStateSerializer;
import org.apache.flink.cep.nfa.State;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.WithinType;
import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.cep.utils.NFAUtils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link NFA}.
 *
 * <p>The tests feed small, hand-written event sequences into an NFA (either assembled directly
 * from {@link State} objects or compiled from a {@link Pattern}) through {@link NFATestHarness}
 * and compare the emitted matches with the expected ones. One test additionally round-trips an
 * {@link NFAState} through {@link NFAStateSerializer}.
 */
public class NFATest extends TestLogger {

    /**
     * Runs a start/end NFA built with an empty per-state window map and {@code 0L} as third
     * constructor argument, and expects both "start" events to be matched with the single "end"
     * event, in the order of the "start" events.
     */
    @Test
    public void testSimpleNFA() throws Exception {
        List<StreamRecord<Event>> streamEvents = new ArrayList<>();
        streamEvents.add(eventAt(1, "start", 1.0, 1L));
        streamEvents.add(eventAt(2, "bar", 2.0, 2L));
        streamEvents.add(eventAt(3, "start", 3.0, 3L));
        streamEvents.add(eventAt(4, "end", 4.0, 4L));

        List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
        expectedPatterns.add(
                startEndMatch(new Event(1, "start", 1.0), new Event(4, "end", 4.0)));
        expectedPatterns.add(
                startEndMatch(new Event(3, "start", 3.0), new Event(4, "end", 4.0)));

        NFA<Event> nfa =
                new NFA<>(
                        createStartEndStates(),
                        Collections.<String, Long>emptyMap(),
                        0L,
                        false);
        NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();

        Collection<Map<String, List<Event>>> actualPatterns =
                nfaTestHarness.consumeRecords(streamEvents);

        Assert.assertEquals(expectedPatterns, actualPatterns);
    }

    /**
     * Uses the default start/end NFA ({@link WithinType#FIRST_AND_LAST}) and expects only the
     * match that begins with the "start" event at timestamp 3; no match is expected for the
     * "start" event at timestamp 1.
     */
    @Test
    public void testTimeoutWindowPruningWithinFirstAndLast() throws Exception {
        List<StreamRecord<Event>> streamEvents = new ArrayList<>();
        streamEvents.add(eventAt(1, "start", 1.0, 1L));
        streamEvents.add(eventAt(2, "bar", 2.0, 2L));
        streamEvents.add(eventAt(3, "start", 3.0, 3L));
        streamEvents.add(eventAt(4, "end", 4.0, 4L));

        List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
        expectedPatterns.add(
                startEndMatch(new Event(3, "start", 3.0), new Event(4, "end", 4.0)));

        NFA<Event> nfa = createStartEndNFA();
        NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();

        Collection<Map<String, List<Event>>> actualPatterns =
                nfaTestHarness.consumeRecords(streamEvents);

        Assert.assertEquals(expectedPatterns, actualPatterns);
    }

    /**
     * Uses the start/end NFA configured with {@link WithinType#PREVIOUS_AND_CURRENT} and expects
     * the pairs (1, 2) and (5, 6) to match, while the "start" event at timestamp 3 followed by
     * the "end" event at timestamp 6 produces no match.
     *
     * <p>NOTE(review): the method name says "PreviousAndNext" while the enum constant in use is
     * {@code PREVIOUS_AND_CURRENT}; the name is kept as is because it is the public test name.
     */
    @Test
    public void testTimeoutWindowPruningWithinPreviousAndNext() throws Exception {
        List<StreamRecord<Event>> streamEvents = new ArrayList<>();
        streamEvents.add(eventAt(1, "start", 1.0, 1L));
        streamEvents.add(eventAt(2, "end", 2.0, 2L));
        streamEvents.add(eventAt(3, "start", 3.0, 3L));
        streamEvents.add(eventAt(4, "end", 4.0, 6L));
        streamEvents.add(eventAt(5, "start", 5.0, 7L));
        streamEvents.add(eventAt(6, "end", 6.0, 8L));

        List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
        expectedPatterns.add(
                startEndMatch(new Event(1, "start", 1.0), new Event(2, "end", 2.0)));
        expectedPatterns.add(
                startEndMatch(new Event(5, "start", 5.0), new Event(6, "end", 6.0)));

        NFA<Event> nfa = createStartEndNFA(WithinType.PREVIOUS_AND_CURRENT);
        NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();

        Collection<Map<String, List<Event>>> actualPatterns =
                nfaTestHarness.consumeRecords(streamEvents);

        Assert.assertEquals(expectedPatterns, actualPatterns);
    }

    /**
     * Uses the default start/end NFA and expects no match for a "start" event at timestamp 1
     * followed by an "end" event at timestamp 3.
     */
    @Test
    public void testWindowBorders() throws Exception {
        List<StreamRecord<Event>> streamEvents = new ArrayList<>();
        streamEvents.add(eventAt(1, "start", 1.0, 1L));
        streamEvents.add(eventAt(2, "end", 2.0, 3L));

        List<Map<String, List<Event>>> expectedPatterns = Collections.emptyList();

        NFA<Event> nfa = createStartEndNFA();
        NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();

        Collection<Map<String, List<Event>>> actualPatterns =
                nfaTestHarness.consumeRecords(streamEvents);

        Assert.assertEquals(expectedPatterns, actualPatterns);
    }

    /**
     * Uses the default start/end NFA with "start" events at timestamps 1 and 2 and an "end"
     * event at timestamp 3, and expects only the match that begins at timestamp 2.
     */
    @Test
    public void testTimeoutWindowPruningWindowBorders() throws Exception {
        List<StreamRecord<Event>> streamEvents = new ArrayList<>();
        streamEvents.add(eventAt(1, "start", 1.0, 1L));
        streamEvents.add(eventAt(2, "start", 2.0, 2L));
        streamEvents.add(eventAt(3, "foobar", 3.0, 3L));
        streamEvents.add(eventAt(4, "end", 4.0, 3L));

        List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
        expectedPatterns.add(
                startEndMatch(new Event(2, "start", 2.0), new Event(4, "end", 4.0)));

        NFA<Event> nfa = createStartEndNFA();
        NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();

        Collection<Map<String, List<Event>>> actualPatterns =
                nfaTestHarness.consumeRecords(streamEvents);

        Assert.assertEquals(expectedPatterns, actualPatterns);
    }

    /**
     * Compiles three patterns, drives each resulting NFA with the same event sequence, and
     * verifies that the resulting {@link NFAState} is equal to the state obtained after
     * serializing it, copying the serialized form, and deserializing the copy.
     */
    @Test
    public void testNFASerialization() throws Exception {
        Pattern<Event, ?> pattern1 =
                Pattern.<Event>begin("start")
                        .where(nameIs("a"))
                        .followedByAny("middle")
                        .where(nameIs("b"))
                        .oneOrMore()
                        .optional()
                        .allowCombinations()
                        .followedByAny("end")
                        .where(nameIs("d"));

        Pattern<Event, ?> pattern2 =
                Pattern.<Event>begin("start")
                        .where(nameIs("a"))
                        .notFollowedBy("not")
                        .where(nameIs("c"))
                        .followedByAny("middle")
                        .where(nameIs("b"))
                        .oneOrMore()
                        .optional()
                        .allowCombinations()
                        .followedByAny("end")
                        .where(
                                new IterativeCondition<Event>() {
                                    private static final long serialVersionUID =
                                            8061969839441121955L;

                                    /**
                                     * Accepts the event when the prices of the events
                                     * returned for "middle" sum to more than 5.0.
                                     */
                                    @Override
                                    public boolean filter(
                                            Event value, IterativeCondition.Context<Event> ctx)
                                            throws Exception {
                                        double sum = 0.0;
                                        for (Event e : ctx.getEventsForPattern("middle")) {
                                            sum += e.getPrice();
                                        }
                                        return sum > 5.0;
                                    }
                                });

        Pattern<Event, ?> pattern3 =
                Pattern.<Event>begin("start")
                        .notFollowedBy("not")
                        .where(nameIs("c"))
                        .followedByAny("middle")
                        .where(nameIs("b"))
                        .oneOrMore()
                        .allowCombinations()
                        .followedByAny("end")
                        .where(nameIs("d"));

        List<Pattern<Event, ?>> patterns = new ArrayList<>();
        patterns.add(pattern1);
        patterns.add(pattern2);
        patterns.add(pattern3);

        for (Pattern<Event, ?> p : patterns) {
            NFA<Event> nfa = NFAUtils.compile(p, false);

            Event a = new Event(40, "a", 1.0);
            Event b = new Event(41, "b", 2.0);
            Event c = new Event(42, "c", 3.0);
            Event b1 = new Event(41, "b", 3.0);
            Event b2 = new Event(41, "b", 4.0);
            Event b3 = new Event(41, "b", 5.0);
            Event d = new Event(43, "d", 4.0);

            NFAState nfaState = nfa.createInitialNFAState();
            NFATestHarness nfaTestHarness =
                    NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();

            nfaTestHarness.consumeRecord(new StreamRecord<>(a, 1L));
            nfaTestHarness.consumeRecord(new StreamRecord<>(b, 2L));
            nfaTestHarness.consumeRecord(new StreamRecord<>(c, 3L));
            nfaTestHarness.consumeRecord(new StreamRecord<>(b1, 4L));
            nfaTestHarness.consumeRecord(new StreamRecord<>(b2, 5L));
            nfaTestHarness.consumeRecord(new StreamRecord<>(b3, 6L));
            nfaTestHarness.consumeRecord(new StreamRecord<>(d, 7L));
            nfaTestHarness.consumeRecord(new StreamRecord<>(a, 8L));

            NFAStateSerializer serializer = new NFAStateSerializer();

            // Step 1: serialize the state the harness has been working on.
            byte[] serialized;
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
                serialized = baos.toByteArray();
            }

            // Step 2: copy the serialized form stream-to-stream with a duplicated serializer.
            byte[] copied;
            try (ByteArrayInputStream in = new ByteArrayInputStream(serialized);
                    ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                serializer
                        .duplicate()
                        .copy(
                                new DataInputViewStreamWrapper(in),
                                new DataOutputViewStreamWrapper(out));
                copied = out.toByteArray();
            }

            // Step 3: deserialize the copy and compare it with the original state.
            NFAState copy;
            try (ByteArrayInputStream bais = new ByteArrayInputStream(copied)) {
                copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
            }

            Assert.assertEquals(nfaState, copy);
        }
    }

    /**
     * Creates the start/end NFA with {@link WithinType#FIRST_AND_LAST}.
     *
     * @return the NFA produced by {@link #createStartEndNFA(WithinType)}
     */
    private NFA<Event> createStartEndNFA() {
        return this.createStartEndNFA(WithinType.FIRST_AND_LAST);
    }

    /**
     * Creates an NFA over the states from {@link #createStartEndStates()}.
     *
     * <p>For {@link WithinType#FIRST_AND_LAST} the NFA receives an empty map and {@code 2L};
     * for any other value it receives a map from {@code "end"} to {@code 2L} and {@code 0L}.
     *
     * @param withinType selects which of the two configurations above is used
     * @return the configured NFA, created with {@code false} as last constructor argument
     */
    private NFA<Event> createStartEndNFA(WithinType withinType) {
        // NOTE(review): the local names reflect the presumed meaning of the constructor
        // arguments (per-state windows vs. one overall window); confirm against NFA.
        final Map<String, Long> windowTimes;
        final long windowTime;
        if (WithinType.FIRST_AND_LAST.equals(withinType)) {
            windowTimes = Collections.<String, Long>emptyMap();
            windowTime = 2L;
        } else {
            windowTimes = Collections.singletonMap("end", 2L);
            windowTime = 0L;
        }
        return new NFA<>(createStartEndStates(), windowTimes, windowTime, false);
    }

    /**
     * Builds the three-state graph shared by the hand-built NFAs: a start state named
     * {@code "start"} that takes events named "start", a normal state named {@code "end"} that
     * takes events named "end" and ignores on an always-true condition, and a final state with
     * an empty name.
     *
     * @return the states in the order start, end, final
     */
    private static List<State<Event>> createStartEndStates() {
        State<Event> startState = new State<>("start", State.StateType.Start);
        State<Event> endState = new State<>("end", State.StateType.Normal);
        State<Event> endingState = new State<>("", State.StateType.Final);

        startState.addTake(endState, nameIs("start"));
        endState.addTake(endingState, nameIs("end"));
        endState.addIgnore(BooleanConditions.<Event>trueFunction());

        List<State<Event>> states = new ArrayList<>();
        states.add(startState);
        states.add(endState);
        states.add(endingState);
        return states;
    }

    /**
     * Creates a condition that accepts events whose name equals the given one.
     *
     * @param name the event name to accept
     * @return a condition wrapping the name comparison
     */
    private static IterativeCondition<Event> nameIs(String name) {
        return SimpleCondition.of((Event event) -> event.getName().equals(name));
    }

    /**
     * Creates a stream record wrapping a new event.
     *
     * @param id the event id
     * @param name the event name
     * @param price the event price
     * @param timestamp the timestamp of the record
     * @return the record carrying the event and the timestamp
     */
    private static StreamRecord<Event> eventAt(int id, String name, double price, long timestamp) {
        return new StreamRecord<>(new Event(id, name, price), timestamp);
    }

    /**
     * Creates an expected match that maps {@code "start"} and {@code "end"} to single-element
     * lists holding the given events.
     *
     * @param start the event expected under {@code "start"}
     * @param end the event expected under {@code "end"}
     * @return the expected match
     */
    private static Map<String, List<Event>> startEndMatch(Event start, Event end) {
        Map<String, List<Event>> match = new HashMap<>();
        match.put("start", Collections.singletonList(start));
        match.put("end", Collections.singletonList(end));
        return match;
    }
}

