/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.connect.writers;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectWriter;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Base {@link ConnectWriter} that turns Kafka Connect {@link SinkRecord}s into Hudi records.
 *
 * <p>{@link #writeRecord(SinkRecord)} decodes the record value into an Avro
 * {@link GenericRecord} according to the configured Kafka value converter, wraps it in a
 * {@link HoodieAvroRecord}, assigns it a file id and forwards it to
 * {@link #writeHudiRecord(HoodieRecord)}. Subclasses supply the actual write and flush logic.
 */
public abstract class AbstractConnectWriter
implements ConnectWriter<WriteStatus> {
    /** Fully-qualified class name of the Confluent Avro value converter. */
    public static final String KAFKA_AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter";
    /** Fully-qualified class name of the Kafka Connect JSON value converter (rejected by this writer). */
    public static final String KAFKA_JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
    /** Fully-qualified class name of the Kafka Connect String value converter. */
    public static final String KAFKA_STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter";
    private static final Logger LOG = LogManager.getLogger(AbstractConnectWriter.class);
    /** Instant time stamped on the location of every record written through this writer. */
    protected final String instantTime;
    private final KeyGenerator keyGenerator;
    private final SchemaProvider schemaProvider;
    /** Connector configuration; consulted here for the Kafka value converter class name. */
    protected final KafkaConnectConfigs connectConfigs;

    /**
     * Stores the collaborators used to convert and locate incoming records.
     *
     * @param connectConfigs connector configuration
     * @param keyGenerator   produces the Hudi key for each decoded Avro record
     * @param schemaProvider supplies the source schema used when decoding JSON strings
     * @param instantTime    instant time recorded in each record's location
     */
    public AbstractConnectWriter(KafkaConnectConfigs connectConfigs, KeyGenerator keyGenerator, SchemaProvider schemaProvider, String instantTime) {
        this.connectConfigs = connectConfigs;
        this.keyGenerator = keyGenerator;
        this.schemaProvider = schemaProvider;
        this.instantTime = instantTime;
    }

    /**
     * Converts a sink record to a Hudi record and passes it to {@link #writeHudiRecord(HoodieRecord)}.
     *
     * @param record the Kafka Connect record to write
     * @throws UnsupportedEncodingException if the configured converter is the JSON converter
     * @throws IOException                  if the configured converter is not recognised
     */
    @Override
    public void writeRecord(SinkRecord record) throws IOException {
        Option<GenericRecord> avroRecord = this.toAvroRecord(record);
        HoodieAvroRecord<HoodieAvroPayload> hoodieRecord =
                new HoodieAvroRecord<>(this.keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));

        // The file id is the digest of "<kafka partition>-<hudi partition path>", so it is
        // determined solely by those two values.
        String fileIdSeed = String.format("%s-%s", record.kafkaPartition(), hoodieRecord.getPartitionPath());
        String fileId = KafkaConnectUtils.hashDigest(fileIdSeed);

        // Location setters are invoked between unseal() and seal().
        // NOTE(review): presumably the record rejects location changes while sealed - confirm.
        hoodieRecord.unseal();
        hoodieRecord.setCurrentLocation(new HoodieRecordLocation(this.instantTime, fileId));
        hoodieRecord.setNewLocation(new HoodieRecordLocation(this.instantTime, fileId));
        hoodieRecord.seal();

        this.writeHudiRecord(hoodieRecord);
    }

    /**
     * Decodes the value of {@code record} into an Avro record based on the configured converter.
     *
     * <p>The source schema is only fetched, and the {@link AvroConvertor} only built, for the
     * String converter, which is the sole branch that needs them.
     */
    private Option<GenericRecord> toAvroRecord(SinkRecord record) throws IOException {
        String valueConverter = this.connectConfigs.getKafkaValueConverter();
        switch (valueConverter) {
            case KAFKA_AVRO_CONVERTER: {
                // The value is cast directly to an Avro GenericRecord.
                return Option.of((GenericRecord) record.value());
            }
            case KAFKA_STRING_CONVERTER: {
                // The value is treated as a JSON string and parsed against the source schema.
                AvroConvertor convertor = new AvroConvertor(this.schemaProvider.getSourceSchema());
                return Option.of(convertor.fromJson((String) record.value()));
            }
            case KAFKA_JSON_CONVERTER: {
                throw new UnsupportedEncodingException("Currently JSON objects are not supported");
            }
            default: {
                throw new IOException("Unsupported Kafka Format type (" + valueConverter + ")");
            }
        }
    }

    /**
     * Closes the writer by delegating to {@link #flushRecords()}.
     *
     * @return the write statuses returned by {@link #flushRecords()}
     */
    @Override
    public List<WriteStatus> close() {
        return this.flushRecords();
    }

    /**
     * Receives each record prepared by {@link #writeRecord(SinkRecord)}, with its current and
     * new locations already set.
     *
     * @param var1 the Hudi record to write
     */
    protected abstract void writeHudiRecord(HoodieRecord<?> var1);

    /**
     * Invoked from {@link #close()}; its result is returned to the caller unchanged.
     *
     * @return the write statuses produced by the implementation
     */
    protected abstract List<WriteStatus> flushRecords();
}

