package org.apache.hudi.sink.bucket;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Integration test for writing to a bucket-indexed Hudi table from Flink after the
 * first completed commit has been turned back into an incomplete one.
 *
 * <p>The scenario, for both copy-on-write and merge-on-read tables:
 * <ol>
 *   <li>write the test data set once;</li>
 *   <li>delete the completed instant's meta file and recreate marker files for every
 *       file recorded in its commit metadata (see {@link #doDeleteCommit});</li>
 *   <li>write the same data set two more times;</li>
 *   <li>verify the table content matches {@link #EXPECTED}.</li>
 * </ol>
 */
@ExtendWith(FlinkMiniCluster.class)
public class ITTestBucketStreamWrite {

    /** Expected rows per partition path, in the string form checked by {@code TestData}. */
    private static final Map<String, String> EXPECTED = new HashMap<>();

    /** Extracts the {@code <n>-<n>-<n>} write token that is surrounded by underscores in a file name. */
    private static final Pattern WRITE_TOKEN_PATTERN = Pattern.compile("_((\\d+)-(\\d+)-(\\d+))_");

    static {
        EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
        EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
        EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
        EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
    }

    /** Base path of the table under test; injected by JUnit for each invocation. */
    @TempDir
    File tempFile;

    /**
     * Writes, invalidates the first commit, writes twice more and checks the final table content.
     *
     * @param isCow {@code true} to run against a copy-on-write table, {@code false} for merge-on-read
     * @throws Exception if any write, file system operation or verification step fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testBucketStreamWriteAfterRollbackFirstFileGroupCreation(boolean isCow) throws Exception {
        String tablePath = this.tempFile.getAbsolutePath();

        doWrite(tablePath, isCow);
        // Make the first (and only) completed commit look incomplete before writing again.
        doDeleteCommit(tablePath, isCow);
        doWrite(tablePath, isCow);
        doWrite(tablePath, isCow);

        if (isCow) {
            TestData.checkWrittenData(this.tempFile, EXPECTED, 4);
        } else {
            TestData.checkWrittenDataMOR(FSUtils.getFs(tablePath, new Configuration()), this.tempFile, EXPECTED, 4);
        }
    }

    /**
     * Removes the meta file of the single completed instant and recreates a marker file
     * for every file listed in that instant's commit metadata.
     *
     * <p>NOTE(review): presumably this simulates a write that failed before completing,
     * so that the next write has to roll it back - confirm against the writer's rollback logic.
     *
     * @param tablePath base path of the Hudi table
     * @param isCow     {@code true} for copy-on-write (CREATE markers, token parsed from the
     *                  file name), {@code false} for merge-on-read (APPEND markers, token parsed
     *                  from the log path)
     * @throws Exception if the timeline cannot be read or the file system operations fail
     */
    private static void doDeleteCommit(String tablePath, boolean isCow) throws Exception {
        FlinkClusteringConfig clusteringConfig = new FlinkClusteringConfig();
        clusteringConfig.path = tablePath;
        org.apache.flink.configuration.Configuration conf = FlinkClusteringConfig.toFlinkConfig(clusteringConfig);

        // Mirror the table's own properties into the Flink configuration.
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
        conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableType().name());
        conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
        CompactionUtil.setAvroSchema(conf, metaClient);

        // Exactly one completed instant is expected at this point.
        HoodieTimeline completedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
        List<HoodieInstant> completedInstants = completedTimeline.getInstants().collect(Collectors.toList());
        Assertions.assertEquals(1, completedInstants.size());

        HoodieInstant instant = completedInstants.get(0);
        String commitInstant = instant.getTimestamp();
        String instantFileName = instant.getFileName();

        // Read the commit metadata before the instant file is deleted.
        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
            metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);

        metaClient.getFs().delete(new Path(metaClient.getMetaPath() + "/" + instantFileName));

        // The marker type depends on the table type.
        IOType ioType = isCow ? IOType.CREATE : IOType.APPEND;

        commitMetadata.getFileIdAndRelativePaths().forEach((fileId, relativePath) -> {
            // relativePath is split into "<partition>/<fileName>".
            String[] partitionAndFileName = relativePath.split("/");
            String partition = partitionAndFileName[0];
            String fileName = partitionAndFileName[1];
            String fileInstant = FSUtils.getCommitTime(fileName);
            String writeToken = isCow
                ? getWriteToken(fileName)
                : FSUtils.getWriteTokenFromLogPath(new Path(relativePath));
            try {
                FileCreateUtils.createMarkerFile(tablePath, partition, commitInstant, fileInstant, fileId, ioType, writeToken);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Extracts the write token from a file name.
     *
     * @param relativeFilePath file name containing a token of the form {@code _<n>-<n>-<n>_}
     * @return the token without the surrounding underscores
     * @throws RuntimeException if no such token is present
     */
    private static String getWriteToken(String relativeFilePath) {
        Matcher matcher = WRITE_TOKEN_PATTERN.matcher(relativeFilePath);
        if (matcher.find()) {
            return matcher.group(1);
        }
        throw new RuntimeException("Invalid relative file path: " + relativeFilePath);
    }

    /**
     * Creates table {@code t1} at the given path in a fresh batch-mode table environment
     * and inserts the standard test data set into it, using a bucket index with one bucket.
     *
     * @param tablePath base path of the Hudi table
     * @param isCow     {@code true} for a copy-on-write table, {@code false} for merge-on-read
     * @throws InterruptedException if interrupted while waiting for the insert or during the pause
     * @throws ExecutionException   if the insert job fails
     */
    private static void doWrite(String tablePath, boolean isCow) throws InterruptedException, ExecutionException {
        TableEnvironmentImpl tableEnv =
            TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), tablePath);
        options.put(FlinkOptions.TABLE_TYPE.key(),
            isCow ? FlinkOptions.TABLE_TYPE_COPY_ON_WRITE : FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
        options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "1");

        tableEnv.executeSql(TestConfigurations.getCreateHoodieTableDDL("t1", options));
        tableEnv.executeSql(TestSQL.INSERT_T1).await();

        // NOTE(review): fixed pause after each write; presumably keeps consecutive
        // commits apart in time - confirm whether it is still required.
        TimeUnit.SECONDS.sleep(3L);
    }
}
