/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.writers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.writers.AbstractConnectWriter;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class TestAbstractConnectWriter {
    private static final String TOPIC_NAME = "kafka-connect-test-topic";
    private static final int PARTITION_NUMBER = 4;
    private static final int NUM_RECORDS = 10;
    private static final int RECORD_KEY_INDEX = 0;
    private KafkaConnectConfigs configs;
    private TestKeyGenerator keyGenerator;
    private SchemaProvider schemaProvider;
    private long currentKafkaOffset;

    @BeforeEach
    public void setUp() throws Exception {
        this.keyGenerator = new TestKeyGenerator(new TypedProperties());
        this.schemaProvider = new TestSchemaProvider();
    }

    @ParameterizedTest
    @EnumSource(value=TestInputFormats.class)
    public void testAbstractWriterForAllFormats(TestInputFormats inputFormats) throws Exception {
        List<HoodieRecord> expectedRecords;
        List inputRecords;
        String formatConverter;
        Schema schema = this.schemaProvider.getSourceSchema();
        switch (inputFormats) {
            case JSON_STRING: {
                formatConverter = "org.apache.kafka.connect.storage.StringConverter";
                GenericDatumReader reader = new GenericDatumReader(schema, schema);
                inputRecords = SchemaTestUtil.generateTestJsonRecords((int)0, (int)10);
                expectedRecords = inputRecords.stream().map(s -> {
                    try {
                        return HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)reader.read(null, (Decoder)DecoderFactory.get().jsonDecoder(schema, s))), (Schema)schema);
                    }
                    catch (IOException exception) {
                        throw new HoodieException("Error converting JSON records to AVRO");
                    }
                }).map(p -> TestAbstractConnectWriter.convertToHoodieRecords((IndexedRecord)p, p.get(0).toString(), "000/00/00")).collect(Collectors.toList());
                break;
            }
            case AVRO: {
                formatConverter = "io.confluent.connect.avro.AvroConverter";
                inputRecords = SchemaTestUtil.generateTestRecords((int)0, (int)10);
                expectedRecords = inputRecords.stream().map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord)((GenericRecord)s), (Schema)schema)).map(p -> TestAbstractConnectWriter.convertToHoodieRecords((IndexedRecord)p, p.get(0).toString(), "000/00/00")).collect(Collectors.toList());
                break;
            }
            default: {
                throw new HoodieException("Unknown test scenario " + (Object)((Object)inputFormats));
            }
        }
        this.configs = KafkaConnectConfigs.newBuilder().withProperties(Collections.singletonMap("value.converter", formatConverter)).build();
        AbstractHudiConnectWriterTestWrapper writer = new AbstractHudiConnectWriterTestWrapper(this.configs, this.keyGenerator, this.schemaProvider);
        for (int i = 0; i < 10; ++i) {
            writer.writeRecord(this.getNextKafkaRecord(inputRecords.get(i)));
        }
        TestAbstractConnectWriter.validateRecords(writer.getWrittenRecords(), expectedRecords);
    }

    private static void validateRecords(List<HoodieRecord> actualRecords, List<HoodieRecord> expectedRecords) {
        Assertions.assertEquals((int)actualRecords.size(), (int)expectedRecords.size());
        actualRecords.sort(Comparator.comparing(HoodieRecord::getRecordKey));
        expectedRecords.sort(Comparator.comparing(HoodieRecord::getRecordKey));
        Iterator<HoodieRecord> it1 = actualRecords.iterator();
        Iterator<HoodieRecord> it2 = expectedRecords.iterator();
        while (it1.hasNext()) {
            HoodieRecord t1 = it1.next();
            HoodieRecord t2 = it2.next();
            Assertions.assertEquals((Object)t1.getRecordKey(), (Object)t2.getRecordKey());
        }
    }

    private SinkRecord getNextKafkaRecord(Object record) {
        return new SinkRecord(TOPIC_NAME, 4, org.apache.kafka.connect.data.Schema.OPTIONAL_BYTES_SCHEMA, (Object)("key-" + this.currentKafkaOffset).getBytes(), org.apache.kafka.connect.data.Schema.OPTIONAL_BYTES_SCHEMA, record, this.currentKafkaOffset++);
    }

    private static HoodieRecord convertToHoodieRecords(IndexedRecord iRecord, String key, String partitionPath) {
        return new HoodieRecord(new HoodieKey(key, partitionPath), (HoodieRecordPayload)new HoodieAvroPayload(Option.of((Object)((GenericRecord)iRecord))));
    }

    static class TestSchemaProvider
    extends SchemaProvider {
        TestSchemaProvider() {
        }

        public Schema getSourceSchema() {
            try {
                return SchemaTestUtil.getSimpleSchema();
            }
            catch (IOException exception) {
                throw new HoodieException("Fatal error parsing schema", (Throwable)exception);
            }
        }
    }

    static class TestKeyGenerator
    extends KeyGenerator {
        protected TestKeyGenerator(TypedProperties config) {
            super(config);
        }

        public HoodieKey getKey(GenericRecord record) {
            return new HoodieKey(record.get(0).toString(), "000/00/00");
        }
    }

    private static enum TestInputFormats {
        AVRO,
        JSON_STRING;

    }

    private static class AbstractHudiConnectWriterTestWrapper
    extends AbstractConnectWriter {
        private List<HoodieRecord> writtenRecords = new ArrayList<HoodieRecord>();

        public AbstractHudiConnectWriterTestWrapper(KafkaConnectConfigs connectConfigs, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
            super(connectConfigs, keyGenerator, schemaProvider, "000");
        }

        public List<HoodieRecord> getWrittenRecords() {
            return this.writtenRecords;
        }

        protected void writeHudiRecord(HoodieRecord<?> record) {
            this.writtenRecords.add(record);
        }

        protected List<WriteStatus> flushRecords() {
            return null;
        }
    }
}

