package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.config.S3SourceConfig;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.AbstractCloudObjectsSourceTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hudi/utilities/sources/TestS3EventsSource.class */
/**
 * Tests reading S3 event notifications through {@link S3EventsSource}, using the queue and
 * file-system fixtures supplied by {@link AbstractCloudObjectsSourceTestBase}.
 */
public class TestS3EventsSource extends AbstractCloudObjectsSourceTestBase {

    /**
     * Runs the base-class setup, then creates the {@code parquetFiles} directory under the test
     * base path and installs a file-based schema provider for the S3 metadata schema.
     *
     * @throws Exception if the base setup, directory creation or schema staging fails
     */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        super.setup();
        this.dfsRoot = basePath + "/parquetFiles";
        this.fileSuffix = ".parquet";
        fs.mkdirs(new Path(this.dfsRoot));
        // Stage the schema file first, then point the provider at the resulting properties.
        TypedProperties schemaProps =
                UtilitiesTestBase.Helpers.setupSchemaOnDFS("streamer-config", "s3-metadata.avsc");
        this.schemaProvider = new FilebasedSchemaProvider(schemaProps, jsc);
    }

    /**
     * Delegates to the base-class teardown.
     *
     * @throws Exception if the base teardown fails
     */
    @Override
    @AfterEach
    public void teardown() throws Exception {
        super.teardown();
    }

    /**
     * Fetches from the source three times: once after {@code generateMessageInQueue(null)}
     * (expecting no batch), once after key {@code "1"} with no checkpoint, and once after key
     * {@code "2"} resuming from the checkpoint returned by the previous fetch.
     *
     * @throws IOException declared for the queue/file helpers used by the test
     */
    @Test
    public void testReadingFromSource() throws IOException {
        SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareCloudObjectSource());

        // 1. No checkpoint, null key: the fetch is expected to return an empty batch.
        generateMessageInQueue(null);
        Assertions.assertEquals(
                Option.empty(),
                sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        // 2. No checkpoint, one message queued: exactly one record is expected.
        generateMessageInQueue("1");
        InputBatch<JavaRDD<GenericRecord>> firstBatch =
                sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1L, firstBatch.getBatch().get().count());

        // 3. Resume from the first batch's checkpoint: only the newly queued message is expected.
        generateMessageInQueue("2");
        InputBatch<JavaRDD<GenericRecord>> secondBatch =
                sourceFormatAdapter.fetchNewDataInAvroFormat(
                        Option.of(firstBatch.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(1L, secondBatch.getBatch().get().count());

        // Walk event -> "s3" -> "object" -> "key" one step at a time so that a failure
        // points at the lookup that went wrong.
        GenericRecord event = secondBatch.getBatch().get().rdd().first();
        GenericRecord s3 = (GenericRecord) event.get("s3");
        GenericRecord s3Object = (GenericRecord) s3.get("object");
        Assertions.assertEquals("2.parquet", s3Object.get("key").toString());
    }

    /**
     * Builds an {@link S3EventsSource} configured with the test queue URL and region, and
     * replaces its {@code sqs} client with the one held by this test.
     *
     * @return the configured source (return type left as declared by the overridden method)
     */
    @Override
    public Source prepareCloudObjectSource() {
        TypedProperties props = new TypedProperties();
        props.setProperty(S3SourceConfig.S3_SOURCE_QUEUE_URL.key(), this.sqsUrl);
        props.setProperty(S3SourceConfig.S3_SOURCE_QUEUE_REGION.key(), this.regionName);
        // NOTE(review): "hdfs" presumably matches the test file system the fixtures write to - confirm.
        props.setProperty(S3SourceConfig.S3_SOURCE_QUEUE_FS.key(), "hdfs");
        S3EventsSource source = new S3EventsSource(props, jsc, sparkSession, this.schemaProvider);
        // Swap in the test's SQS client so the source reads from the test queue.
        source.sqs = this.sqs;
        return source;
    }

    /**
     * Converts the given records to generic records and saves them as Parquet at {@code path}.
     *
     * @param list records to write
     * @param path destination file path
     * @throws IOException if conversion or writing fails
     */
    @Override
    public void writeNewDataToFile(List<HoodieRecord> list, Path path) throws IOException {
        List<GenericRecord> genericRecords = UtilitiesTestBase.Helpers.toGenericRecords(list);
        UtilitiesTestBase.Helpers.saveParquetToDFS(genericRecords, path);
    }
}
