package org.apache.hudi.writers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.writers.AbstractConnectWriter;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/hudi/writers/TestAbstractConnectWriter.class */
public class TestAbstractConnectWriter {
    private static final String TOPIC_NAME = "kafka-connect-test-topic";
    private static final int PARTITION_NUMBER = 4;
    private static final int NUM_RECORDS = 10;
    private static final int RECORD_KEY_INDEX = 0;
    private KafkaConnectConfigs configs;
    private TestKeyGenerator keyGenerator;
    private SchemaProvider schemaProvider;
    private long currentKafkaOffset;

    /* loaded from: input_file:org/apache/hudi/writers/TestAbstractConnectWriter$AbstractHudiConnectWriterTestWrapper.class */
    private static class AbstractHudiConnectWriterTestWrapper extends AbstractConnectWriter {
        private List<HoodieRecord> writtenRecords;

        public AbstractHudiConnectWriterTestWrapper(KafkaConnectConfigs kafkaConnectConfigs, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
            super(kafkaConnectConfigs, keyGenerator, schemaProvider, "000");
            this.writtenRecords = new ArrayList();
        }

        public List<HoodieRecord> getWrittenRecords() {
            return this.writtenRecords;
        }

        protected void writeHudiRecord(HoodieRecord<?> hoodieRecord) {
            this.writtenRecords.add(hoodieRecord);
        }

        protected List<WriteStatus> flushRecords() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/hudi/writers/TestAbstractConnectWriter$TestInputFormats.class */
    private enum TestInputFormats {
        AVRO,
        JSON_STRING
    }

    /* loaded from: input_file:org/apache/hudi/writers/TestAbstractConnectWriter$TestKeyGenerator.class */
    static class TestKeyGenerator extends KeyGenerator {
        protected TestKeyGenerator(TypedProperties typedProperties) {
            super(typedProperties);
        }

        public HoodieKey getKey(GenericRecord genericRecord) {
            return new HoodieKey(genericRecord.get(TestAbstractConnectWriter.RECORD_KEY_INDEX).toString(), "000/00/00");
        }
    }

    /* loaded from: input_file:org/apache/hudi/writers/TestAbstractConnectWriter$TestSchemaProvider.class */
    static class TestSchemaProvider extends SchemaProvider {
        public Schema getSourceSchema() {
            try {
                return SchemaTestUtil.getSimpleSchema();
            } catch (IOException e) {
                throw new HoodieException("Fatal error parsing schema", e);
            }
        }
    }

    @BeforeEach
    public void setUp() throws Exception {
        this.keyGenerator = new TestKeyGenerator(new TypedProperties());
        this.schemaProvider = new TestSchemaProvider();
    }

    @EnumSource(TestInputFormats.class)
    @ParameterizedTest
    public void testAbstractWriterForAllFormats(TestInputFormats testInputFormats) throws Exception {
        Object obj;
        List generateTestRecords;
        List list;
        Schema sourceSchema = this.schemaProvider.getSourceSchema();
        switch (testInputFormats) {
            case JSON_STRING:
                obj = "org.apache.kafka.connect.storage.StringConverter";
                GenericDatumReader genericDatumReader = new GenericDatumReader(sourceSchema, sourceSchema);
                generateTestRecords = SchemaTestUtil.generateTestJsonRecords(RECORD_KEY_INDEX, NUM_RECORDS);
                list = (List) generateTestRecords.stream().map(str -> {
                    try {
                        return HoodieAvroUtils.rewriteRecord((GenericRecord) genericDatumReader.read((Object) null, DecoderFactory.get().jsonDecoder(sourceSchema, str)), sourceSchema);
                    } catch (IOException e) {
                        throw new HoodieException("Error converting JSON records to AVRO");
                    }
                }).map(genericRecord -> {
                    return convertToHoodieRecords(genericRecord, genericRecord.get(RECORD_KEY_INDEX).toString(), "000/00/00");
                }).collect(Collectors.toList());
                break;
            case AVRO:
                obj = "io.confluent.connect.avro.AvroConverter";
                generateTestRecords = SchemaTestUtil.generateTestRecords(RECORD_KEY_INDEX, NUM_RECORDS);
                list = (List) generateTestRecords.stream().map(obj2 -> {
                    return HoodieAvroUtils.rewriteRecord((GenericRecord) obj2, sourceSchema);
                }).map(genericRecord2 -> {
                    return convertToHoodieRecords(genericRecord2, genericRecord2.get(RECORD_KEY_INDEX).toString(), "000/00/00");
                }).collect(Collectors.toList());
                break;
            default:
                throw new HoodieException("Unknown test scenario " + testInputFormats);
        }
        this.configs = KafkaConnectConfigs.newBuilder().withProperties(Collections.singletonMap("value.converter", obj)).build();
        AbstractHudiConnectWriterTestWrapper abstractHudiConnectWriterTestWrapper = new AbstractHudiConnectWriterTestWrapper(this.configs, this.keyGenerator, this.schemaProvider);
        for (int i = RECORD_KEY_INDEX; i < NUM_RECORDS; i++) {
            abstractHudiConnectWriterTestWrapper.writeRecord(getNextKafkaRecord(generateTestRecords.get(i)));
        }
        validateRecords(abstractHudiConnectWriterTestWrapper.getWrittenRecords(), list);
    }

    private static void validateRecords(List<HoodieRecord> list, List<HoodieRecord> list2) {
        Assertions.assertEquals(list.size(), list2.size());
        list.sort(Comparator.comparing((v0) -> {
            return v0.getRecordKey();
        }));
        list2.sort(Comparator.comparing((v0) -> {
            return v0.getRecordKey();
        }));
        Iterator<HoodieRecord> it = list.iterator();
        Iterator<HoodieRecord> it2 = list2.iterator();
        while (it.hasNext()) {
            Assertions.assertEquals(it.next().getRecordKey(), it2.next().getRecordKey());
        }
    }

    private SinkRecord getNextKafkaRecord(Object obj) {
        org.apache.kafka.connect.data.Schema schema = org.apache.kafka.connect.data.Schema.OPTIONAL_BYTES_SCHEMA;
        byte[] bytes = ("key-" + this.currentKafkaOffset).getBytes();
        org.apache.kafka.connect.data.Schema schema2 = org.apache.kafka.connect.data.Schema.OPTIONAL_BYTES_SCHEMA;
        long j = this.currentKafkaOffset;
        this.currentKafkaOffset = j + 1;
        return new SinkRecord(TOPIC_NAME, PARTITION_NUMBER, schema, bytes, schema2, obj, j);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static HoodieRecord convertToHoodieRecords(IndexedRecord indexedRecord, String str, String str2) {
        return new HoodieAvroRecord(new HoodieKey(str, str2), new HoodieAvroPayload(Option.of((GenericRecord) indexedRecord)));
    }
}
