/*
 * Decompiled with CFR 0.152.
 */
package com.logicalclocks.hsfs.spark.engine.hudi;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeltaStreamerAvroDeserializer
implements Deserializer<GenericRecord> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DeltaStreamerAvroDeserializer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private String subjectId;
    private String featureGroupId;
    private Schema schema;
    private Schema encodedSchema;
    private final BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(new byte[0], null);
    private List<String> complexFeatures = null;
    private DatumReader<GenericRecord> encodedDatumReader;
    private final FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();
    private final Map<String, Schema> complexFeatureSchemas = new HashMap<String, Schema>();
    private final Map<String, DatumReader<GenericRecord>> complexFeaturesDatumReaders = new HashMap<String, DatumReader<GenericRecord>>();

    public void configure(Map<String, ?> configs, boolean isKey) {
        this.subjectId = (String)configs.get("subjectId");
        this.featureGroupId = (String)configs.get("featureGroupId");
        GenericData.get().addLogicalTypeConversion((Conversion)new Conversions.DecimalConversion());
        String featureGroupSchema = (String)configs.get("com.logicalclocks.hsfs.spark.StreamFeatureGroup.avroSchema");
        String encodedFeatureGroupSchema = configs.get("com.logicalclocks.hsfs.spark.StreamFeatureGroup.encodedAvroSchema").toString().replace("\"type\":[\"bytes\",\"null\"]", "\"type\":[\"null\",\"bytes\"]");
        String complexFeatureString = (String)configs.get("com.logicalclocks.hsfs.spark.StreamFeatureGroup.complexFeatures");
        try {
            String[] stringArray = (String[])this.objectMapper.readValue(complexFeatureString, String[].class);
            this.complexFeatures = Arrays.asList(stringArray);
        }
        catch (JsonProcessingException e) {
            throw new SerializationException("Could not deserialize complex feature array: " + complexFeatureString, (Throwable)e);
        }
        this.schema = new Schema.Parser().parse(featureGroupSchema);
        this.encodedSchema = new Schema.Parser().parse(encodedFeatureGroupSchema);
        this.encodedDatumReader = new GenericDatumReader(this.encodedSchema);
        for (String complexFeature : this.complexFeatures) {
            Schema featureSchema = null;
            try {
                featureSchema = new Schema.Parser().parse(this.featureGroupUtils.getFeatureAvroSchema(complexFeature, this.schema));
            }
            catch (FeatureStoreException | IOException e) {
                throw new SerializationException("Can't deserialize complex feature schema: " + complexFeature, e);
            }
            this.complexFeatureSchemas.put(complexFeature, featureSchema);
            this.complexFeaturesDatumReaders.put(complexFeature, (DatumReader<GenericRecord>)new GenericDatumReader(featureSchema));
        }
    }

    public GenericRecord deserialize(String topic, Headers headers, byte[] data) {
        if (this.subjectId.equals(DeltaStreamerAvroDeserializer.getHeader(headers, "subjectId")) && this.featureGroupId.equals(DeltaStreamerAvroDeserializer.getHeader(headers, "featureGroupId"))) {
            return this.deserialize(topic, data);
        }
        return null;
    }

    public GenericRecord deserialize(String topic, byte[] data) {
        GenericData.Record finalResult = new GenericData.Record(this.schema);
        GenericData.Record result = null;
        try {
            if (data != null) {
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, this.binaryDecoder);
                result = new GenericData.Record(this.encodedSchema);
                result = (GenericRecord)this.encodedDatumReader.read((Object)result, (Decoder)decoder);
            }
        }
        catch (Exception ex) {
            LOGGER.info("Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", (Throwable)ex);
        }
        for (String complexFeature : this.complexFeatures) {
            ByteBuffer byteBuffer = (ByteBuffer)result.get(complexFeature);
            byte[] featureData = new byte[byteBuffer.remaining()];
            byteBuffer.get(featureData);
            Schema featureSchema = this.complexFeatureSchemas.get(complexFeature);
            try {
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(featureData, this.binaryDecoder);
                finalResult.put(complexFeature, this.complexFeaturesDatumReaders.get(complexFeature).read(null, (Decoder)decoder));
            }
            catch (Exception ex) {
                LOGGER.info("Can't deserialize complex feature data '" + Arrays.toString(featureData) + "' from topic '" + topic + "' with schema: " + featureSchema.toString(true), (Throwable)ex);
            }
        }
        for (String feature : this.schema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList())) {
            if (this.complexFeatures.contains(feature)) continue;
            finalResult.put(feature, result.get(feature));
        }
        return finalResult;
    }

    private static String getHeader(Headers headers, String headerKey) {
        Header header = headers.lastHeader(headerKey);
        if (header != null) {
            return new String(header.value(), StandardCharsets.UTF_8);
        }
        return null;
    }

    public void close() {
    }
}

