/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.functional;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.FunctionalTestHarness;
import org.apache.hudi.utilities.HoodieSnapshotExporter;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag(value="functional")
public class TestHoodieSnapshotExporter
extends FunctionalTestHarness {
    static final Logger LOG = LogManager.getLogger(TestHoodieSnapshotExporter.class);
    static final int NUM_RECORDS = 100;
    static final String COMMIT_TIME = "20200101000000";
    static final String PARTITION_PATH = "2020";
    static final String TABLE_NAME = "testing";
    String sourcePath;
    String targetPath;

    @BeforeEach
    public void init() throws Exception {
        this.sourcePath = this.dfsBasePath() + "/source/";
        this.targetPath = this.dfsBasePath() + "/target/";
        this.dfs().mkdirs(new Path(this.sourcePath));
        HoodieTableMetaClient.initTableType((Configuration)this.jsc().hadoopConfiguration(), (String)this.sourcePath, (HoodieTableType)HoodieTableType.COPY_ON_WRITE, (String)TABLE_NAME, (String)HoodieAvroPayload.class.getName());
        HoodieWriteConfig cfg = this.getHoodieWriteConfig(this.sourcePath);
        SparkRDDWriteClient hdfsWriteClient = new SparkRDDWriteClient(this.context(), cfg);
        hdfsWriteClient.startCommitWithTime(COMMIT_TIME);
        HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[]{PARTITION_PATH});
        List records = dataGen.generateInserts(COMMIT_TIME, Integer.valueOf(100));
        JavaRDD recordsRDD = this.jsc().parallelize(records, 1);
        hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME);
        hdfsWriteClient.close();
        RemoteIterator itr = this.dfs().listFiles(new Path(this.sourcePath), true);
        while (itr.hasNext()) {
            LOG.info((Object)(">>> Prepared test file: " + ((LocatedFileStatus)itr.next()).getPath()));
        }
    }

    @AfterEach
    public void cleanUp() throws IOException {
        this.dfs().delete(new Path(this.sourcePath), true);
        this.dfs().delete(new Path(this.targetPath), true);
    }

    private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
        return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(false).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": null, \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withBulkInsertParallelism(2).withDeleteParallelism(2).forTable(TABLE_NAME).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
    }

    @Nested
    public class TestHoodieSnapshotExporterForRepartitioning {
        private HoodieSnapshotExporter.Config cfg;

        @BeforeEach
        public void setUp() {
            this.cfg = new HoodieSnapshotExporter.Config();
            this.cfg.sourceBasePath = TestHoodieSnapshotExporter.this.sourcePath;
            this.cfg.targetOutputPath = TestHoodieSnapshotExporter.this.targetPath;
            this.cfg.outputFormat = "json";
        }

        @Test
        public void testExportWithPartitionField() throws IOException {
            this.cfg.outputPartitionField = "driver";
            new HoodieSnapshotExporter().export(TestHoodieSnapshotExporter.this.jsc(), this.cfg);
            Assertions.assertEquals((long)100L, (long)TestHoodieSnapshotExporter.this.sqlContext().read().format("json").load(TestHoodieSnapshotExporter.this.targetPath).count());
            Assertions.assertTrue((boolean)TestHoodieSnapshotExporter.this.dfs().exists(new Path(TestHoodieSnapshotExporter.this.targetPath + "/_SUCCESS")));
            Assertions.assertTrue((TestHoodieSnapshotExporter.this.dfs().listStatus(new Path(TestHoodieSnapshotExporter.this.targetPath)).length > 1 ? 1 : 0) != 0);
        }

        @Test
        public void testExportForUserDefinedPartitioner() throws IOException {
            this.cfg.outputPartitioner = UserDefinedPartitioner.class.getName();
            new HoodieSnapshotExporter().export(TestHoodieSnapshotExporter.this.jsc(), this.cfg);
            Assertions.assertEquals((long)100L, (long)TestHoodieSnapshotExporter.this.sqlContext().read().format("json").load(TestHoodieSnapshotExporter.this.targetPath).count());
            Assertions.assertTrue((boolean)TestHoodieSnapshotExporter.this.dfs().exists(new Path(TestHoodieSnapshotExporter.this.targetPath + "/_SUCCESS")));
            Assertions.assertTrue((boolean)TestHoodieSnapshotExporter.this.dfs().exists(new Path(String.format("%s/%s=%s", TestHoodieSnapshotExporter.this.targetPath, "year", TestHoodieSnapshotExporter.PARTITION_PATH))));
        }
    }

    public static class UserDefinedPartitioner
    implements HoodieSnapshotExporter.Partitioner {
        public static final String PARTITION_NAME = "year";

        public DataFrameWriter<Row> partition(Dataset<Row> source) {
            return source.withColumnRenamed("_hoodie_partition_path", PARTITION_NAME).repartition(new Column[]{new Column(PARTITION_NAME)}).write().partitionBy(new String[]{PARTITION_NAME});
        }
    }

    @Nested
    public class TestHoodieSnapshotExporterForNonHudi {
        @ParameterizedTest
        @ValueSource(strings={"json", "parquet"})
        public void testExportAsNonHudi(String format) throws IOException {
            HoodieSnapshotExporter.Config cfg = new HoodieSnapshotExporter.Config();
            cfg.sourceBasePath = TestHoodieSnapshotExporter.this.sourcePath;
            cfg.targetOutputPath = TestHoodieSnapshotExporter.this.targetPath;
            cfg.outputFormat = format;
            new HoodieSnapshotExporter().export(TestHoodieSnapshotExporter.this.jsc(), cfg);
            Assertions.assertEquals((long)100L, (long)TestHoodieSnapshotExporter.this.sqlContext().read().format(format).load(TestHoodieSnapshotExporter.this.targetPath).count());
            Assertions.assertTrue((boolean)TestHoodieSnapshotExporter.this.dfs().exists(new Path(TestHoodieSnapshotExporter.this.targetPath + "/_SUCCESS")));
        }
    }

    @Nested
    public class TestHoodieSnapshotExporterForEarlyAbort {
        private HoodieSnapshotExporter.Config cfg;

        @BeforeEach
        public void setUp() {
            this.cfg = new HoodieSnapshotExporter.Config();
            this.cfg.sourceBasePath = TestHoodieSnapshotExporter.this.sourcePath;
            this.cfg.targetOutputPath = TestHoodieSnapshotExporter.this.targetPath;
            this.cfg.outputFormat = "hudi";
        }

        @Test
        public void testExportWhenTargetPathExists() throws IOException {
            TestHoodieSnapshotExporter.this.dfs().mkdirs(new Path(TestHoodieSnapshotExporter.this.targetPath));
            Throwable thrown = Assertions.assertThrows(HoodieSnapshotExporterException.class, () -> new HoodieSnapshotExporter().export(TestHoodieSnapshotExporter.this.jsc(), this.cfg));
            Assertions.assertEquals((Object)"The target output path already exists.", (Object)thrown.getMessage());
        }

        @Test
        public void testExportDatasetWithNoCommit() throws IOException {
            List commitFiles = Arrays.stream(TestHoodieSnapshotExporter.this.dfs().listStatus(new Path(TestHoodieSnapshotExporter.this.sourcePath + "/.hoodie"))).map(FileStatus::getPath).filter(filePath -> filePath.getName().endsWith(".commit")).collect(Collectors.toList());
            for (Path p : commitFiles) {
                TestHoodieSnapshotExporter.this.dfs().delete(p, false);
            }
            Throwable thrown = Assertions.assertThrows(HoodieSnapshotExporterException.class, () -> new HoodieSnapshotExporter().export(TestHoodieSnapshotExporter.this.jsc(), this.cfg));
            Assertions.assertEquals((Object)"No commits present. Nothing to snapshot.", (Object)thrown.getMessage());
        }

        @Test
        public void testExportDatasetWithNoPartition() throws IOException {
            TestHoodieSnapshotExporter.this.dfs().delete(new Path(TestHoodieSnapshotExporter.this.sourcePath + "/" + TestHoodieSnapshotExporter.PARTITION_PATH), true);
            Throwable thrown = Assertions.assertThrows(HoodieSnapshotExporterException.class, () -> new HoodieSnapshotExporter().export(TestHoodieSnapshotExporter.this.jsc(), this.cfg));
            Assertions.assertEquals((Object)"The source dataset has 0 partition to snapshot.", (Object)thrown.getMessage());
        }
    }

    @Nested
    public class TestHoodieSnapshotExporterForHudi {
        private HoodieSnapshotExporter.Config cfg;

        @BeforeEach
        public void setUp() {
            this.cfg = new HoodieSnapshotExporter.Config();
            this.cfg.sourceBasePath = TestHoodieSnapshotExporter.this.sourcePath;
            this.cfg.targetOutputPath = TestHoodieSnapshotExporter.this.targetPath;
            this.cfg.outputFormat = "hudi";
        }

        @Test
        public void testExportAsHudi() throws IOException {
            new HoodieSnapshotExporter().export(TestHoodieSnapshotExporter.this.jsc(), this.cfg);
            Assertions.assertTrue((boolean)TestHoodieSnapshotExporter.this.dfs().exists(new Path(TestHoodieSnapshotExporter.this.targetPath + "/.hoodie/" + TestHoodieSnapshotExporter.COMMIT_TIME + ".commit")));
            Assertions.assertTrue((boolean)TestHoodieSnapshotExporter.this.dfs().exists(new Path(TestHoodieSnapshotExporter.this.targetPath + "/.hoodie/" + TestHoodieSnapshotExporter.COMMIT_TIME + ".commit.requested")));
            Assertions.assertTrue((boolean)TestHoodieSnapshotExporter.this.dfs().exists(new Path(TestHoodieSnapshotExporter.this.targetPath + "/.hoodie/" + TestHoodieSnapshotExporter.COMMIT_TIME + ".inflight")));
            Assertions.assertTrue((boolean)TestHoodieSnapshotExporter.this.dfs().exists(new Path(TestHoodieSnapshotExporter.this.targetPath + "/.hoodie/hoodie.properties")));
            String partition = TestHoodieSnapshotExporter.this.targetPath + "/" + TestHoodieSnapshotExporter.PARTITION_PATH;
            long numParquetFiles = Arrays.stream(TestHoodieSnapshotExporter.this.dfs().listStatus(new Path(partition))).filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet")).count();
            Assertions.assertTrue((numParquetFiles >= 1L ? 1 : 0) != 0, (String)"There should exist at least 1 parquet file.");
            Assertions.assertEquals((long)100L, (long)TestHoodieSnapshotExporter.this.sqlContext().read().parquet(partition).count());
            Assertions.assertTrue((boolean)TestHoodieSnapshotExporter.this.dfs().exists(new Path(partition + "/.hoodie_partition_metadata")));
            Assertions.assertTrue((boolean)TestHoodieSnapshotExporter.this.dfs().exists(new Path(TestHoodieSnapshotExporter.this.targetPath + "/_SUCCESS")));
        }
    }
}

