/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.testutils;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;

public class HoodieFlinkClientTestHarness
extends HoodieCommonTestHarness
implements Serializable {
    protected static final Logger LOG = LogManager.getLogger(HoodieFlinkClientTestHarness.class);
    private String testMethodName;
    protected transient Configuration hadoopConf = null;
    protected transient FileSystem fs;
    protected transient MiniClusterWithClientResource flinkCluster = null;
    protected transient HoodieFlinkEngineContext context = null;
    protected transient ExecutorService executorService;
    protected transient HoodieFlinkWriteClient writeClient;
    protected transient HoodieTableFileSystemView tableView;
    protected final FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
    protected transient HdfsTestService hdfsTestService;
    protected transient MiniDFSCluster dfsCluster;
    protected transient DistributedFileSystem dfs;

    @BeforeEach
    public void setTestMethodName(TestInfo testInfo) {
        this.testMethodName = testInfo.getTestMethod().isPresent() ? ((Method)testInfo.getTestMethod().get()).getName() : "Unknown";
    }

    protected void initFlinkMiniCluster() {
        this.flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberSlotsPerTaskManager(2).setNumberTaskManagers(1).build());
    }

    protected void initFileSystem() {
        this.hadoopConf = new Configuration();
        this.initFileSystemWithConfiguration(this.hadoopConf);
        this.context = new HoodieFlinkEngineContext((TaskContextSupplier)this.supplier);
    }

    private void initFileSystemWithConfiguration(Configuration configuration) {
        if (this.basePath == null) {
            throw new IllegalStateException("The base path has not been initialized.");
        }
        this.fs = FSUtils.getFs((String)this.basePath, (Configuration)configuration);
        if (this.fs instanceof LocalFileSystem) {
            LocalFileSystem lfs = (LocalFileSystem)this.fs;
            lfs.setVerifyChecksum(true);
        }
    }

    protected void initMetaClient() throws IOException {
        this.initMetaClient(this.getTableType());
    }

    protected void initMetaClient(HoodieTableType tableType) throws IOException {
        if (this.basePath == null) {
            throw new IllegalStateException("The base path has not been initialized.");
        }
        this.metaClient = HoodieTestUtils.init((Configuration)this.hadoopConf, (String)this.basePath, (HoodieTableType)tableType);
    }

    protected List<HoodieRecord> tagLocation(HoodieIndex index, List<HoodieRecord> records, HoodieTable table) {
        return HoodieList.getList((HoodieData)index.tagLocation((HoodieData)HoodieList.of(records), (HoodieEngineContext)this.context, table));
    }

    protected void cleanupFileSystem() throws IOException {
        if (this.fs != null) {
            LOG.warn((Object)"Closing file-system instance used in previous test-run");
            this.fs.close();
            this.fs = null;
        }
    }

    public void cleanupResources() throws IOException {
        this.cleanupClients();
        this.cleanupFlinkContexts();
        this.cleanupTestDataGenerator();
        this.cleanupFileSystem();
        this.cleanupDFS();
        this.cleanupExecutorService();
        System.gc();
    }

    protected void cleanupFlinkMiniCluster() {
        if (this.flinkCluster != null) {
            this.flinkCluster.after();
            this.flinkCluster = null;
        }
    }

    protected void cleanupClients() throws IOException {
        if (this.metaClient != null) {
            this.metaClient = null;
        }
        if (this.writeClient != null) {
            this.writeClient.close();
            this.writeClient = null;
        }
        if (this.tableView != null) {
            this.tableView.close();
            this.tableView = null;
        }
    }

    protected void cleanupDFS() throws IOException {
        if (this.hdfsTestService != null) {
            this.hdfsTestService.stop();
            this.dfsCluster.shutdown();
            this.hdfsTestService = null;
            this.dfsCluster = null;
            this.dfs = null;
        }
        FileSystem.closeAll();
    }

    protected void cleanupExecutorService() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            this.executorService = null;
        }
    }

    protected void cleanupFlinkContexts() {
        if (this.context != null) {
            LOG.info((Object)"Closing flink engine context used in previous test-case");
            this.context = null;
        }
    }

    public static class SimpleTestSinkFunction
    implements SinkFunction<HoodieRecord> {
        public static List<HoodieRecord> valuesList = new ArrayList<HoodieRecord>();

        public synchronized void invoke(HoodieRecord value, SinkFunction.Context context) throws Exception {
            valuesList.add(value);
        }
    }
}

