/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.read.HiveContinuousPartitionContext;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
import org.apache.flink.table.filesystem.PartitionFetcher;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.mapred.JobConf;
import org.apache.http.util.Asserts;
import org.junit.Assert;
import org.junit.Test;

public class PartitionMonitorTest {
    private ContinuousHiveSplitEnumerator.PartitionMonitor<Long> partitionMonitor;
    private List<Partition> testPartitionWithOffset = new ArrayList<Partition>();

    @Test
    public void testPartitionWithSameCreateTime() throws Exception {
        this.preparePartitionMonitor();
        this.commitPartitionWithGivenCreateTime(Arrays.asList("p1=A1", "p2=B1"), 1);
        this.commitPartitionWithGivenCreateTime(Arrays.asList("p1=A1", "p2=B2"), 2);
        ContinuousHiveSplitEnumerator.NewSplitsAndState newSplitsAndState = this.partitionMonitor.call();
        this.assertPartitionEquals(Arrays.asList(Arrays.asList("p1=A1", "p2=B1"), Arrays.asList("p1=A1", "p2=B2")), newSplitsAndState.getSeenPartitions());
        this.commitPartitionWithGivenCreateTime(Arrays.asList("p1=A1", "p2=B3"), 3);
        newSplitsAndState = this.partitionMonitor.call();
        this.assertPartitionEquals(Collections.singletonList(Arrays.asList("p1=A1", "p2=B3")), newSplitsAndState.getSeenPartitions());
        this.commitPartitionWithGivenCreateTime(Arrays.asList("p1=A1", "p2=B4"), 3);
        this.commitPartitionWithGivenCreateTime(Arrays.asList("p1=A1", "p2=B5"), 4);
        newSplitsAndState = this.partitionMonitor.call();
        this.assertPartitionEquals(Arrays.asList(Arrays.asList("p1=A1", "p2=B4"), Arrays.asList("p1=A1", "p2=B5")), newSplitsAndState.getSeenPartitions());
    }

    private void assertPartitionEquals(Collection<List<String>> expected, Collection<List<String>> actual) {
        Assert.assertTrue((expected != null && actual != null && expected.size() == actual.size() ? 1 : 0) != 0);
        Assert.assertArrayEquals((Object[])expected.stream().map(Object::toString).sorted().toArray(), (Object[])actual.stream().map(Object::toString).sorted().toArray());
    }

    private void commitPartitionWithGivenCreateTime(List<String> partitionValues, Integer createTime) {
        StorageDescriptor sd = new StorageDescriptor();
        sd.setLocation("/tmp/test");
        Partition partition = new Partition(partitionValues, "testDb", "testTable", createTime.intValue(), createTime.intValue(), sd, null);
        partition.setValues(partitionValues);
        this.testPartitionWithOffset.add(partition);
    }

    private void preparePartitionMonitor() {
        ArrayList seenPartitionsSinceOffset = new ArrayList();
        JobConf jobConf = new JobConf();
        Configuration configuration = new Configuration();
        ObjectPath tablePath = new ObjectPath("testDb", "testTable");
        configuration.setString("streaming-source.consume-order", "create-time");
        HiveContinuousPartitionContext<Partition, Long> fetcherContext = new HiveContinuousPartitionContext<Partition, Long>(){

            public HiveTablePartition toHiveTablePartition(Partition partition) {
                StorageDescriptor sd = partition.getSd();
                HashMap<String, String> partitionColValues = new HashMap<String, String>();
                for (String partCol : partition.getValues()) {
                    String[] arr = partCol.split("=");
                    Asserts.check((arr.length == 2 ? 1 : 0) != 0, (String)"partition string should be key=value format");
                    partitionColValues.put(arr[0], arr[1]);
                }
                return new HiveTablePartition(sd, partitionColValues, new Properties());
            }

            public ObjectPath getTablePath() {
                return null;
            }

            public TypeSerializer<Long> getTypeSerializer() {
                return null;
            }

            public Long getConsumeStartOffset() {
                return null;
            }

            public void open() throws Exception {
            }

            public Optional<Partition> getPartition(List<String> partValues) throws Exception {
                return Optional.empty();
            }

            public List<PartitionFetcher.Context.ComparablePartitionValue> getComparablePartitionValueList() throws Exception {
                return null;
            }

            public void close() throws Exception {
            }
        };
        ContinuousPartitionFetcher<Partition, Long> continuousPartitionFetcher = new ContinuousPartitionFetcher<Partition, Long>(){
            private static final long serialVersionUID = 1L;

            public List<Tuple2<Partition, Long>> fetchPartitions(ContinuousPartitionFetcher.Context<Partition, Long> context, Long previousOffset) throws Exception {
                return PartitionMonitorTest.this.testPartitionWithOffset.stream().filter(p -> (long)p.getCreateTime() >= previousOffset).map(p -> Tuple2.of((Object)p, (Object)p.getCreateTime())).collect(Collectors.toList());
            }

            public List<Partition> fetch(PartitionFetcher.Context<Partition> context) throws Exception {
                return null;
            }
        };
        this.partitionMonitor = new ContinuousHiveSplitEnumerator.PartitionMonitor((Comparable)Long.valueOf(0L), seenPartitionsSinceOffset, tablePath, jobConf, (ContinuousPartitionFetcher)continuousPartitionFetcher, (HiveContinuousPartitionContext)fetcherContext);
    }
}

